1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-24 22:26:52 +00:00

Add timeout to thread/receive.

If provided, throws an error if no message is received before
timeout. Perhaps should return nil?.
This commit is contained in:
Calvin Rose 2019-12-06 09:21:36 -06:00
parent c804ae9f7c
commit 0e690b4fa0
2 changed files with 80 additions and 26 deletions

View File

@ -57,3 +57,17 @@
(def lines (:receive (-> (thread/new) (:send worker-tree) (:send "adam") (:send 0))))
(map print lines)
#
# Receive timeout
#
(def slow (make-worker "slow-loras" 4))
(for i 0 50
(try
(let [msg (thread/receive slow 0.46)]
(print "\n" msg))
([err] (prin ".") (:flush stdout))))
(print "\ndone timing, timeouts ending.")
(try (while true (print (:receive slow))) ([err] (print "done")))

View File

@ -30,6 +30,7 @@
#ifdef JANET_THREADS
#include <setjmp.h>
#include <time.h>
static JANET_THREAD_LOCAL JanetThreadSelector janet_vm_thread_selector;
@ -198,17 +199,35 @@ static int janet_channel_receive(JanetChannel *rx, Janet *msg_out) {
return ret;
}
/* Convert an interval from now in an absolute timespec */
static void janet_sec2ts(double sec, struct timespec *ts) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
time_t tvsec = (time_t) floor(sec);
long tvnsec = (long) floor(1000000000.0 * (sec - ((double) tvsec)));
tvsec += now.tv_sec;
tvnsec += now.tv_nsec;
if (tvnsec >= 1000000000L) {
tvnsec -= 1000000000L;
tvsec += 1;
}
ts->tv_sec = tvsec;
ts->tv_nsec = tvnsec;
}
/* Get a message from one of the channels given. */
static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) {
static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out, double wait) {
int32_t maxChannel = -1;
for (;;) {
int result = -1;
while (result < 0) {
janet_vm_thread_selector.channel = NULL;
/* Try each channel, first without acquiring locks and looking
* only for existing messages, then with acquiring
* locks, which will not miss messages. */
for (int trylock = 1; trylock >= 0; trylock--) {
for (int32_t i = 0; i < n; i++) {
* locks, which will not miss messages. If both fail, we also
* set the selector for each thread. */
for (int trylock = 1; trylock >= 0 && result < 0; trylock--) {
for (int32_t i = 0; i < n && result < 0; i++) {
JanetChannel *rx = rxs[i];
if (trylock) {
if (rx->buf.count == 0 || pthread_mutex_trylock(&rx->lock)) continue;
@ -217,7 +236,10 @@ static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) {
}
int status = janet_channel_receive(rxs[i], msg_out);
pthread_mutex_unlock(&rxs[i]->lock);
if (status == 0) goto gotMessage;
if (status == 0) {
result = 0;
continue;
}
maxChannel = maxChannel > i ? maxChannel : i;
if (status == 2) {
/* channel closed and will receive no more messages, drop it */
@ -228,36 +250,52 @@ static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) {
}
/* All channels closed */
if (n == 0) return 1;
if (n == 0) result = 1;
/* Don't wait for selector if we have a result */
if (result >= 0) break;
pthread_mutex_lock(&janet_vm_thread_selector.mutex);
{
int status = 0;
/* Wait until we have a channel */
if (NULL == janet_vm_thread_selector.channel) {
pthread_cond_wait(
&janet_vm_thread_selector.cond,
&janet_vm_thread_selector.mutex);
if (wait <= 0.0) {
pthread_cond_wait(
&janet_vm_thread_selector.cond,
&janet_vm_thread_selector.mutex);
} else {
struct timespec ts;
janet_sec2ts(wait, &ts);
status = pthread_cond_timedwait(
&janet_vm_thread_selector.cond,
&janet_vm_thread_selector.mutex,
&ts);
}
}
/* Timeout? */
if (status) {
result = 1;
} else {
JanetChannel *rx = janet_vm_thread_selector.channel;
int32_t index = 0;
while (rxs[index] != rx) index++;
rxs[index] = rxs[0];
rxs[0] = rx;
}
/* Got channel, swap it with first channel, and
* then go back to receiving messages. */
JanetChannel *rx = janet_vm_thread_selector.channel;
int32_t index = 0;
while (rxs[index] != rx) index++;
rxs[index] = rxs[0];
rxs[0] = rx;
}
pthread_mutex_unlock(&janet_vm_thread_selector.mutex);
}
gotMessage:
/* got message, unset selectors and return */
for (int32_t j = 0; j <= maxChannel && j < n; j++) {
for (int32_t j = 0; j <= maxChannel; j++) {
pthread_mutex_lock(&rxs[j]->lock);
rxs[j]->selector = NULL;
pthread_mutex_unlock(&rxs[j]->lock);
}
return 0;
return result;
}
static Janet janet_thread_getter(void *p, Janet key);
@ -302,7 +340,7 @@ static int thread_worker(JanetChannel *tx) {
/* Unmarshal the function */
Janet funcv;
int status = janet_channel_select(1, &rx, &funcv);
int status = janet_channel_select(1, &rx, &funcv, -1.0);
if (status) goto error;
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
JanetFunction *func = janet_unwrap_function(funcv);
@ -387,11 +425,12 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) {
}
static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
janet_arity(argc, 1, 2);
int status;
Janet out = janet_wrap_nil();
int32_t count;
const Janet *items;
double wait = janet_optnumber(argv, argc, 1, -1.0);
if (janet_indexed_view(argv[0], &items, &count)) {
/* Select on multiple threads */
if (count == 0) janet_panic("expected at least 1 thread");
@ -404,13 +443,13 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
JanetThread *thread = janet_getthread(items, i);
if (thread->rx != NULL) rxs[realcount++] = thread->rx;
}
status = janet_channel_select(realcount, rxs, &out);
status = janet_channel_select(realcount, rxs, &out, wait);
if (rxs != rxs_stack) janet_sfree(rxs);
} else {
/* Get from one thread */
JanetThread *thread = janet_getthread(argv, 0);
if (NULL == thread->rx) janet_panic("channel has closed");
status = janet_channel_select(1, &thread->rx, &out);
status = janet_channel_select(1, &thread->rx, &out, wait);
}
if (status) {
janet_panic("failed to receive message");
@ -453,11 +492,12 @@ static const JanetReg threadlib_cfuns[] = {
},
{
"thread/receive", cfun_thread_receive,
JDOC("(thread/receive threads)\n\n"
JDOC("(thread/receive threads &opt timeout)\n\n"
"Get a value sent to 1 or more threads. Will block if no value was sent to this thread "
"yet. threads can also be an array or tuple of threads, in which case "
"thread/receive will select on the first thread to return a value. Returns "
"the message sent to the thread.")
"the message sent to the thread. If a timeout (in seconds) is provided, failure "
"to get a message will throw an error after the timeout has elapsed.")
},
{
"thread/close", cfun_thread_close,