1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-25 01:37:19 +00:00

Keep single global pthread_cond_t per thread.

This will allow thread/select to be implemented.
Also add thread/close and close method to threads.
This commit is contained in:
Calvin Rose 2019-12-04 21:44:53 -06:00
parent de6c3d6d70
commit fd4220f254
4 changed files with 55 additions and 9 deletions

View File

@ -73,4 +73,10 @@ extern JANET_THREAD_LOCAL void **janet_scratch_mem;
extern JANET_THREAD_LOCAL size_t janet_scratch_cap; extern JANET_THREAD_LOCAL size_t janet_scratch_cap;
extern JANET_THREAD_LOCAL size_t janet_scratch_len; extern JANET_THREAD_LOCAL size_t janet_scratch_len;
/* Setup / teardown */
#ifdef JANET_THREADS
void janet_threads_init(void);
void janet_threads_deinit(void);
#endif
#endif /* JANET_STATE_H_defined */ #endif /* JANET_STATE_H_defined */

View File

@ -29,9 +29,18 @@
#ifdef JANET_THREADS #ifdef JANET_THREADS
#include <pthread.h>
#include <setjmp.h> #include <setjmp.h>
JANET_THREAD_LOCAL pthread_cond_t janet_vm_thread_cond;
void janet_threads_init(void) {
pthread_cond_init(&janet_vm_thread_cond, NULL);
}
void janet_threads_deinit(void) {
pthread_cond_destroy(&janet_vm_thread_cond);
}
static JanetTable *janet_get_core_table(const char *name) { static JanetTable *janet_get_core_table(const char *name) {
JanetTable *env = janet_core_env(NULL); JanetTable *env = janet_core_env(NULL);
Janet out = janet_wrap_nil(); Janet out = janet_wrap_nil();
@ -44,20 +53,26 @@ static JanetTable *janet_get_core_table(const char *name) {
static void janet_channel_init(JanetChannel *channel) { static void janet_channel_init(JanetChannel *channel) {
janet_buffer_init(&channel->buf, 0); janet_buffer_init(&channel->buf, 0);
pthread_mutex_init(&channel->lock, NULL); pthread_mutex_init(&channel->lock, NULL);
pthread_cond_init(&channel->cond, NULL); channel->rx_cond = NULL;
channel->refCount = 2; channel->refCount = 2;
channel->mailboxFlag = 0;
} }
/* Return 1 if channel memory should be freed, otherwise false */ /* Return 1 if channel memory should be freed, otherwise 0 */
static int janet_channel_deref(JanetChannel *channel) { static int janet_channel_deref(JanetChannel *channel) {
pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&channel->lock);
if (0 == --channel->refCount) { if (1 == channel->refCount) {
janet_buffer_deinit(&channel->buf); janet_buffer_deinit(&channel->buf);
pthread_mutex_destroy(&channel->lock); pthread_mutex_destroy(&channel->lock);
pthread_cond_destroy(&channel->cond);
return 1; return 1;
} else { } else {
channel->refCount--;
pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&channel->lock);
/* Wake up other side if they are blocked, otherwise
* they will block forever. */
if (NULL != channel->rx_cond) {
pthread_cond_signal(channel->rx_cond);
}
return 0; return 0;
} }
} }
@ -85,8 +100,8 @@ static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict
janet_marshal(&channel->buf, msg, dict, 0); janet_marshal(&channel->buf, msg, dict, 0);
/* Was empty, signal to cond */ /* Was empty, signal to cond */
if (oldcount == 0) { if (oldcount == 0 && (NULL != channel->rx_cond)) {
pthread_cond_signal(&channel->cond); pthread_cond_signal(channel->rx_cond);
} }
} }
@ -106,7 +121,8 @@ static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTab
while (channel->buf.count == 0) { while (channel->buf.count == 0) {
/* Check for closed channel (1 ref left means other side quit) */ /* Check for closed channel (1 ref left means other side quit) */
if (channel->refCount <= 1) return 1; if (channel->refCount <= 1) return 1;
pthread_cond_wait(&channel->cond, &channel->lock); /* Since each thread sets its own rx_cond, we know it's not NULL */
pthread_cond_wait(channel->rx_cond, &channel->lock);
} }
/* Hack to capture all panics from marshalling. This works because /* Hack to capture all panics from marshalling. This works because
@ -211,6 +227,7 @@ static int thread_worker(JanetChannel *tx) {
/* Create self thread */ /* Create self thread */
JanetChannel *rx = tx + 1; JanetChannel *rx = tx + 1;
rx->rx_cond = &janet_vm_thread_cond;
JanetThread *thread = janet_make_thread(rx, tx, encode, decode); JanetThread *thread = janet_make_thread(rx, tx, encode, decode);
Janet threadv = janet_wrap_abstract(thread); Janet threadv = janet_wrap_abstract(thread);
@ -281,6 +298,7 @@ static Janet cfun_thread_new(int32_t argc, Janet *argv) {
JanetChannel *tx = rx + 1; JanetChannel *tx = rx + 1;
janet_channel_init(rx); janet_channel_init(rx);
janet_channel_init(tx); janet_channel_init(tx);
rx->rx_cond = &janet_vm_thread_cond;
JanetThread *thread = janet_make_thread(rx, tx, encode, decode); JanetThread *thread = janet_make_thread(rx, tx, encode, decode);
if (janet_thread_start_child(thread)) if (janet_thread_start_child(thread))
janet_panic("could not start thread"); janet_panic("could not start thread");
@ -310,9 +328,17 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
return out; return out;
} }
static Janet cfun_thread_close(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetThread *thread = janet_getthread(argv, 0);
janet_close_thread(thread);
return janet_wrap_nil();
}
static const JanetMethod janet_thread_methods[] = { static const JanetMethod janet_thread_methods[] = {
{"send", cfun_thread_send}, {"send", cfun_thread_send},
{"receive", cfun_thread_receive}, {"receive", cfun_thread_receive},
{"close", cfun_thread_close},
{NULL, NULL} {NULL, NULL}
}; };
@ -340,6 +366,12 @@ static const JanetReg threadlib_cfuns[] = {
JDOC("(thread/receive thread)\n\n" JDOC("(thread/receive thread)\n\n"
"Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.") "Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.")
}, },
{
"thread/close", cfun_thread_close,
JDOC("(thread/close thread)\n\n"
"Close a thread, unblocking it and ending communication with it. Note that closing "
"a thread is idempotent and does not cancel the thread's operation. Returns nil.")
},
{NULL, NULL, NULL} {NULL, NULL, NULL}
}; };

View File

@ -1250,6 +1250,10 @@ int janet_init(void) {
janet_vm_core_env = NULL; janet_vm_core_env = NULL;
/* Seed RNG */ /* Seed RNG */
janet_rng_seed(janet_default_rng(), 0); janet_rng_seed(janet_default_rng(), 0);
/* Threads */
#ifdef JANET_THREADS
janet_threads_init();
#endif
return 0; return 0;
} }
@ -1263,4 +1267,7 @@ void janet_deinit(void) {
janet_vm_root_capacity = 0; janet_vm_root_capacity = 0;
janet_vm_registry = NULL; janet_vm_registry = NULL;
janet_vm_core_env = NULL; janet_vm_core_env = NULL;
#ifdef JANET_THREADS
janet_threads_deinit();
#endif
} }

View File

@ -945,9 +945,10 @@ typedef struct JanetThread JanetThread;
typedef struct JanetChannel JanetChannel; typedef struct JanetChannel JanetChannel;
struct JanetChannel { struct JanetChannel {
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t *rx_cond;
JanetBuffer buf; JanetBuffer buf;
int refCount; int refCount;
int mailboxFlag;
}; };
struct JanetThread { struct JanetThread {
JanetChannel *rx; JanetChannel *rx;