1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-25 22:56:52 +00:00

Work on thread/receive doubling as select.

This commit is contained in:
Calvin Rose 2019-12-04 22:31:01 -06:00
parent fd4220f254
commit e908029392
3 changed files with 65 additions and 16 deletions

View File

@ -5,7 +5,7 @@
(for i 0 10 (for i 0 10
(os/sleep interval) (os/sleep interval)
(printf "thread %s wakeup no. %d" name i)) (printf "thread %s wakeup no. %d" name i))
(:send parent :done)) (:send parent name))
(defn make-worker (defn make-worker
[name interval] [name interval]
@ -19,6 +19,5 @@
(def sam (make-worker "sam" 0.5)) (def sam (make-worker "sam" 0.5))
# Receive out of order # Receive out of order
(:receive bob) (for i 0 3
(:receive sam) (print "worker " (thread/receive [bob sam joe]) " finished!"))
(:receive joe)

View File

@ -32,13 +32,16 @@
#include <setjmp.h> #include <setjmp.h>
JANET_THREAD_LOCAL pthread_cond_t janet_vm_thread_cond; JANET_THREAD_LOCAL pthread_cond_t janet_vm_thread_cond;
JANET_THREAD_LOCAL pthread_mutex_t janet_vm_thread_lock;
void janet_threads_init(void) { void janet_threads_init(void) {
pthread_cond_init(&janet_vm_thread_cond, NULL); pthread_cond_init(&janet_vm_thread_cond, NULL);
pthread_mutex_init(&janet_vm_thread_lock, NULL);
} }
void janet_threads_deinit(void) { void janet_threads_deinit(void) {
pthread_cond_destroy(&janet_vm_thread_cond); pthread_cond_destroy(&janet_vm_thread_cond);
pthread_mutex_destroy(&janet_vm_thread_lock);
} }
static JanetTable *janet_get_core_table(const char *name) { static JanetTable *janet_get_core_table(const char *name) {
@ -54,8 +57,8 @@ static void janet_channel_init(JanetChannel *channel) {
janet_buffer_init(&channel->buf, 0); janet_buffer_init(&channel->buf, 0);
pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->lock, NULL);
channel->rx_cond = NULL; channel->rx_cond = NULL;
channel->rx_lock = NULL;
channel->refCount = 2; channel->refCount = 2;
channel->mailboxFlag = 0;
} }
/* Return 1 if channel memory should be freed, otherwise 0 */ /* Return 1 if channel memory should be freed, otherwise 0 */
@ -83,7 +86,10 @@ static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict
pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->lock);
/* Check for closed channel */ /* Check for closed channel */
if (channel->refCount <= 1) return 1; if (channel->refCount <= 1) {
pthread_mutex_unlock(&channel->lock);
return 1;
}
/* Hack to capture all panics from marshalling. This works because /* Hack to capture all panics from marshalling. This works because
* we know janet_marshal won't mess with other essential global state. */ * we know janet_marshal won't mess with other essential global state. */
@ -114,13 +120,17 @@ static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict
/* Returns 1 if nothing in queue or failed to get item. Does not block or panic. Uses dict to read bytes from /* Returns 1 if nothing in queue or failed to get item. Does not block or panic. Uses dict to read bytes from
* the channel and unmarshal them. */ * the channel and unmarshal them. */
static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTable *dict) { static int janet_channel_receive(JanetChannel *channel, Janet *msg_out,
JanetTable *dict, int nowait) {
pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->lock);
/* If queue is empty, block for now. */ /* If queue is empty, block for now. */
while (channel->buf.count == 0) { while (channel->buf.count == 0) {
/* Check for closed channel (1 ref left means other side quit) */ /* Check for closed channel (1 ref left means other side quit) */
if (channel->refCount <= 1) return 1; if (nowait || channel->refCount <= 1) {
pthread_mutex_unlock(&channel->lock);
return 1;
}
/* Since each thread sets its own rx_cond, we know it's not NULL */ /* Since each thread sets its own rx_cond, we know it's not NULL */
pthread_cond_wait(channel->rx_cond, &channel->lock); pthread_cond_wait(channel->rx_cond, &channel->lock);
} }
@ -156,6 +166,25 @@ static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTab
return ret; return ret;
} }
static int janet_channel_select(int32_t n, JanetThread **threads,
Janet *msg_out) {
for (;;) {
/* First, loop over channels for any that have any messages, but
* don't acquire any locks. Any incorrect behavior here will not mess
* anything up*/
for (int32_t i = 0; i < n; i++) {
JanetThread *thread = threads[i];
JanetChannel *channel = thread->rx;
if (channel != NULL && channel->buf.count) {
int status = janet_channel_receive(channel, msg_out, thread->decode, 1);
if (!status) return 0;
}
}
/* If no messages waiting, wait for signal */
pthread_cond_wait(&janet_vm_thread_cond, &janet_vm_thread_lock);
}
}
static void janet_close_thread(JanetThread *thread) { static void janet_close_thread(JanetThread *thread) {
if (NULL != thread->rx) { if (NULL != thread->rx) {
JanetChannel *rx = thread->rx; JanetChannel *rx = thread->rx;
@ -212,7 +241,7 @@ static JanetThread *janet_make_thread(JanetChannel *rx, JanetChannel *tx, JanetT
return thread; return thread;
} }
JanetThread *janet_getthread(Janet *argv, int32_t n) { JanetThread *janet_getthread(const Janet *argv, int32_t n) {
return (JanetThread *) janet_getabstract(argv, n, &Thread_AT); return (JanetThread *) janet_getabstract(argv, n, &Thread_AT);
} }
@ -228,12 +257,13 @@ static int thread_worker(JanetChannel *tx) {
/* Create self thread */ /* Create self thread */
JanetChannel *rx = tx + 1; JanetChannel *rx = tx + 1;
rx->rx_cond = &janet_vm_thread_cond; rx->rx_cond = &janet_vm_thread_cond;
rx->rx_lock = &janet_vm_thread_lock;
JanetThread *thread = janet_make_thread(rx, tx, encode, decode); JanetThread *thread = janet_make_thread(rx, tx, encode, decode);
Janet threadv = janet_wrap_abstract(thread); Janet threadv = janet_wrap_abstract(thread);
/* Unmarshal the function */ /* Unmarshal the function */
Janet funcv; Janet funcv;
int status = janet_channel_receive(rx, &funcv, decode); int status = janet_channel_receive(rx, &funcv, decode, 0);
if (status) goto error; if (status) goto error;
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
JanetFunction *func = janet_unwrap_function(funcv); JanetFunction *func = janet_unwrap_function(funcv);
@ -299,6 +329,7 @@ static Janet cfun_thread_new(int32_t argc, Janet *argv) {
janet_channel_init(rx); janet_channel_init(rx);
janet_channel_init(tx); janet_channel_init(tx);
rx->rx_cond = &janet_vm_thread_cond; rx->rx_cond = &janet_vm_thread_cond;
rx->rx_lock = &janet_vm_thread_lock;
JanetThread *thread = janet_make_thread(rx, tx, encode, decode); JanetThread *thread = janet_make_thread(rx, tx, encode, decode);
if (janet_thread_start_child(thread)) if (janet_thread_start_child(thread))
janet_panic("could not start thread"); janet_panic("could not start thread");
@ -318,10 +349,26 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) {
static Janet cfun_thread_receive(int32_t argc, Janet *argv) { static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1); janet_fixarity(argc, 1);
int status;
Janet out = janet_wrap_nil();
int32_t count;
const Janet *items;
if (janet_indexed_view(argv[0], &items, &count)) {
int32_t realcount = 0;
JanetThread **threads = janet_smalloc(sizeof(JanetThread *) * count);
/* Select on multiple threads */
for (int32_t i = 0; i < count; i++) {
JanetThread *thread = janet_getthread(items, i);
if (thread->rx != NULL) threads[realcount++] = thread;
}
status = janet_channel_select(realcount, threads, &out);
janet_sfree(threads);
} else {
/* Get from one thread */
JanetThread *thread = janet_getthread(argv, 0); JanetThread *thread = janet_getthread(argv, 0);
if (NULL == thread->rx) janet_panic("channel has closed"); if (NULL == thread->rx) janet_panic("channel has closed");
Janet out = janet_wrap_nil(); status = janet_channel_receive(thread->rx, &out, thread->decode, 0);
int status = janet_channel_receive(thread->rx, &out, thread->decode); }
if (status) { if (status) {
janet_panic("failed to receive message"); janet_panic("failed to receive message");
} }
@ -363,8 +410,11 @@ static const JanetReg threadlib_cfuns[] = {
}, },
{ {
"thread/receive", cfun_thread_receive, "thread/receive", cfun_thread_receive,
JDOC("(thread/receive thread)\n\n" JDOC("(thread/receive threads)\n\n"
"Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.") "Get a value sent to thread. Will block if there is no value was sent to this thread "
"yet. threads can also be an array or tuple of threads, in which case "
"thread/receive will select on the first thread to return a value. Returns "
"the message sent to the thread.")
}, },
{ {
"thread/close", cfun_thread_close, "thread/close", cfun_thread_close,

View File

@ -946,9 +946,9 @@ typedef struct JanetChannel JanetChannel;
struct JanetChannel { struct JanetChannel {
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t *rx_cond; pthread_cond_t *rx_cond;
pthread_mutex_t *rx_lock;
JanetBuffer buf; JanetBuffer buf;
int refCount; int refCount;
int mailboxFlag;
}; };
struct JanetThread { struct JanetThread {
JanetChannel *rx; JanetChannel *rx;