From 4187c972a35768c49cdd8a2614847303ba23c7f0 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 10 Dec 2019 13:26:00 -0600 Subject: [PATCH] Switch to multiple buffers per mailbox. Needs less copying. --- src/core/thread.c | 80 ++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/src/core/thread.c b/src/core/thread.c index eed9008d..67b5df2d 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -43,48 +43,53 @@ struct JanetMailbox { /* Receiving messages - (only by owner thread) */ JanetTable *decode; - /* Only one buffer for now, but we probably want messageCapacity buffers - * to store messages. This means no need for memmove and other tricks - * we can do, such as giving buffers to the writing thread without - * blocking other threads accessing the mailbox. */ - JanetBuffer buf; - /* Setup procedure - requires a parent mailbox * to receive thunk from */ JanetMailbox *parent; - /* For back pressure */ - int32_t messageCapacity; - int32_t messageCount; - /* Memory management - reference counting */ int refCount; int closed; + + /* Store messages */ + uint16_t messageCapacity; + uint16_t messageCount; + uint16_t messageFirst; + uint16_t messageNext; + + /* Buffers to store messages */ + JanetBuffer messages[]; }; static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL; static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_current = NULL; -static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) { - JanetMailbox *mailbox = malloc(sizeof(JanetMailbox)); +static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount, uint16_t capacity) { + JanetMailbox *mailbox = malloc(sizeof(JanetMailbox) + sizeof(JanetBuffer) * capacity); if (NULL == mailbox) { JANET_OUT_OF_MEMORY; } pthread_mutex_init(&mailbox->lock, NULL); pthread_cond_init(&mailbox->cond, NULL); - janet_buffer_init(&mailbox->buf, 1024); mailbox->refCount = refCount; mailbox->closed = 0; mailbox->parent = parent; mailbox->messageCount = 0; - mailbox->messageCapacity = 0; + mailbox->messageCapacity = capacity; + mailbox->messageFirst = 0; + mailbox->messageNext = 0; + for (uint16_t i = 0; i < capacity; i++) { + janet_buffer_init(mailbox->messages + i, 0); + } return mailbox; } static void janet_mailbox_destroy(JanetMailbox *mailbox) { pthread_mutex_destroy(&mailbox->lock); pthread_cond_destroy(&mailbox->cond); - janet_buffer_deinit(&mailbox->buf); + for (uint16_t i = 0; i < mailbox->messageCapacity; i++) { + janet_buffer_deinit(mailbox->messages + i); + } free(mailbox); } @@ -192,16 +197,21 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) { jmp_buf buf; jmp_buf *old_buf = janet_vm_jmp_buf; janet_vm_jmp_buf = &buf; - int32_t oldcount = mailbox->buf.count; int32_t oldmcount = mailbox->messageCount; int ret = 0; if (setjmp(buf)) { ret = 1; - mailbox->buf.count = oldcount; mailbox->messageCount = oldmcount; } else { - janet_marshal(&mailbox->buf, msg, thread->encode, 0); + JanetBuffer *msgbuf = mailbox->messages + mailbox->messageNext; + msgbuf->count = 0; + + /* Start panic zone */ + janet_marshal(msgbuf, msg, thread->encode, 0); + /* End panic zone */ + + mailbox->messageNext = (mailbox->messageNext + 1) % mailbox->messageCapacity; mailbox->messageCount++; } @@ -210,7 +220,7 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) { pthread_mutex_unlock(&mailbox->lock); /* Potentially wake up a blocked thread */ - if (oldcount == 0 && ret == 0) { + if (oldmcount == 0 && ret == 0) { pthread_cond_signal(&mailbox->cond); } @@ -245,24 +255,18 @@ int janet_thread_receive(Janet *msg_out, double timeout) { /* Handle errors */ if (setjmp(buf)) { - /* Bad message, so clear buffer and wait for the next */ - mailbox->buf.count = 0; - mailbox->messageCount = 0; - /* Cleanup jmp_buf, keep lock */ janet_vm_jmp_buf = old_buf; } else { + JanetBuffer *msgbuf = mailbox->messages + mailbox->messageFirst; + mailbox->messageCount--; + mailbox->messageFirst = (mailbox->messageFirst + 1) % mailbox->messageCapacity; + /* Read from beginning of channel */ const uint8_t *nextItem = NULL; Janet item = janet_unmarshal( - mailbox->buf.data, mailbox->buf.count, + msgbuf->data, msgbuf->count, 0, mailbox->decode, &nextItem); - - /* Update memory and put result into *msg_out */ - int32_t chunkCount = nextItem - mailbox->buf.data; - memmove(mailbox->buf.data, nextItem, mailbox->buf.count - chunkCount); - mailbox->buf.count -= chunkCount; - mailbox->messageCount--; *msg_out = item; /* Cleanup */ @@ -413,7 +417,7 @@ static int janet_thread_start_child(JanetThread *thread) { void janet_threads_init(void) { if (NULL == janet_vm_mailbox) { - janet_vm_mailbox = janet_mailbox_create(NULL, 1); + janet_vm_mailbox = janet_mailbox_create(NULL, 1, 10); } } @@ -442,9 +446,12 @@ static Janet cfun_thread_current(int32_t argc, Janet *argv) { static Janet cfun_thread_new(int32_t argc, Janet *argv) { janet_arity(argc, 0, 1); + int32_t cap = janet_optinteger(argv, argc, 0, 10); + if (cap < 1 || cap > UINT16_MAX) { + janet_panicf("bad slot #1, expected integer in range [1, 65535], got %d", cap); + } JanetTable *encode = janet_get_core_table("make-image-dict"); - JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2); - mailbox->messageCapacity = janet_optinteger(argv, argc, 0, 0); + JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2, (uint16_t) cap); /* one for created thread, one for ->parent reference in new mailbox */ janet_mailbox_ref(janet_vm_mailbox, 2); @@ -515,9 +522,10 @@ static const JanetReg threadlib_cfuns[] = { { "thread/new", cfun_thread_new, JDOC("(thread/new &opt capacity)\n\n" - "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " - "sent over after thread creation. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. " - "Returns a handle to that thread.") + "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be passed to the thread " + "via thread/send. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. " + "The capacity must be between 1 and 65535 inclusive, and defaults to 10. " + "Returns a handle to the new thread.") }, { "thread/send", cfun_thread_send,