1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-15 09:55:40 +00:00

Switch to multiple buffers per mailbox.

Needs less copying.
This commit is contained in:
Calvin Rose 2019-12-10 13:26:00 -06:00
parent eb1c21b0da
commit 4187c972a3

View File

@ -43,48 +43,53 @@ struct JanetMailbox {
/* Receiving messages - (only by owner thread) */ /* Receiving messages - (only by owner thread) */
JanetTable *decode; JanetTable *decode;
/* Only one buffer for now, but we probably want messageCapacity buffers
* to store messages. This means no need for memmove and other tricks
* we can do, such as giving buffers to the writing thread without
* blocking other threads accessing the mailbox. */
JanetBuffer buf;
/* Setup procedure - requires a parent mailbox /* Setup procedure - requires a parent mailbox
* to receive thunk from */ * to receive thunk from */
JanetMailbox *parent; JanetMailbox *parent;
/* For back pressure */
int32_t messageCapacity;
int32_t messageCount;
/* Memory management - reference counting */ /* Memory management - reference counting */
int refCount; int refCount;
int closed; int closed;
/* Store messages */
uint16_t messageCapacity;
uint16_t messageCount;
uint16_t messageFirst;
uint16_t messageNext;
/* Buffers to store messages */
JanetBuffer messages[];
}; };
static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL; static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL;
static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_current = NULL; static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_current = NULL;
static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) { static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount, uint16_t capacity) {
JanetMailbox *mailbox = malloc(sizeof(JanetMailbox)); JanetMailbox *mailbox = malloc(sizeof(JanetMailbox) + sizeof(JanetBuffer) * capacity);
if (NULL == mailbox) { if (NULL == mailbox) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
pthread_mutex_init(&mailbox->lock, NULL); pthread_mutex_init(&mailbox->lock, NULL);
pthread_cond_init(&mailbox->cond, NULL); pthread_cond_init(&mailbox->cond, NULL);
janet_buffer_init(&mailbox->buf, 1024);
mailbox->refCount = refCount; mailbox->refCount = refCount;
mailbox->closed = 0; mailbox->closed = 0;
mailbox->parent = parent; mailbox->parent = parent;
mailbox->messageCount = 0; mailbox->messageCount = 0;
mailbox->messageCapacity = 0; mailbox->messageCapacity = capacity;
mailbox->messageFirst = 0;
mailbox->messageNext = 0;
for (uint16_t i = 0; i < capacity; i++) {
janet_buffer_init(mailbox->messages + i, 0);
}
return mailbox; return mailbox;
} }
static void janet_mailbox_destroy(JanetMailbox *mailbox) { static void janet_mailbox_destroy(JanetMailbox *mailbox) {
pthread_mutex_destroy(&mailbox->lock); pthread_mutex_destroy(&mailbox->lock);
pthread_cond_destroy(&mailbox->cond); pthread_cond_destroy(&mailbox->cond);
janet_buffer_deinit(&mailbox->buf); for (uint16_t i = 0; i < mailbox->messageCapacity; i++) {
janet_buffer_deinit(mailbox->messages + i);
}
free(mailbox); free(mailbox);
} }
@ -192,16 +197,21 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
jmp_buf buf; jmp_buf buf;
jmp_buf *old_buf = janet_vm_jmp_buf; jmp_buf *old_buf = janet_vm_jmp_buf;
janet_vm_jmp_buf = &buf; janet_vm_jmp_buf = &buf;
int32_t oldcount = mailbox->buf.count;
int32_t oldmcount = mailbox->messageCount; int32_t oldmcount = mailbox->messageCount;
int ret = 0; int ret = 0;
if (setjmp(buf)) { if (setjmp(buf)) {
ret = 1; ret = 1;
mailbox->buf.count = oldcount;
mailbox->messageCount = oldmcount; mailbox->messageCount = oldmcount;
} else { } else {
janet_marshal(&mailbox->buf, msg, thread->encode, 0); JanetBuffer *msgbuf = mailbox->messages + mailbox->messageNext;
msgbuf->count = 0;
/* Start panic zone */
janet_marshal(msgbuf, msg, thread->encode, 0);
/* End panic zone */
mailbox->messageNext = (mailbox->messageNext + 1) % mailbox->messageCapacity;
mailbox->messageCount++; mailbox->messageCount++;
} }
@ -210,7 +220,7 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
pthread_mutex_unlock(&mailbox->lock); pthread_mutex_unlock(&mailbox->lock);
/* Potentially wake up a blocked thread */ /* Potentially wake up a blocked thread */
if (oldcount == 0 && ret == 0) { if (oldmcount == 0 && ret == 0) {
pthread_cond_signal(&mailbox->cond); pthread_cond_signal(&mailbox->cond);
} }
@ -245,24 +255,18 @@ int janet_thread_receive(Janet *msg_out, double timeout) {
/* Handle errors */ /* Handle errors */
if (setjmp(buf)) { if (setjmp(buf)) {
/* Bad message, so clear buffer and wait for the next */
mailbox->buf.count = 0;
mailbox->messageCount = 0;
/* Cleanup jmp_buf, keep lock */ /* Cleanup jmp_buf, keep lock */
janet_vm_jmp_buf = old_buf; janet_vm_jmp_buf = old_buf;
} else { } else {
JanetBuffer *msgbuf = mailbox->messages + mailbox->messageFirst;
mailbox->messageCount--;
mailbox->messageFirst = (mailbox->messageFirst + 1) % mailbox->messageCapacity;
/* Read from beginning of channel */ /* Read from beginning of channel */
const uint8_t *nextItem = NULL; const uint8_t *nextItem = NULL;
Janet item = janet_unmarshal( Janet item = janet_unmarshal(
mailbox->buf.data, mailbox->buf.count, msgbuf->data, msgbuf->count,
0, mailbox->decode, &nextItem); 0, mailbox->decode, &nextItem);
/* Update memory and put result into *msg_out */
int32_t chunkCount = nextItem - mailbox->buf.data;
memmove(mailbox->buf.data, nextItem, mailbox->buf.count - chunkCount);
mailbox->buf.count -= chunkCount;
mailbox->messageCount--;
*msg_out = item; *msg_out = item;
/* Cleanup */ /* Cleanup */
@ -413,7 +417,7 @@ static int janet_thread_start_child(JanetThread *thread) {
void janet_threads_init(void) { void janet_threads_init(void) {
if (NULL == janet_vm_mailbox) { if (NULL == janet_vm_mailbox) {
janet_vm_mailbox = janet_mailbox_create(NULL, 1); janet_vm_mailbox = janet_mailbox_create(NULL, 1, 10);
} }
} }
@ -442,9 +446,12 @@ static Janet cfun_thread_current(int32_t argc, Janet *argv) {
static Janet cfun_thread_new(int32_t argc, Janet *argv) { static Janet cfun_thread_new(int32_t argc, Janet *argv) {
janet_arity(argc, 0, 1); janet_arity(argc, 0, 1);
int32_t cap = janet_optinteger(argv, argc, 0, 10);
if (cap < 1 || cap > UINT16_MAX) {
janet_panicf("bad slot #1, expected integer in range [1, 65535], got %d", cap);
}
JanetTable *encode = janet_get_core_table("make-image-dict"); JanetTable *encode = janet_get_core_table("make-image-dict");
JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2); JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2, (uint16_t) cap);
mailbox->messageCapacity = janet_optinteger(argv, argc, 0, 0);
/* one for created thread, one for ->parent reference in new mailbox */ /* one for created thread, one for ->parent reference in new mailbox */
janet_mailbox_ref(janet_vm_mailbox, 2); janet_mailbox_ref(janet_vm_mailbox, 2);
@ -515,9 +522,10 @@ static const JanetReg threadlib_cfuns[] = {
{ {
"thread/new", cfun_thread_new, "thread/new", cfun_thread_new,
JDOC("(thread/new &opt capacity)\n\n" JDOC("(thread/new &opt capacity)\n\n"
"Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be passed to the thread "
"sent over after thread creation. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. " "via thread/send. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. "
"Returns a handle to that thread.") "The capacity must be between 1 and 65535 inclusive, and defaults to 10. "
"Returns a handle to the new thread.")
}, },
{ {
"thread/send", cfun_thread_send, "thread/send", cfun_thread_send,