diff --git a/src/core/state.h b/src/core/state.h index 918aa255..d9b75219 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -73,4 +73,10 @@ extern JANET_THREAD_LOCAL void **janet_scratch_mem; extern JANET_THREAD_LOCAL size_t janet_scratch_cap; extern JANET_THREAD_LOCAL size_t janet_scratch_len; +/* Setup / teardown */ +#ifdef JANET_THREADS +void janet_threads_init(void); +void janet_threads_deinit(void); +#endif + #endif /* JANET_STATE_H_defined */ diff --git a/src/core/thread.c b/src/core/thread.c index 5939973d..a7334125 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -29,9 +29,18 @@ #ifdef JANET_THREADS -#include #include +JANET_THREAD_LOCAL pthread_cond_t janet_vm_thread_cond; + +void janet_threads_init(void) { + pthread_cond_init(&janet_vm_thread_cond, NULL); +} + +void janet_threads_deinit(void) { + pthread_cond_destroy(&janet_vm_thread_cond); +} + static JanetTable *janet_get_core_table(const char *name) { JanetTable *env = janet_core_env(NULL); Janet out = janet_wrap_nil(); @@ -44,20 +53,26 @@ static JanetTable *janet_get_core_table(const char *name) { static void janet_channel_init(JanetChannel *channel) { janet_buffer_init(&channel->buf, 0); pthread_mutex_init(&channel->lock, NULL); - pthread_cond_init(&channel->cond, NULL); + channel->rx_cond = NULL; channel->refCount = 2; + channel->mailboxFlag = 0; } -/* Return 1 if channel memory should be freed, otherwise false */ +/* Return 1 if channel memory should be freed, otherwise 0 */ static int janet_channel_deref(JanetChannel *channel) { pthread_mutex_lock(&channel->lock); - if (0 == --channel->refCount) { + if (1 == channel->refCount) { janet_buffer_deinit(&channel->buf); pthread_mutex_destroy(&channel->lock); - pthread_cond_destroy(&channel->cond); return 1; } else { + channel->refCount--; pthread_mutex_unlock(&channel->lock); + /* Wake up other side if they are blocked, otherwise + * they will block forever. */ + if (NULL != channel->rx_cond) { + pthread_cond_signal(channel->rx_cond); + } return 0; } } @@ -85,8 +100,8 @@ static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict janet_marshal(&channel->buf, msg, dict, 0); /* Was empty, signal to cond */ - if (oldcount == 0) { - pthread_cond_signal(&channel->cond); + if (oldcount == 0 && (NULL != channel->rx_cond)) { + pthread_cond_signal(channel->rx_cond); } } @@ -106,7 +121,8 @@ static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTab while (channel->buf.count == 0) { /* Check for closed channel (1 ref left means other side quit) */ if (channel->refCount <= 1) return 1; - pthread_cond_wait(&channel->cond, &channel->lock); + /* Since each thread sets its own rx_cond, we know it's not NULL */ + pthread_cond_wait(channel->rx_cond, &channel->lock); } /* Hack to capture all panics from marshalling. This works because @@ -211,6 +227,7 @@ static int thread_worker(JanetChannel *tx) { /* Create self thread */ JanetChannel *rx = tx + 1; + rx->rx_cond = &janet_vm_thread_cond; JanetThread *thread = janet_make_thread(rx, tx, encode, decode); Janet threadv = janet_wrap_abstract(thread); @@ -281,6 +298,7 @@ static Janet cfun_thread_new(int32_t argc, Janet *argv) { JanetChannel *tx = rx + 1; janet_channel_init(rx); janet_channel_init(tx); + rx->rx_cond = &janet_vm_thread_cond; JanetThread *thread = janet_make_thread(rx, tx, encode, decode); if (janet_thread_start_child(thread)) janet_panic("could not start thread"); @@ -310,9 +328,17 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) { return out; } +static Janet cfun_thread_close(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetThread *thread = janet_getthread(argv, 0); + janet_close_thread(thread); + return janet_wrap_nil(); +} + static const JanetMethod janet_thread_methods[] = { {"send", cfun_thread_send}, {"receive", cfun_thread_receive}, + {"close", cfun_thread_close}, {NULL, NULL} }; @@ -340,6 +366,12 @@ static const JanetReg threadlib_cfuns[] = { JDOC("(thread/receive thread)\n\n" "Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.") }, + { + "thread/close", cfun_thread_close, + JDOC("(thread/close thread)\n\n" + "Close a thread, unblocking it and ending communication with it. Note that closing " + "a thread is idempotent and does not cancel the thread's operation. Returns nil.") + }, {NULL, NULL, NULL} }; diff --git a/src/core/vm.c b/src/core/vm.c index d152ac0d..413c4a0b 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1250,6 +1250,10 @@ int janet_init(void) { janet_vm_core_env = NULL; /* Seed RNG */ janet_rng_seed(janet_default_rng(), 0); + /* Threads */ +#ifdef JANET_THREADS + janet_threads_init(); +#endif return 0; } @@ -1263,4 +1267,7 @@ void janet_deinit(void) { janet_vm_root_capacity = 0; janet_vm_registry = NULL; janet_vm_core_env = NULL; +#ifdef JANET_THREADS + janet_threads_deinit(); +#endif } diff --git a/src/include/janet.h b/src/include/janet.h index b5427b71..686816c0 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -945,9 +945,10 @@ typedef struct JanetThread JanetThread; typedef struct JanetChannel JanetChannel; struct JanetChannel { pthread_mutex_t lock; - pthread_cond_t cond; + pthread_cond_t *rx_cond; JanetBuffer buf; int refCount; + int mailboxFlag; }; struct JanetThread { JanetChannel *rx;