diff --git a/src/core/thread.c b/src/core/thread.c index a3da7e21..eed9008d 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -35,17 +35,35 @@ /* typedefed in janet.h */ struct JanetMailbox { + + /* Synchronization */ pthread_mutex_t lock; pthread_cond_t cond; - JanetMailbox *parent; /* May not have a parent */ - JanetTable *decode; /* Only allowed access by one thread */ + + /* Receiving messages - (only by owner thread) */ + JanetTable *decode; + + /* Only one buffer for now, but we probably want messageCapacity buffers + * to store messages. This means no need for memmove and other tricks + * we can do, such as giving buffers to the writing thread without + * blocking other threads accessing the mailbox. */ JanetBuffer buf; + + /* Setup procedure - requires a parent mailbox + * to receive thunk from */ + JanetMailbox *parent; + + /* For back pressure */ + int32_t messageCapacity; + int32_t messageCount; + + /* Memory management - reference counting */ int refCount; int closed; }; static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL; -static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_self = NULL; +static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_current = NULL; static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) { JanetMailbox *mailbox = malloc(sizeof(JanetMailbox)); @@ -58,6 +76,8 @@ static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) { mailbox->refCount = refCount; mailbox->closed = 0; mailbox->parent = parent; + mailbox->messageCount = 0; + mailbox->messageCapacity = 0; return mailbox; } @@ -106,47 +126,6 @@ static int thread_mark(void *p, size_t size) { return 0; } -/* Returns 1 if could not send (encode error), 2 for mailbox closed, and - * 0 otherwise. Will not panic. */ -int janet_thread_send(JanetThread *thread, Janet msg) { - - /* Ensure mailbox is not closed. */ - JanetMailbox *mailbox = thread->mailbox; - if (NULL == mailbox) return 2; - pthread_mutex_lock(&mailbox->lock); - if (mailbox->closed) { - janet_mailbox_ref_with_lock(mailbox, -1); - thread->mailbox = NULL; - return 2; - } - - /* Hack to capture all panics from marshalling. This works because - * we know janet_marshal won't mess with other essential global state. */ - jmp_buf buf; - jmp_buf *old_buf = janet_vm_jmp_buf; - janet_vm_jmp_buf = &buf; - int32_t oldcount = mailbox->buf.count; - - int ret = 0; - if (setjmp(buf)) { - ret = 1; - mailbox->buf.count = oldcount; - } else { - janet_marshal(&mailbox->buf, msg, thread->encode, 0); - } - - /* Cleanup */ - janet_vm_jmp_buf = old_buf; - pthread_mutex_unlock(&mailbox->lock); - - /* Potentially wake up a blocked thread */ - if (oldcount == 0 && ret == 0) { - pthread_cond_signal(&mailbox->cond); - } - - return ret; -} - /* Convert an interval from now in an absolute timespec */ static void janet_sec2ts(double sec, struct timespec *ts) { struct timespec now; @@ -163,9 +142,85 @@ static void janet_sec2ts(double sec, struct timespec *ts) { ts->tv_nsec = tvnsec; } +static int mailbox_at_capacity(JanetMailbox *mailbox) { + return mailbox->messageCapacity > 0 + && mailbox->messageCount == mailbox->messageCapacity; +} + +/* Returns 1 if could not send (encode error or timeout), 2 for mailbox closed, and + * 0 otherwise. Will not panic. */ +int janet_thread_send(JanetThread *thread, Janet msg, double timeout) { + + /* Ensure mailbox is not closed. */ + JanetMailbox *mailbox = thread->mailbox; + if (NULL == mailbox) return 2; + pthread_mutex_lock(&mailbox->lock); + if (mailbox->closed) { + janet_mailbox_ref_with_lock(mailbox, -1); + thread->mailbox = NULL; + return 2; + } + + /* Back pressure */ + if (mailbox_at_capacity(mailbox)) { + struct timespec timeout_ts; + int timedwait = timeout > 0.0; + int nowait = timeout == 0.0; + if (timedwait) janet_sec2ts(timeout, &timeout_ts); + if (nowait) { + janet_mailbox_ref_with_lock(mailbox, -1); + return 1; + } + + /* Retry loop, as there can be multiple writers */ + while (mailbox->messageCount > mailbox->messageCapacity) { + if (timedwait) { + int status = pthread_cond_timedwait(&mailbox->cond, &mailbox->lock, &timeout_ts); + if (status) { + /* Timedout */ + pthread_mutex_unlock(&mailbox->lock); + return 1; + } + } else { + pthread_cond_wait(&mailbox->cond, &mailbox->lock); + } + } + } + + /* Hack to capture all panics from marshalling. This works because + * we know janet_marshal won't mess with other essential global state. */ + jmp_buf buf; + jmp_buf *old_buf = janet_vm_jmp_buf; + janet_vm_jmp_buf = &buf; + int32_t oldcount = mailbox->buf.count; + int32_t oldmcount = mailbox->messageCount; + + int ret = 0; + if (setjmp(buf)) { + ret = 1; + mailbox->buf.count = oldcount; + mailbox->messageCount = oldmcount; + } else { + janet_marshal(&mailbox->buf, msg, thread->encode, 0); + mailbox->messageCount++; + } + + /* Cleanup */ + janet_vm_jmp_buf = old_buf; + pthread_mutex_unlock(&mailbox->lock); + + /* Potentially wake up a blocked thread */ + if (oldcount == 0 && ret == 0) { + pthread_cond_signal(&mailbox->cond); + } + + return ret; +} + /* Returns 0 on successful message. Returns 1 if timedout */ int janet_thread_receive(Janet *msg_out, double timeout) { - pthread_mutex_lock(&janet_vm_mailbox->lock); + JanetMailbox *mailbox = janet_vm_mailbox; + pthread_mutex_lock(&mailbox->lock); /* For timeouts */ struct timespec timeout_ts; @@ -175,8 +230,13 @@ int janet_thread_receive(Janet *msg_out, double timeout) { for (;;) { - /* Check for messages waiting for use */ - if (janet_vm_mailbox->buf.count) { + /* Check for messages waiting for us */ + if (mailbox->messageCount > 0) { + + /* If we were over capacity, receiving a message should + * bring us under capacity. We can therefor wake up pending writers. */ + int wasAtCapacity = mailbox_at_capacity(mailbox); + /* Hack to capture all panics from marshalling. This works because * we know janet_marshal won't mess with other essential global state. */ jmp_buf buf; @@ -186,47 +246,59 @@ int janet_thread_receive(Janet *msg_out, double timeout) { /* Handle errors */ if (setjmp(buf)) { /* Bad message, so clear buffer and wait for the next */ - janet_vm_mailbox->buf.count = 0; + mailbox->buf.count = 0; + mailbox->messageCount = 0; + + /* Cleanup jmp_buf, keep lock */ janet_vm_jmp_buf = old_buf; } else { /* Read from beginning of channel */ const uint8_t *nextItem = NULL; Janet item = janet_unmarshal( - janet_vm_mailbox->buf.data, janet_vm_mailbox->buf.count, - 0, janet_vm_mailbox->decode, &nextItem); + mailbox->buf.data, mailbox->buf.count, + 0, mailbox->decode, &nextItem); /* Update memory and put result into *msg_out */ - int32_t chunkCount = nextItem - janet_vm_mailbox->buf.data; - memmove(janet_vm_mailbox->buf.data, nextItem, janet_vm_mailbox->buf.count - chunkCount); - janet_vm_mailbox->buf.count -= chunkCount; + int32_t chunkCount = nextItem - mailbox->buf.data; + memmove(mailbox->buf.data, nextItem, mailbox->buf.count - chunkCount); + mailbox->buf.count -= chunkCount; + mailbox->messageCount--; *msg_out = item; + + /* Cleanup */ janet_vm_jmp_buf = old_buf; - pthread_mutex_unlock(&janet_vm_mailbox->lock); + pthread_mutex_unlock(&mailbox->lock); + + /* Wake up pending writers */ + if (wasAtCapacity) { + pthread_cond_signal(&mailbox->cond); + } + return 0; } } - if (nowait || janet_vm_mailbox->refCount <= 1) { + if (nowait || mailbox->refCount <= 1) { /* If there is only one ref, it is us. This means that if we * start waiting now, we can never possibly get a message, as * our reference will not propogate to other threads while we are blocked. */ - pthread_mutex_unlock(&janet_vm_mailbox->lock); + pthread_mutex_unlock(&mailbox->lock); return 1; } /* Wait for next message */ if (timedwait) { if (pthread_cond_timedwait( - &janet_vm_mailbox->cond, - &janet_vm_mailbox->lock, + &mailbox->cond, + &mailbox->lock, &timeout_ts)) { - pthread_mutex_unlock(&janet_vm_mailbox->lock); + pthread_mutex_unlock(&mailbox->lock); return 1; } } else { pthread_cond_wait( - &janet_vm_mailbox->cond, - &janet_vm_mailbox->lock); + &mailbox->cond, + &mailbox->lock); } } @@ -350,29 +422,29 @@ void janet_threads_deinit(void) { janet_vm_mailbox->closed = 1; janet_mailbox_ref_with_lock(janet_vm_mailbox, -1); janet_vm_mailbox = NULL; - janet_vm_thread_self = NULL; + janet_vm_thread_current = NULL; } /* * Cfuns */ -static Janet cfun_thread_self(int32_t argc, Janet *argv) { +static Janet cfun_thread_current(int32_t argc, Janet *argv) { (void) argv; janet_fixarity(argc, 0); - if (NULL == janet_vm_thread_self) { - janet_vm_thread_self = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict")); + if (NULL == janet_vm_thread_current) { + janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict")); janet_mailbox_ref(janet_vm_mailbox, 1); - janet_gcroot(janet_wrap_abstract(janet_vm_thread_self)); + janet_gcroot(janet_wrap_abstract(janet_vm_thread_current)); } - return janet_wrap_abstract(janet_vm_thread_self); + return janet_wrap_abstract(janet_vm_thread_current); } static Janet cfun_thread_new(int32_t argc, Janet *argv) { - janet_fixarity(argc, 0); - (void) argv; + janet_arity(argc, 0, 1); JanetTable *encode = janet_get_core_table("make-image-dict"); JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2); + mailbox->messageCapacity = janet_optinteger(argv, argc, 0, 0); /* one for created thread, one for ->parent reference in new mailbox */ janet_mailbox_ref(janet_vm_mailbox, 2); @@ -387,9 +459,9 @@ static Janet cfun_thread_new(int32_t argc, Janet *argv) { } static Janet cfun_thread_send(int32_t argc, Janet *argv) { - janet_fixarity(argc, 2); + janet_arity(argc, 2, 3); JanetThread *thread = janet_getthread(argv, 0); - int status = janet_thread_send(thread, argv[1]); + int status = janet_thread_send(thread, argv[1], janet_optnumber(argv, argc, 2, 1.0)); switch (status) { default: break; @@ -403,7 +475,7 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) { static Janet cfun_thread_receive(int32_t argc, Janet *argv) { janet_arity(argc, 0, 1); - double wait = janet_optnumber(argv, argc, 0, -1.0); + double wait = janet_optnumber(argv, argc, 0, 1.0); Janet out; int status = janet_thread_receive(&out, wait); switch (status) { @@ -436,15 +508,16 @@ static Janet janet_thread_getter(void *p, Janet key) { static const JanetReg threadlib_cfuns[] = { { - "thread/self", cfun_thread_self, - JDOC("(thread/self)\n\n" + "thread/current", cfun_thread_current, + JDOC("(thread/current)\n\n" "Get the current running thread.") }, { "thread/new", cfun_thread_new, - JDOC("(thread/new)\n\n" + JDOC("(thread/new &opt capacity)\n\n" "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " - "sent over after thread creation.") + "sent over after thread creation. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. " + "Returns a handle to that thread.") }, { "thread/send", cfun_thread_send,