diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 8f7a7b55..b09fbee0 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2119,17 +2119,6 @@ :on-status (or onsignal (make-onsignal env 1)) :source "repl"})) -### -### -### Thread Extras -### -### - -(defn thread/new - "Create a new thread from a closure." - [f] - (thread/from-image (make-image f))) - ### ### ### CLI Tool Main diff --git a/src/core/corelib.c b/src/core/corelib.c index d2074e23..318ede2a 100644 --- a/src/core/corelib.c +++ b/src/core/corelib.c @@ -1194,8 +1194,27 @@ JanetTable *janet_core_env(JanetTable *replacements) { #else JanetTable *janet_core_dictionary(JanetTable *replacements) { - JanetTable *dict = (NULL != replacements) ? replacements : janet_table(0); - janet_load_libs(dict); + JanetTable *dict; + if (NULL == janet_vm_core_dictionary) { + dict = janet_table(0); + janet_load_libs(dict); + janet_vm_core_dictionary = dict; + janet_gcroot(janet_wrap_table(dict)); + /* do replacements */ + if (NULL != replacements) { + for (int32_t i = 0; i < replacements->capacity; i++) { + if (!janet_checktype(replacements->data[i].key, JANET_NIL)) { + const JanetKV *kv = replacements->data + i; + janet_table_put(dict, kv->key, kv->value); + if (janet_checktype(kv->value, JANET_CFUNCTION)) { + janet_table_put(janet_vm_registry, kv->value, kv->key); + } + } + } + } + } else { + dict = janet_vm_core_dictionary; + } return dict; } diff --git a/src/core/state.h b/src/core/state.h index cce4b0c6..10434321 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -32,6 +32,9 @@ * be in it. However, thread local global variables for interpreter * state should allow easy multi-threading. */ +/* The core dictionary is memoized */ +extern JANET_THREAD_LOCAL JanetTable *janet_vm_core_dictionary; + /* How many VM stacks have been entered */ extern JANET_THREAD_LOCAL int janet_vm_stackn; diff --git a/src/core/thread.c b/src/core/thread.c index 8d439ff1..b5df42d3 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -24,19 +24,134 @@ #include #include "gc.h" #include "util.h" +#include "state.h" #endif #ifdef JANET_THREADS #include +#include -static void shared_cleanup(JanetThreadShared *shared) { +static void janet_channel_init(JanetChannel *channel, size_t initialSize) { + janet_buffer_init(&channel->buf, (int32_t) initialSize); + pthread_mutex_init(&channel->lock, NULL); + pthread_cond_init(&channel->cond, NULL); +} + +static void janet_channel_destroy(JanetChannel *channel) { + janet_buffer_deinit(&channel->buf); + pthread_mutex_destroy(&channel->lock); + pthread_cond_destroy(&channel->cond); +} + +static JanetThreadShared *janet_shared_create(size_t initialSize) { + const char *errmsg = "could not allocate memory for thread"; + JanetThreadShared *shared = malloc(sizeof(JanetThreadShared)); + if (NULL == shared) janet_panicf(errmsg); + uint8_t *mem = malloc(initialSize); + if (NULL == mem) janet_panic(errmsg); + janet_channel_init(&shared->parent, 0); + janet_channel_init(&shared->child, initialSize); + shared->refCount = 2; + pthread_mutex_init(&shared->refCountLock, NULL); + return shared; +} + +static void janet_shared_destroy(JanetThreadShared *shared) { + janet_channel_destroy(&shared->parent); + janet_channel_destroy(&shared->child); pthread_mutex_destroy(&shared->refCountLock); - pthread_mutex_destroy(&shared->memoryLock); - free(shared->memory); free(shared); } +/* Returns 1 if could not send. Does not block or panic. Bytes should be a janet value that + * has been marshalled. */ +static int janet_channel_send_any(JanetChannel *channel, Janet msg, JanetByteView bytes, int is_bytes, JanetTable *dict) { + pthread_mutex_lock(&channel->lock); + + /* Hack to capture all panics from marshalling. This works because + * we know janet_marshal won't mess with other essential global state. */ + jmp_buf buf; + jmp_buf *old_buf = janet_vm_jmp_buf; + janet_vm_jmp_buf = &buf; + int32_t oldcount = channel->buf.count; + + int ret = 0; + if (setjmp(buf)) { + ret = 1; + channel->buf.count = oldcount; + } else { + if (is_bytes) { + janet_buffer_push_bytes(&channel->buf, bytes.bytes, bytes.len); + } else { + janet_marshal(&channel->buf, msg, dict, 0); + } + + /* Was empty, signal to cond */ + if (oldcount == 0) { + pthread_cond_signal(&channel->cond); + } + } + + /* Cleanup */ + janet_vm_jmp_buf = old_buf; + pthread_mutex_unlock(&channel->lock); + + return ret; +} + +static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict) { + JanetByteView dud = {0}; + return janet_channel_send_any(channel, msg, dud, 0, dict); +} + +/* +static int janet_channel_send_image(JanetChannel *channel, JanetByteView bytes) { + return janet_channel_send_any(channel, janet_wrap_nil(), bytes, 1, NULL); +} +*/ + +/* Returns 1 if nothing in queue or failed to get item. Does not block or panic. Uses dict to read bytes from + * the channel and unmarshal them. */ +static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTable *dict) { + pthread_mutex_lock(&channel->lock); + + /* If queue is empty, block for now. */ + while (channel->buf.count == 0) { + pthread_cond_wait(&channel->cond, &channel->lock); + } + + /* Hack to capture all panics from marshalling. This works because + * we know janet_marshal won't mess with other essential global state. */ + jmp_buf buf; + jmp_buf *old_buf = janet_vm_jmp_buf; + janet_vm_jmp_buf = &buf; + + /* Handle errors */ + int ret = 0; + if (setjmp(buf)) { + /* Clear the channel on errors */ + channel->buf.count = 0; + ret = 1; + } else { + /* Read from beginning of channel */ + const uint8_t *nextItem = NULL; + Janet item = janet_unmarshal(channel->buf.data, channel->buf.count, 0, dict, &nextItem); + + /* Update memory and put result into *msg_out */ + int32_t chunkCount = nextItem - channel->buf.data; + memmove(channel->buf.data, nextItem, channel->buf.count - chunkCount); + channel->buf.count -= chunkCount; + *msg_out = item; + } + + /* Cleanup */ + janet_vm_jmp_buf = old_buf; + pthread_mutex_unlock(&channel->lock); + + return ret; +} + static int thread_gc(void *p, size_t size) { JanetThread *thread = (JanetThread *)p; JanetThreadShared *shared = thread->shared; @@ -45,17 +160,26 @@ static int thread_gc(void *p, size_t size) { pthread_mutex_lock(&shared->refCountLock); int refcount = --shared->refCount; if (refcount == 0) { - shared_cleanup(shared); + janet_shared_destroy(shared); } else { pthread_mutex_unlock(&shared->refCountLock); } return 0; } +static int thread_mark(void *p, size_t size) { + JanetThread *thread = (JanetThread *)p; + (void) size; + if (NULL != thread->dict) { + janet_mark(janet_wrap_table(thread->dict)); + } + return 0; +} + static JanetAbstractType Thread_AT = { "core/thread", thread_gc, - NULL, + thread_mark, NULL, NULL, NULL, @@ -63,43 +187,53 @@ static JanetAbstractType Thread_AT = { NULL }; +static JanetThread *janet_make_thread(JanetThreadShared *shared, JanetTable *dict, int who) { + JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); + thread->shared = shared; + thread->kind = who; + thread->dict = dict; + thread->handle = NULL; + return thread; +} + JanetThread *janet_getthread(Janet *argv, int32_t n) { return (JanetThread *) janet_getabstract(argv, n, &Thread_AT); } /* Runs in new thread */ static int thread_worker(JanetThreadShared *shared) { + pthread_t handle = pthread_self(); + pthread_detach(handle); + /* Init VM */ janet_init(); + /* Get dictionary */ JanetTable *dict = janet_core_dictionary(NULL); - const uint8_t *next = NULL; + + /* Create self thread */ + JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_SELF); + thread->handle = handle; + Janet threadv = janet_wrap_abstract(thread); /* Unmarshal the function */ - Janet funcv = janet_unmarshal(shared->memory, shared->memorySize, 0, dict, &next); - if (next == shared->memory) goto error; + Janet funcv; + int status = janet_channel_receive(&shared->child, &funcv, dict); + if (status) goto error; if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; JanetFunction *func = janet_unwrap_function(funcv); - /* Create self thread */ - JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); - thread->shared = shared; - thread->handle = pthread_self(); - - /* Clean up thread when done, do not wait for a join. For - * communicating with other threads, we will rely on the - * JanetThreadShared structure. */ - pthread_detach(thread->handle); + /* Arity check */ + if (func->def->min_arity > 1 || func->def->max_arity < 1) { + goto error; + } /* Call function */ - JanetFiber *fiber = janet_fiber(func, 64, 0, NULL); - fiber->env = janet_table(0); - janet_table_put(fiber->env, janet_ckeywordv("worker"), janet_wrap_abstract(thread)); + Janet argv[1] = { threadv }; + JanetFiber *fiber = janet_fiber(func, 64, 1, argv); Janet out; janet_continue(fiber, janet_wrap_nil(), &out); - /* TODO - marshal 'out' into sharedMemory */ - /* Success */ janet_deinit(); return 0; @@ -110,47 +244,84 @@ error: return 1; } -void *janet_pthread_wrapper(void *param) { +static void *janet_pthread_wrapper(void *param) { thread_worker((JanetThreadShared *)param); return NULL; } -static Janet cfun_from_image(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - JanetByteView bytes = janet_getbytes(argv, 0); - - /* Create Shared memory chunk of thread object */ - JanetThreadShared *shared = malloc(sizeof(JanetThreadShared)); - uint8_t *mem = malloc(bytes.len); - if (NULL == shared || NULL == mem) { - janet_panicf("could not allocate memory for thread"); - } - shared->memory = mem; - shared->memorySize = bytes.len; - memcpy(mem, bytes.bytes, bytes.len); - shared->refCount = 2; - pthread_mutex_init(&shared->refCountLock, NULL); - pthread_mutex_init(&shared->memoryLock, NULL); - - /* Create thread abstract */ - JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); - thread->shared = shared; - - /* Run thread */ +static void janet_thread_start_child(JanetThread *thread) { + JanetThreadShared *shared = thread->shared; int error = pthread_create(&thread->handle, NULL, janet_pthread_wrapper, shared); if (error) { thread->shared = NULL; /* Prevent GC from trying to mess with shared memory here */ - shared_cleanup(shared); + janet_shared_destroy(shared); } +} +/* + * Cfuns + */ + +static Janet cfun_thread_new(int32_t argc, Janet *argv) { + janet_arity(argc, 0, 1); + JanetTable *dict = NULL; + if (argc == 0 || janet_checktype(argv[0], JANET_NIL)) { + dict = janet_core_dictionary(NULL); + } else { + dict = janet_gettable(argv, 0); + } + JanetThreadShared *shared = janet_shared_create(0); + JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_OTHER); + janet_thread_start_child(thread); return janet_wrap_abstract(thread); } +static Janet cfun_thread_send(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + JanetThread *thread = janet_getthread(argv, 0); + JanetThreadShared *shared = thread->shared; + if (NULL == shared) janet_panic("channel has closed"); + int status = janet_channel_send(thread->kind == JANET_THREAD_SELF ? &shared->parent : &shared->child, + argv[1], + thread->dict); + if (status) { + janet_panicf("failed to send message %v", argv[1]); + } + return argv[0]; +} + +static Janet cfun_thread_receive(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetThread *thread = janet_getthread(argv, 0); + JanetThreadShared *shared = thread->shared; + if (NULL == shared) janet_panic("channel has closed"); + Janet out = janet_wrap_nil(); + int status = janet_channel_receive(thread->kind == JANET_THREAD_SELF ? &shared->child : &shared->parent, + &out, + thread->dict); + if (status) { + janet_panic("failed to receive message"); + } + return out; +} + static const JanetReg threadlib_cfuns[] = { { - "thread/from-image", cfun_from_image, - JDOC("(thread/from-image image)\n\n" - "Start a new thread. image is a byte sequence, containing a marshalled function.") + "thread/new", cfun_thread_new, + JDOC("(thread/new &opt dict)\n\n" + "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " + "sent over after thread creation.") + }, + { + "thread/send", cfun_thread_send, + JDOC("(thread/send thread msg)\n\n" + "Send a message to the thread. This will never block and returns thread immediately. " + "Will throw an error if there is a problem sending the message.") + }, + { + "thread/receive", cfun_thread_receive, + JDOC("(thread/receive thread)\n\n" + "Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.") }, {NULL, NULL, NULL} }; diff --git a/src/core/util.c b/src/core/util.c index a2d56594..c893f25c 100644 --- a/src/core/util.c +++ b/src/core/util.c @@ -331,18 +331,9 @@ const JanetAbstractType *janet_get_abstract_type(Janet key) { void janet_core_def(JanetTable *env, const char *name, Janet x, const void *p) { (void) p; Janet key = janet_csymbolv(name); - Janet value; - /* During init, allow replacing core library cfunctions with values from - * the env. */ - Janet check = janet_table_get(env, key); - if (janet_checktype(check, JANET_NIL)) { - value = x; - } else { - value = check; - } - janet_table_put(env, key, value); - if (janet_checktype(value, JANET_CFUNCTION)) { - janet_table_put(janet_vm_registry, value, key); + janet_table_put(env, key, x); + if (janet_checktype(x, JANET_CFUNCTION)) { + janet_table_put(janet_vm_registry, x, key); } } diff --git a/src/core/vm.c b/src/core/vm.c index 0cc69324..1cf83694 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -30,6 +30,7 @@ #endif /* VM state */ +JANET_THREAD_LOCAL JanetTable *janet_vm_core_dictionary; JANET_THREAD_LOCAL JanetTable *janet_vm_registry; JANET_THREAD_LOCAL int janet_vm_stackn = 0; JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL; @@ -1240,6 +1241,8 @@ int janet_init(void) { /* Initialize registry */ janet_vm_registry = janet_table(0); janet_gcroot(janet_wrap_table(janet_vm_registry)); + /* Core env and dictionary */ + janet_vm_core_dictionary = NULL; /* Seed RNG */ janet_rng_seed(janet_default_rng(), 0); return 0; @@ -1254,4 +1257,5 @@ void janet_deinit(void) { janet_vm_root_count = 0; janet_vm_root_capacity = 0; janet_vm_registry = NULL; + janet_vm_core_dictionary = NULL; } diff --git a/src/include/janet.h b/src/include/janet.h index dd1db087..c644ffa4 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -324,24 +324,6 @@ typedef struct JanetRange JanetRange; typedef struct JanetRNG JanetRNG; typedef Janet(*JanetCFunction)(int32_t argc, Janet *argv); -/* Thread types */ -#ifdef JANET_THREADS -#include -typedef struct JanetThread JanetThread; -typedef struct JanetThreadShared JanetThreadShared; -struct JanetThreadShared { - pthread_mutex_t memoryLock; - pthread_mutex_t refCountLock; - uint8_t *memory; - size_t memorySize; - int refCount; -}; -struct JanetThread { - pthread_t handle; - JanetThreadShared *shared; -}; -#endif - /* Basic types for all Janet Values */ typedef enum JanetType { JANET_NUMBER, @@ -956,6 +938,35 @@ struct JanetRNG { uint32_t counter; }; +/* Thread types */ +#ifdef JANET_THREADS +#include +typedef struct JanetThread JanetThread; +typedef struct JanetThreadShared JanetThreadShared; +typedef struct JanetChannel JanetChannel; +struct JanetChannel { + pthread_mutex_t lock; + pthread_cond_t cond; + JanetBuffer buf; +}; +struct JanetThreadShared { + pthread_mutex_t refCountLock; + int refCount; + JanetChannel parent; + JanetChannel child; +}; +struct JanetThread { + pthread_t handle; + JanetThreadShared *shared; + JanetTable *dict; + enum { + JANET_THREAD_SELF, + JANET_THREAD_OTHER + } kind; +}; +#endif + + /***** END SECTION TYPES *****/ /***** START SECTION OPCODES *****/