1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-28 11:09:54 +00:00

Add mailbox capacity for back pressure.

(thread/send thread msg &opt timeout) can now timeout. Also
changed thread/self to thread/current for better consistency with
fibers, and all blocking operations will by default timeout after 1
second. I think its bad to make things block forerver by default.
This commit is contained in:
Calvin Rose 2019-12-08 12:30:30 -06:00
parent c9c4424261
commit 66d82c4513

View File

@ -35,17 +35,35 @@
/* typedefed in janet.h */ /* typedefed in janet.h */
struct JanetMailbox { struct JanetMailbox {
/* Synchronization */
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
JanetMailbox *parent; /* May not have a parent */
JanetTable *decode; /* Only allowed access by one thread */ /* Receiving messages - (only by owner thread) */
JanetTable *decode;
/* Only one buffer for now, but we probably want messageCapacity buffers
* to store messages. This means no need for memmove and other tricks
* we can do, such as giving buffers to the writing thread without
* blocking other threads accessing the mailbox. */
JanetBuffer buf; JanetBuffer buf;
/* Setup procedure - requires a parent mailbox
* to receive thunk from */
JanetMailbox *parent;
/* For back pressure */
int32_t messageCapacity;
int32_t messageCount;
/* Memory management - reference counting */
int refCount; int refCount;
int closed; int closed;
}; };
static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL; static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL;
static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_self = NULL; static JANET_THREAD_LOCAL JanetThread *janet_vm_thread_current = NULL;
static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) { static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) {
JanetMailbox *mailbox = malloc(sizeof(JanetMailbox)); JanetMailbox *mailbox = malloc(sizeof(JanetMailbox));
@ -58,6 +76,8 @@ static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount) {
mailbox->refCount = refCount; mailbox->refCount = refCount;
mailbox->closed = 0; mailbox->closed = 0;
mailbox->parent = parent; mailbox->parent = parent;
mailbox->messageCount = 0;
mailbox->messageCapacity = 0;
return mailbox; return mailbox;
} }
@ -106,47 +126,6 @@ static int thread_mark(void *p, size_t size) {
return 0; return 0;
} }
/* Returns 1 if could not send (encode error), 2 for mailbox closed, and
* 0 otherwise. Will not panic. */
int janet_thread_send(JanetThread *thread, Janet msg) {
/* Ensure mailbox is not closed. */
JanetMailbox *mailbox = thread->mailbox;
if (NULL == mailbox) return 2;
pthread_mutex_lock(&mailbox->lock);
if (mailbox->closed) {
janet_mailbox_ref_with_lock(mailbox, -1);
thread->mailbox = NULL;
return 2;
}
/* Hack to capture all panics from marshalling. This works because
* we know janet_marshal won't mess with other essential global state. */
jmp_buf buf;
jmp_buf *old_buf = janet_vm_jmp_buf;
janet_vm_jmp_buf = &buf;
int32_t oldcount = mailbox->buf.count;
int ret = 0;
if (setjmp(buf)) {
ret = 1;
mailbox->buf.count = oldcount;
} else {
janet_marshal(&mailbox->buf, msg, thread->encode, 0);
}
/* Cleanup */
janet_vm_jmp_buf = old_buf;
pthread_mutex_unlock(&mailbox->lock);
/* Potentially wake up a blocked thread */
if (oldcount == 0 && ret == 0) {
pthread_cond_signal(&mailbox->cond);
}
return ret;
}
/* Convert an interval from now in an absolute timespec */ /* Convert an interval from now in an absolute timespec */
static void janet_sec2ts(double sec, struct timespec *ts) { static void janet_sec2ts(double sec, struct timespec *ts) {
struct timespec now; struct timespec now;
@ -163,9 +142,85 @@ static void janet_sec2ts(double sec, struct timespec *ts) {
ts->tv_nsec = tvnsec; ts->tv_nsec = tvnsec;
} }
static int mailbox_at_capacity(JanetMailbox *mailbox) {
return mailbox->messageCapacity > 0
&& mailbox->messageCount == mailbox->messageCapacity;
}
/* Returns 1 if could not send (encode error or timeout), 2 for mailbox closed, and
* 0 otherwise. Will not panic. */
int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
/* Ensure mailbox is not closed. */
JanetMailbox *mailbox = thread->mailbox;
if (NULL == mailbox) return 2;
pthread_mutex_lock(&mailbox->lock);
if (mailbox->closed) {
janet_mailbox_ref_with_lock(mailbox, -1);
thread->mailbox = NULL;
return 2;
}
/* Back pressure */
if (mailbox_at_capacity(mailbox)) {
struct timespec timeout_ts;
int timedwait = timeout > 0.0;
int nowait = timeout == 0.0;
if (timedwait) janet_sec2ts(timeout, &timeout_ts);
if (nowait) {
janet_mailbox_ref_with_lock(mailbox, -1);
return 1;
}
/* Retry loop, as there can be multiple writers */
while (mailbox->messageCount > mailbox->messageCapacity) {
if (timedwait) {
int status = pthread_cond_timedwait(&mailbox->cond, &mailbox->lock, &timeout_ts);
if (status) {
/* Timedout */
pthread_mutex_unlock(&mailbox->lock);
return 1;
}
} else {
pthread_cond_wait(&mailbox->cond, &mailbox->lock);
}
}
}
/* Hack to capture all panics from marshalling. This works because
* we know janet_marshal won't mess with other essential global state. */
jmp_buf buf;
jmp_buf *old_buf = janet_vm_jmp_buf;
janet_vm_jmp_buf = &buf;
int32_t oldcount = mailbox->buf.count;
int32_t oldmcount = mailbox->messageCount;
int ret = 0;
if (setjmp(buf)) {
ret = 1;
mailbox->buf.count = oldcount;
mailbox->messageCount = oldmcount;
} else {
janet_marshal(&mailbox->buf, msg, thread->encode, 0);
mailbox->messageCount++;
}
/* Cleanup */
janet_vm_jmp_buf = old_buf;
pthread_mutex_unlock(&mailbox->lock);
/* Potentially wake up a blocked thread */
if (oldcount == 0 && ret == 0) {
pthread_cond_signal(&mailbox->cond);
}
return ret;
}
/* Returns 0 on successful message. Returns 1 if timedout */ /* Returns 0 on successful message. Returns 1 if timedout */
int janet_thread_receive(Janet *msg_out, double timeout) { int janet_thread_receive(Janet *msg_out, double timeout) {
pthread_mutex_lock(&janet_vm_mailbox->lock); JanetMailbox *mailbox = janet_vm_mailbox;
pthread_mutex_lock(&mailbox->lock);
/* For timeouts */ /* For timeouts */
struct timespec timeout_ts; struct timespec timeout_ts;
@ -175,8 +230,13 @@ int janet_thread_receive(Janet *msg_out, double timeout) {
for (;;) { for (;;) {
/* Check for messages waiting for use */ /* Check for messages waiting for us */
if (janet_vm_mailbox->buf.count) { if (mailbox->messageCount > 0) {
/* If we were over capacity, receiving a message should
* bring us under capacity. We can therefor wake up pending writers. */
int wasAtCapacity = mailbox_at_capacity(mailbox);
/* Hack to capture all panics from marshalling. This works because /* Hack to capture all panics from marshalling. This works because
* we know janet_marshal won't mess with other essential global state. */ * we know janet_marshal won't mess with other essential global state. */
jmp_buf buf; jmp_buf buf;
@ -186,47 +246,59 @@ int janet_thread_receive(Janet *msg_out, double timeout) {
/* Handle errors */ /* Handle errors */
if (setjmp(buf)) { if (setjmp(buf)) {
/* Bad message, so clear buffer and wait for the next */ /* Bad message, so clear buffer and wait for the next */
janet_vm_mailbox->buf.count = 0; mailbox->buf.count = 0;
mailbox->messageCount = 0;
/* Cleanup jmp_buf, keep lock */
janet_vm_jmp_buf = old_buf; janet_vm_jmp_buf = old_buf;
} else { } else {
/* Read from beginning of channel */ /* Read from beginning of channel */
const uint8_t *nextItem = NULL; const uint8_t *nextItem = NULL;
Janet item = janet_unmarshal( Janet item = janet_unmarshal(
janet_vm_mailbox->buf.data, janet_vm_mailbox->buf.count, mailbox->buf.data, mailbox->buf.count,
0, janet_vm_mailbox->decode, &nextItem); 0, mailbox->decode, &nextItem);
/* Update memory and put result into *msg_out */ /* Update memory and put result into *msg_out */
int32_t chunkCount = nextItem - janet_vm_mailbox->buf.data; int32_t chunkCount = nextItem - mailbox->buf.data;
memmove(janet_vm_mailbox->buf.data, nextItem, janet_vm_mailbox->buf.count - chunkCount); memmove(mailbox->buf.data, nextItem, mailbox->buf.count - chunkCount);
janet_vm_mailbox->buf.count -= chunkCount; mailbox->buf.count -= chunkCount;
mailbox->messageCount--;
*msg_out = item; *msg_out = item;
/* Cleanup */
janet_vm_jmp_buf = old_buf; janet_vm_jmp_buf = old_buf;
pthread_mutex_unlock(&janet_vm_mailbox->lock); pthread_mutex_unlock(&mailbox->lock);
/* Wake up pending writers */
if (wasAtCapacity) {
pthread_cond_signal(&mailbox->cond);
}
return 0; return 0;
} }
} }
if (nowait || janet_vm_mailbox->refCount <= 1) { if (nowait || mailbox->refCount <= 1) {
/* If there is only one ref, it is us. This means that if we /* If there is only one ref, it is us. This means that if we
* start waiting now, we can never possibly get a message, as * start waiting now, we can never possibly get a message, as
* our reference will not propogate to other threads while we are blocked. */ * our reference will not propogate to other threads while we are blocked. */
pthread_mutex_unlock(&janet_vm_mailbox->lock); pthread_mutex_unlock(&mailbox->lock);
return 1; return 1;
} }
/* Wait for next message */ /* Wait for next message */
if (timedwait) { if (timedwait) {
if (pthread_cond_timedwait( if (pthread_cond_timedwait(
&janet_vm_mailbox->cond, &mailbox->cond,
&janet_vm_mailbox->lock, &mailbox->lock,
&timeout_ts)) { &timeout_ts)) {
pthread_mutex_unlock(&janet_vm_mailbox->lock); pthread_mutex_unlock(&mailbox->lock);
return 1; return 1;
} }
} else { } else {
pthread_cond_wait( pthread_cond_wait(
&janet_vm_mailbox->cond, &mailbox->cond,
&janet_vm_mailbox->lock); &mailbox->lock);
} }
} }
@ -350,29 +422,29 @@ void janet_threads_deinit(void) {
janet_vm_mailbox->closed = 1; janet_vm_mailbox->closed = 1;
janet_mailbox_ref_with_lock(janet_vm_mailbox, -1); janet_mailbox_ref_with_lock(janet_vm_mailbox, -1);
janet_vm_mailbox = NULL; janet_vm_mailbox = NULL;
janet_vm_thread_self = NULL; janet_vm_thread_current = NULL;
} }
/* /*
* Cfuns * Cfuns
*/ */
static Janet cfun_thread_self(int32_t argc, Janet *argv) { static Janet cfun_thread_current(int32_t argc, Janet *argv) {
(void) argv; (void) argv;
janet_fixarity(argc, 0); janet_fixarity(argc, 0);
if (NULL == janet_vm_thread_self) { if (NULL == janet_vm_thread_current) {
janet_vm_thread_self = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict")); janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict"));
janet_mailbox_ref(janet_vm_mailbox, 1); janet_mailbox_ref(janet_vm_mailbox, 1);
janet_gcroot(janet_wrap_abstract(janet_vm_thread_self)); janet_gcroot(janet_wrap_abstract(janet_vm_thread_current));
} }
return janet_wrap_abstract(janet_vm_thread_self); return janet_wrap_abstract(janet_vm_thread_current);
} }
static Janet cfun_thread_new(int32_t argc, Janet *argv) { static Janet cfun_thread_new(int32_t argc, Janet *argv) {
janet_fixarity(argc, 0); janet_arity(argc, 0, 1);
(void) argv;
JanetTable *encode = janet_get_core_table("make-image-dict"); JanetTable *encode = janet_get_core_table("make-image-dict");
JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2); JanetMailbox *mailbox = janet_mailbox_create(janet_vm_mailbox, 2);
mailbox->messageCapacity = janet_optinteger(argv, argc, 0, 0);
/* one for created thread, one for ->parent reference in new mailbox */ /* one for created thread, one for ->parent reference in new mailbox */
janet_mailbox_ref(janet_vm_mailbox, 2); janet_mailbox_ref(janet_vm_mailbox, 2);
@ -387,9 +459,9 @@ static Janet cfun_thread_new(int32_t argc, Janet *argv) {
} }
static Janet cfun_thread_send(int32_t argc, Janet *argv) { static Janet cfun_thread_send(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2); janet_arity(argc, 2, 3);
JanetThread *thread = janet_getthread(argv, 0); JanetThread *thread = janet_getthread(argv, 0);
int status = janet_thread_send(thread, argv[1]); int status = janet_thread_send(thread, argv[1], janet_optnumber(argv, argc, 2, 1.0));
switch (status) { switch (status) {
default: default:
break; break;
@ -403,7 +475,7 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) {
static Janet cfun_thread_receive(int32_t argc, Janet *argv) { static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
janet_arity(argc, 0, 1); janet_arity(argc, 0, 1);
double wait = janet_optnumber(argv, argc, 0, -1.0); double wait = janet_optnumber(argv, argc, 0, 1.0);
Janet out; Janet out;
int status = janet_thread_receive(&out, wait); int status = janet_thread_receive(&out, wait);
switch (status) { switch (status) {
@ -436,15 +508,16 @@ static Janet janet_thread_getter(void *p, Janet key) {
static const JanetReg threadlib_cfuns[] = { static const JanetReg threadlib_cfuns[] = {
{ {
"thread/self", cfun_thread_self, "thread/current", cfun_thread_current,
JDOC("(thread/self)\n\n" JDOC("(thread/current)\n\n"
"Get the current running thread.") "Get the current running thread.")
}, },
{ {
"thread/new", cfun_thread_new, "thread/new", cfun_thread_new,
JDOC("(thread/new)\n\n" JDOC("(thread/new &opt capacity)\n\n"
"Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently "
"sent over after thread creation.") "sent over after thread creation. If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. "
"Returns a handle to that thread.")
}, },
{ {
"thread/send", cfun_thread_send, "thread/send", cfun_thread_send,