mirror of
https://github.com/janet-lang/janet
synced 2024-11-28 19:19:53 +00:00
Work on moving to mailbox abstraction.
Should be more efficient in the common case.
This commit is contained in:
parent
0e690b4fa0
commit
73f5314141
@ -31,19 +31,114 @@
|
|||||||
|
|
||||||
#include <setjmp.h>
|
#include <setjmp.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
static JANET_THREAD_LOCAL JanetThreadSelector janet_vm_thread_selector;
|
/* Global data */
|
||||||
|
static pthread_rwlock_t janet_g_lock = PTHREAD_RWLOCK_INITIALIZER;
|
||||||
|
static JanetMailbox **janet_g_mailboxes = NULL;
|
||||||
|
static size_t janet_g_mailboxes_cap = 0;
|
||||||
|
static size_t janet_g_mailboxes_count = 0;
|
||||||
|
static uint64_t janet_g_next_mailbox_id = 0;
|
||||||
|
|
||||||
|
/* typedefed in janet.h */
|
||||||
|
struct JanetMailbox {
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
uint64_t id;
|
||||||
|
JanetBuffer buf;
|
||||||
|
int refCount;
|
||||||
|
int closed;
|
||||||
|
};
|
||||||
|
|
||||||
|
static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox;
|
||||||
|
|
||||||
void janet_threads_init(void) {
|
void janet_threads_init(void) {
|
||||||
pthread_mutex_init(&janet_vm_thread_selector.mutex, NULL);
|
janet_vm_mailbox = malloc(sizeof(JanetMailbox));
|
||||||
pthread_cond_init(&janet_vm_thread_selector.cond, NULL);
|
if (NULL == janet_vm_mailbox) {
|
||||||
janet_vm_thread_selector.channel = NULL;
|
JANET_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
pthread_mutex_init(&janet_vm_mailbox->lock, NULL);
|
||||||
|
pthread_cond_init(&janet_vm_mailbox->cond, NULL);
|
||||||
|
janet_buffer_init(&janet_vm_mailbox->buf, 1024);
|
||||||
|
janet_vm_mailbox->refcount = 1;
|
||||||
|
janet_vm_mailbox->closed = 0;
|
||||||
|
|
||||||
|
/* Add mailbox to global table */
|
||||||
|
pthread_rwlock_wrlock(&janet_janet_lock);
|
||||||
|
janet_vm_mailbox->id = janet_g_next_mailbox_id++;
|
||||||
|
size_t newcount = janet_g_mailboxes_count + 1;
|
||||||
|
if (janet_g_mailboxes_cap < newcount) {
|
||||||
|
size_t newcap = newcount * 2;
|
||||||
|
JanetMailbox **mailboxes = realloc(janet_g_mailboxes, newcap * sizeof(JanetMailbox *));
|
||||||
|
if (NULL == mailboxes) {
|
||||||
|
pthread_rwlock_unlock(&janet_janet_lock);
|
||||||
|
/* this maybe should be a different error, as this basically means
|
||||||
|
* we cannot create a new thread. So janet_init should probably fail. */
|
||||||
|
JANET_OUT_OF_MEMORY;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
janet_g_mailboxes = mailboxes;
|
||||||
|
janet_g_mailboxes_cap = newcap;
|
||||||
|
}
|
||||||
|
janet_g_mailboxes[janet_g_mailboxes_count] = janet_vm_mailbox;
|
||||||
|
janet_g_mailboxes_count = newcount;
|
||||||
|
pthread_rwlock_unlock(&janet_janet_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void janet_mailbox_ref(JanetMailbox *mailbox) {
|
||||||
|
pthread_mutex_lock(&mailbox->lock);
|
||||||
|
mailbox->refCount++;
|
||||||
|
pthread_mutex_unlock(&mailbox->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
static JanetMailbox *janet_find_mailbox(uint64_t id, int remove) {
|
||||||
|
JanetMailbox *ret = NULL;
|
||||||
|
if (remove) {
|
||||||
|
pthread_rwlock_wrlock(&janet_janet_lock);
|
||||||
|
} else {
|
||||||
|
pthread_rwlock_rdlock(&janet_janet_lock);
|
||||||
|
}
|
||||||
|
size_t i = 0;
|
||||||
|
while (i < janet_g_mailboxes_count && janet_g_mailboxes[i]->id != mailbox->id)
|
||||||
|
i++;
|
||||||
|
if (i < janet_g_mailboxes_count) {
|
||||||
|
ret = janet_g_mailboxes[i];
|
||||||
|
if (remove) {
|
||||||
|
janet_g_mailboxes[i] = janet_g_mailboxes[--janet_g_mailboxes_count];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pthread_rwlock_unlock(&janet_janet_lock);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Assumes you have the mailbox lock already */
|
||||||
|
static void janet_mailbox_deref_with_lock(JanetMailbox *mailbox) {
|
||||||
|
if (mailbox->refCount <= 1) {
|
||||||
|
/* We are the last reference */
|
||||||
|
pthread_mutex_destroy(&mailbox->lock);
|
||||||
|
pthread_mutex_destroy(&mailbox->cond);
|
||||||
|
janet_buffer_deinit(&mailbox->buf);
|
||||||
|
janet_find_mailbox(mailbox->id, 1);
|
||||||
|
free(mailbox);
|
||||||
|
} else {
|
||||||
|
/* There are other references */
|
||||||
|
if (mailbox == janet_vm_mailbox) {
|
||||||
|
/* We own this mailbox, so mark it as closed for other references. */
|
||||||
|
mailbox->closed = 1;
|
||||||
|
}
|
||||||
|
janet_vm_mailbox->refCount--;
|
||||||
|
pthread_mutex_unlock(&mailbox->lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void janet_mailbox_deref(JanetMailbox *mailbox) {
|
||||||
|
pthread_mutex_lock(&mailbox->lock);
|
||||||
|
janet_mailbox_deref_with_lock(mailbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
void janet_threads_deinit(void) {
|
void janet_threads_deinit(void) {
|
||||||
pthread_mutex_destroy(&janet_vm_thread_selector.mutex);
|
janet_mailbox_deref(janet_vm_mailbox);
|
||||||
pthread_cond_destroy(&janet_vm_thread_selector.cond);
|
janet_vm_mailbox = NULL;
|
||||||
janet_vm_thread_selector.channel = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static JanetTable *janet_get_core_table(const char *name) {
|
static JanetTable *janet_get_core_table(const char *name) {
|
||||||
@ -55,41 +150,10 @@ static JanetTable *janet_get_core_table(const char *name) {
|
|||||||
return janet_unwrap_table(out);
|
return janet_unwrap_table(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_channel_init(JanetChannel *channel) {
|
|
||||||
janet_buffer_init(&channel->buf, 0);
|
|
||||||
pthread_mutex_init(&channel->lock, NULL);
|
|
||||||
channel->selector = NULL;
|
|
||||||
channel->refCount = 2;
|
|
||||||
channel->encode = NULL;
|
|
||||||
channel->decode = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Return 1 if channel memory should be freed, otherwise 0 */
|
|
||||||
static int janet_channel_deref(JanetChannel *channel) {
|
|
||||||
pthread_mutex_lock(&channel->lock);
|
|
||||||
if (1 == channel->refCount) {
|
|
||||||
janet_buffer_deinit(&channel->buf);
|
|
||||||
pthread_mutex_destroy(&channel->lock);
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
channel->refCount--;
|
|
||||||
pthread_mutex_unlock(&channel->lock);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void janet_close_thread(JanetThread *thread) {
|
static void janet_close_thread(JanetThread *thread) {
|
||||||
if (NULL != thread->rx) {
|
if (thread->mailbox) {
|
||||||
JanetChannel *rx = thread->rx;
|
janet_mailbox_deref(thread->mailbox);
|
||||||
JanetChannel *tx = thread->tx;
|
thread->mailbox = NULL;
|
||||||
/* Deref both. The reference counts should be in sync. */
|
|
||||||
janet_channel_deref(rx);
|
|
||||||
if (janet_channel_deref(tx)) {
|
|
||||||
/* tx and rx were allocated together. free the one with the lower address. */
|
|
||||||
free(rx < tx ? rx : tx);
|
|
||||||
}
|
|
||||||
thread->rx = NULL;
|
|
||||||
thread->tx = NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,45 +166,41 @@ static int thread_gc(void *p, size_t size) {
|
|||||||
|
|
||||||
static int thread_mark(void *p, size_t size) {
|
static int thread_mark(void *p, size_t size) {
|
||||||
JanetThread *thread = (JanetThread *)p;
|
JanetThread *thread = (JanetThread *)p;
|
||||||
(void) size;
|
if (thread->encode) {
|
||||||
JanetChannel *rx = thread->rx;
|
janet_mark(janet_wrap_table(thread->encode));
|
||||||
JanetChannel *tx = thread->tx;
|
|
||||||
if (tx && tx->encode) {
|
|
||||||
janet_mark(janet_wrap_table(tx->encode));
|
|
||||||
}
|
|
||||||
if (rx && rx->encode) {
|
|
||||||
janet_mark(janet_wrap_table(rx->decode));
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns 1 if could not send, but do not panic or block (for long). */
|
/* Returns 1 if could not send (encode error), 2 for mailbox closed, and
|
||||||
static int janet_channel_send(JanetChannel *tx, Janet msg) {
|
* 0 otherwise. Will not panic. */
|
||||||
JanetThreadSelector *selector = tx->selector;
|
int janet_thread_send(JanetThread *thread, Janet msg) {
|
||||||
|
|
||||||
/* Check for closed channel */
|
/* Ensure mailbox is not closed. */
|
||||||
if (tx->refCount <= 1) return 1;
|
JanetMailbox *mailbox = thread->mailbox;
|
||||||
|
if (NULL == mailbox) return 2;
|
||||||
|
pthread_mutex_lock(mailbox->lock);
|
||||||
|
if (mailbox->closed) {
|
||||||
|
janet_mailbox_deref_with_lock(mailbox);
|
||||||
|
thread->mailbox == NULL;
|
||||||
|
return 2;
|
||||||
|
}
|
||||||
|
|
||||||
/* Hack to capture all panics from marshalling. This works because
|
/* Hack to capture all panics from marshalling. This works because
|
||||||
* we know janet_marshal won't mess with other essential global state. */
|
* we know janet_marshal won't mess with other essential global state. */
|
||||||
jmp_buf buf;
|
jmp_buf buf;
|
||||||
jmp_buf *old_buf = janet_vm_jmp_buf;
|
jmp_buf *old_buf = janet_vm_jmp_buf;
|
||||||
janet_vm_jmp_buf = &buf;
|
janet_vm_jmp_buf = &buf;
|
||||||
int32_t oldcount = tx->buf.count;
|
int32_t oldcount = mailbox->buf.count;
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
if (setjmp(buf)) {
|
if (setjmp(buf)) {
|
||||||
ret = 1;
|
ret = 1;
|
||||||
tx->buf.count = oldcount;
|
mailbox->buf.count = oldcount;
|
||||||
} else {
|
} else {
|
||||||
janet_marshal(&tx->buf, msg, tx->encode, 0);
|
janet_marshal(&mailbox->buf, msg, thread->encode, 0);
|
||||||
if (selector) {
|
if (oldcount == 0) {
|
||||||
pthread_mutex_lock(&selector->mutex);
|
pthread_cond_signal(&mailbox->cond);
|
||||||
if (!selector->channel) {
|
|
||||||
selector->channel = tx;
|
|
||||||
pthread_cond_signal(&selector->cond);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&selector->mutex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,55 +210,6 @@ static int janet_channel_send(JanetChannel *tx, Janet msg) {
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns 0 on successful message.
|
|
||||||
* Returns 1 if nothing in queue or failed to get item. In this case,
|
|
||||||
* also sets the channel's selector value.
|
|
||||||
* Returns 2 if channel closed.
|
|
||||||
* Does not block (for long) or panic, and sets the channel's selector
|
|
||||||
* . */
|
|
||||||
static int janet_channel_receive(JanetChannel *rx, Janet *msg_out) {
|
|
||||||
|
|
||||||
/* Check for no messages */
|
|
||||||
while (rx->buf.count == 0) {
|
|
||||||
int is_dead = rx->refCount <= 1;
|
|
||||||
rx->selector = &janet_vm_thread_selector;
|
|
||||||
return is_dead ? 2 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Hack to capture all panics from marshalling. This works because
|
|
||||||
* we know janet_marshal won't mess with other essential global state. */
|
|
||||||
jmp_buf buf;
|
|
||||||
jmp_buf *old_buf = janet_vm_jmp_buf;
|
|
||||||
janet_vm_jmp_buf = &buf;
|
|
||||||
|
|
||||||
/* Handle errors */
|
|
||||||
int ret = 0;
|
|
||||||
if (setjmp(buf)) {
|
|
||||||
rx->buf.count = 0;
|
|
||||||
rx->selector = &janet_vm_thread_selector;
|
|
||||||
ret = 1;
|
|
||||||
} else {
|
|
||||||
/* Read from beginning of channel */
|
|
||||||
const uint8_t *nextItem = NULL;
|
|
||||||
Janet item = janet_unmarshal(rx->buf.data, rx->buf.count,
|
|
||||||
0, rx->decode, &nextItem);
|
|
||||||
|
|
||||||
/* Update memory and put result into *msg_out */
|
|
||||||
int32_t chunkCount = nextItem - rx->buf.data;
|
|
||||||
memmove(rx->buf.data, nextItem, rx->buf.count - chunkCount);
|
|
||||||
rx->buf.count -= chunkCount;
|
|
||||||
*msg_out = item;
|
|
||||||
|
|
||||||
/* Got message, unset selector */
|
|
||||||
rx->selector = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Cleanup */
|
|
||||||
janet_vm_jmp_buf = old_buf;
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Convert an interval from now in an absolute timespec */
|
/* Convert an interval from now in an absolute timespec */
|
||||||
static void janet_sec2ts(double sec, struct timespec *ts) {
|
static void janet_sec2ts(double sec, struct timespec *ts) {
|
||||||
struct timespec now;
|
struct timespec now;
|
||||||
@ -215,87 +226,73 @@ static void janet_sec2ts(double sec, struct timespec *ts) {
|
|||||||
ts->tv_nsec = tvnsec;
|
ts->tv_nsec = tvnsec;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Get a message from one of the channels given. */
|
/* Returns 0 on successful message.
|
||||||
static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out, double wait) {
|
* Returns 1 if nothing in queue or failed to get item. In this case,
|
||||||
int32_t maxChannel = -1;
|
* also sets the channel's selector value.
|
||||||
int result = -1;
|
* Returns 2 if channel closed.
|
||||||
while (result < 0) {
|
* . */
|
||||||
janet_vm_thread_selector.channel = NULL;
|
int janet_thread_receive(Janet *msg_out, double timeout, JanetTable *decode) {
|
||||||
|
pthread_mutex_lock(&janet_vm_mailbox->lock);
|
||||||
|
|
||||||
/* Try each channel, first without acquiring locks and looking
|
/* For timeouts */
|
||||||
* only for existing messages, then with acquiring
|
struct timespec timeout_ts;
|
||||||
* locks, which will not miss messages. If both fail, we also
|
int timedwait = timeout > 0.0;
|
||||||
* set the selector for each thread. */
|
int nowait = timeout == 0.0;
|
||||||
for (int trylock = 1; trylock >= 0 && result < 0; trylock--) {
|
if (timedwait) janet_sec2ts(timeout, &timeout_ts);
|
||||||
for (int32_t i = 0; i < n && result < 0; i++) {
|
|
||||||
JanetChannel *rx = rxs[i];
|
|
||||||
if (trylock) {
|
|
||||||
if (rx->buf.count == 0 || pthread_mutex_trylock(&rx->lock)) continue;
|
|
||||||
} else {
|
|
||||||
pthread_mutex_lock(&rxs[i]->lock);
|
|
||||||
}
|
|
||||||
int status = janet_channel_receive(rxs[i], msg_out);
|
|
||||||
pthread_mutex_unlock(&rxs[i]->lock);
|
|
||||||
if (status == 0) {
|
|
||||||
result = 0;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
maxChannel = maxChannel > i ? maxChannel : i;
|
|
||||||
if (status == 2) {
|
|
||||||
/* channel closed and will receive no more messages, drop it */
|
|
||||||
rxs[i] = rxs[--n];
|
|
||||||
--i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* All channels closed */
|
for (;;) {
|
||||||
if (n == 0) result = 1;
|
|
||||||
|
|
||||||
/* Don't wait for selector if we have a result */
|
/* Check for messages waiting for use */
|
||||||
if (result >= 0) break;
|
if (janet_vm_mailbox->buf.count) {
|
||||||
|
/* Hack to capture all panics from marshalling. This works because
|
||||||
|
* we know janet_marshal won't mess with other essential global state. */
|
||||||
|
jmp_buf buf;
|
||||||
|
jmp_buf *old_buf = janet_vm_jmp_buf;
|
||||||
|
janet_vm_jmp_buf = &buf;
|
||||||
|
|
||||||
pthread_mutex_lock(&janet_vm_thread_selector.mutex);
|
/* Handle errors */
|
||||||
{
|
if (setjmp(buf)) {
|
||||||
int status = 0;
|
/* Bad message, so clear buffer and wait for the next */
|
||||||
/* Wait until we have a channel */
|
janet_vm_mailbox->buf.count = 0;
|
||||||
if (NULL == janet_vm_thread_selector.channel) {
|
janet_vm_jmp_buf = old_buf;
|
||||||
if (wait <= 0.0) {
|
|
||||||
pthread_cond_wait(
|
|
||||||
&janet_vm_thread_selector.cond,
|
|
||||||
&janet_vm_thread_selector.mutex);
|
|
||||||
} else {
|
|
||||||
struct timespec ts;
|
|
||||||
janet_sec2ts(wait, &ts);
|
|
||||||
status = pthread_cond_timedwait(
|
|
||||||
&janet_vm_thread_selector.cond,
|
|
||||||
&janet_vm_thread_selector.mutex,
|
|
||||||
&ts);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Timeout? */
|
|
||||||
if (status) {
|
|
||||||
result = 1;
|
|
||||||
} else {
|
} else {
|
||||||
JanetChannel *rx = janet_vm_thread_selector.channel;
|
/* Read from beginning of channel */
|
||||||
int32_t index = 0;
|
const uint8_t *nextItem = NULL;
|
||||||
while (rxs[index] != rx) index++;
|
Janet item = janet_unmarshal(
|
||||||
rxs[index] = rxs[0];
|
janet_vm_mailbox->buf.data, janet_vm_mailbox->buf.count,
|
||||||
rxs[0] = rx;
|
0, rx->decode, &nextItem);
|
||||||
|
|
||||||
|
/* Update memory and put result into *msg_out */
|
||||||
|
int32_t chunkCount = nextItem - janet_vm_mailbox->buf.data;
|
||||||
|
memmove(janet_vm_mailbox->buf.data, nextItem, janet_vm_mailbox->buf.count - chunkCount);
|
||||||
|
janet_vm_mailbox->buf.count -= chunkCount;
|
||||||
|
*msg_out = item;
|
||||||
|
janet_vm_jmp_buf = old_buf;
|
||||||
|
pthread_mutex_unlock(&janet_vm_mailbox->lock);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&janet_vm_thread_selector.mutex);
|
|
||||||
|
if (nowait) {
|
||||||
|
pthread_mutex_unlock(&janet_vm_mailbox->lock);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Wait for next message */
|
||||||
|
if (timedwait) {
|
||||||
|
if (pthread_cond_timedwait(
|
||||||
|
&janet_vm_mailbox->cond,
|
||||||
|
&janet_vm_mailbox->mutex,
|
||||||
|
&timeout_ts)) {
|
||||||
|
return 1; /* Timeout */
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pthread_cond_wait(
|
||||||
|
&janet_vm_mailbox->cond,
|
||||||
|
&janet_vm_mailbox->mutex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* got message, unset selectors and return */
|
|
||||||
for (int32_t j = 0; j <= maxChannel; j++) {
|
|
||||||
pthread_mutex_lock(&rxs[j]->lock);
|
|
||||||
rxs[j]->selector = NULL;
|
|
||||||
pthread_mutex_unlock(&rxs[j]->lock);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet janet_thread_getter(void *p, Janet key);
|
static Janet janet_thread_getter(void *p, Janet key);
|
||||||
@ -311,12 +308,11 @@ static JanetAbstractType Thread_AT = {
|
|||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
static JanetThread *janet_make_thread(JanetChannel *rx, JanetChannel *tx, JanetTable *encode, JanetTable *decode) {
|
static JanetThread *janet_make_thread(JanetMailbox *mailbox, Janet *encode) {
|
||||||
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
|
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
|
||||||
thread->rx = rx;
|
janet_mailbox_ref(mailbox)
|
||||||
thread->tx = tx;
|
thread->mailbox = mailbox;
|
||||||
rx->decode = decode;
|
thread->encode = encode;
|
||||||
tx->encode = encode;
|
|
||||||
return thread;
|
return thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -325,22 +321,23 @@ JanetThread *janet_getthread(const Janet *argv, int32_t n) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Runs in new thread */
|
/* Runs in new thread */
|
||||||
static int thread_worker(JanetChannel *tx) {
|
static int thread_worker(JanetMailbox *mailbox) {
|
||||||
/* Init VM */
|
/* Init VM */
|
||||||
janet_init();
|
janet_init();
|
||||||
|
|
||||||
/* Get dictionaries */
|
/* Get dictionaries */
|
||||||
JanetTable *decode = janet_get_core_table("load-image-dict");
|
|
||||||
JanetTable *encode = janet_get_core_table("make-image-dict");
|
JanetTable *encode = janet_get_core_table("make-image-dict");
|
||||||
|
JanetTable *decode = janet_get_core_table("load-image-dict");
|
||||||
|
|
||||||
/* Create self thread */
|
/* Create self thread */
|
||||||
JanetChannel *rx = tx + 1;
|
JanetThread *thread = janet_make_thread(mailbox, encode);
|
||||||
JanetThread *thread = janet_make_thread(rx, tx, encode, decode);
|
|
||||||
Janet threadv = janet_wrap_abstract(thread);
|
Janet threadv = janet_wrap_abstract(thread);
|
||||||
|
|
||||||
|
/* Send pointer to current mailbox to parent */
|
||||||
|
|
||||||
/* Unmarshal the function */
|
/* Unmarshal the function */
|
||||||
Janet funcv;
|
Janet funcv;
|
||||||
int status = janet_channel_select(1, &rx, &funcv, -1.0);
|
int status = janet_thread_receive(&funcv, -1.0, decode);
|
||||||
if (status) goto error;
|
if (status) goto error;
|
||||||
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
|
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
|
||||||
JanetFunction *func = janet_unwrap_function(funcv);
|
JanetFunction *func = janet_unwrap_function(funcv);
|
||||||
|
@ -944,26 +944,11 @@ struct JanetRNG {
|
|||||||
|
|
||||||
/* Thread types */
|
/* Thread types */
|
||||||
#ifdef JANET_THREADS
|
#ifdef JANET_THREADS
|
||||||
#include <pthread.h>
|
|
||||||
typedef struct JanetThread JanetThread;
|
typedef struct JanetThread JanetThread;
|
||||||
typedef struct JanetChannel JanetChannel;
|
typedef struct JanetMailbox JanetMailbox;
|
||||||
typedef struct JanetThreadSelector JanetThreadSelector;
|
|
||||||
struct JanetThreadSelector {
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t cond;
|
|
||||||
JanetChannel *channel;
|
|
||||||
};
|
|
||||||
struct JanetChannel {
|
|
||||||
pthread_mutex_t lock;
|
|
||||||
JanetBuffer buf;
|
|
||||||
int refCount;
|
|
||||||
JanetThreadSelector *selector;
|
|
||||||
JanetTable *encode; /* only touched by writers */
|
|
||||||
JanetTable *decode; /* only touched by readers */
|
|
||||||
};
|
|
||||||
struct JanetThread {
|
struct JanetThread {
|
||||||
JanetChannel *rx;
|
JanetMailbox *mailbox;
|
||||||
JanetChannel *tx;
|
JanetTable *encode;
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user