From 0e690b4fa050e63c60d4d692e122f65060004a40 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Fri, 6 Dec 2019 09:21:36 -0600 Subject: [PATCH] Add timeout to thread/receive. If provided, throws an error if no message is received before timeout. Perhaps should return nil?. --- examples/threads.janet | 14 +++++++ src/core/thread.c | 92 ++++++++++++++++++++++++++++++------------ 2 files changed, 80 insertions(+), 26 deletions(-) diff --git a/examples/threads.janet b/examples/threads.janet index 61e2cb0a..ef74f6db 100644 --- a/examples/threads.janet +++ b/examples/threads.janet @@ -57,3 +57,17 @@ (def lines (:receive (-> (thread/new) (:send worker-tree) (:send "adam") (:send 0)))) (map print lines) + +# +# Receive timeout +# + +(def slow (make-worker "slow-loras" 4)) +(for i 0 50 + (try + (let [msg (thread/receive slow 0.46)] + (print "\n" msg)) + ([err] (prin ".") (:flush stdout)))) + +(print "\ndone timing, timeouts ending.") +(try (while true (print (:receive slow))) ([err] (print "done"))) diff --git a/src/core/thread.c b/src/core/thread.c index a0b13829..d4dbe3ff 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -30,6 +30,7 @@ #ifdef JANET_THREADS #include +#include static JANET_THREAD_LOCAL JanetThreadSelector janet_vm_thread_selector; @@ -198,17 +199,35 @@ static int janet_channel_receive(JanetChannel *rx, Janet *msg_out) { return ret; } +/* Convert an interval from now in an absolute timespec */ +static void janet_sec2ts(double sec, struct timespec *ts) { + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + time_t tvsec = (time_t) floor(sec); + long tvnsec = (long) floor(1000000000.0 * (sec - ((double) tvsec))); + tvsec += now.tv_sec; + tvnsec += now.tv_nsec; + if (tvnsec >= 1000000000L) { + tvnsec -= 1000000000L; + tvsec += 1; + } + ts->tv_sec = tvsec; + ts->tv_nsec = tvnsec; +} + /* Get a message from one of the channels given. */ -static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) { +static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out, double wait) { int32_t maxChannel = -1; - for (;;) { + int result = -1; + while (result < 0) { janet_vm_thread_selector.channel = NULL; /* Try each channel, first without acquiring locks and looking * only for existing messages, then with acquiring - * locks, which will not miss messages. */ - for (int trylock = 1; trylock >= 0; trylock--) { - for (int32_t i = 0; i < n; i++) { + * locks, which will not miss messages. If both fail, we also + * set the selector for each thread. */ + for (int trylock = 1; trylock >= 0 && result < 0; trylock--) { + for (int32_t i = 0; i < n && result < 0; i++) { JanetChannel *rx = rxs[i]; if (trylock) { if (rx->buf.count == 0 || pthread_mutex_trylock(&rx->lock)) continue; @@ -217,7 +236,10 @@ static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) { } int status = janet_channel_receive(rxs[i], msg_out); pthread_mutex_unlock(&rxs[i]->lock); - if (status == 0) goto gotMessage; + if (status == 0) { + result = 0; + continue; + } maxChannel = maxChannel > i ? maxChannel : i; if (status == 2) { /* channel closed and will receive no more messages, drop it */ @@ -228,36 +250,52 @@ static int janet_channel_select(int32_t n, JanetChannel **rxs, Janet *msg_out) { } /* All channels closed */ - if (n == 0) return 1; + if (n == 0) result = 1; + + /* Don't wait for selector if we have a result */ + if (result >= 0) break; pthread_mutex_lock(&janet_vm_thread_selector.mutex); { + int status = 0; /* Wait until we have a channel */ if (NULL == janet_vm_thread_selector.channel) { - pthread_cond_wait( - &janet_vm_thread_selector.cond, - &janet_vm_thread_selector.mutex); + if (wait <= 0.0) { + pthread_cond_wait( + &janet_vm_thread_selector.cond, + &janet_vm_thread_selector.mutex); + } else { + struct timespec ts; + janet_sec2ts(wait, &ts); + status = pthread_cond_timedwait( + &janet_vm_thread_selector.cond, + &janet_vm_thread_selector.mutex, + &ts); + } + } + + /* Timeout? */ + if (status) { + result = 1; + } else { + JanetChannel *rx = janet_vm_thread_selector.channel; + int32_t index = 0; + while (rxs[index] != rx) index++; + rxs[index] = rxs[0]; + rxs[0] = rx; } - /* Got channel, swap it with first channel, and - * then go back to receiving messages. */ - JanetChannel *rx = janet_vm_thread_selector.channel; - int32_t index = 0; - while (rxs[index] != rx) index++; - rxs[index] = rxs[0]; - rxs[0] = rx; } pthread_mutex_unlock(&janet_vm_thread_selector.mutex); } -gotMessage: /* got message, unset selectors and return */ - for (int32_t j = 0; j <= maxChannel && j < n; j++) { + for (int32_t j = 0; j <= maxChannel; j++) { pthread_mutex_lock(&rxs[j]->lock); rxs[j]->selector = NULL; pthread_mutex_unlock(&rxs[j]->lock); } - return 0; + return result; } static Janet janet_thread_getter(void *p, Janet key); @@ -302,7 +340,7 @@ static int thread_worker(JanetChannel *tx) { /* Unmarshal the function */ Janet funcv; - int status = janet_channel_select(1, &rx, &funcv); + int status = janet_channel_select(1, &rx, &funcv, -1.0); if (status) goto error; if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; JanetFunction *func = janet_unwrap_function(funcv); @@ -387,11 +425,12 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) { } static Janet cfun_thread_receive(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); + janet_arity(argc, 1, 2); int status; Janet out = janet_wrap_nil(); int32_t count; const Janet *items; + double wait = janet_optnumber(argv, argc, 1, -1.0); if (janet_indexed_view(argv[0], &items, &count)) { /* Select on multiple threads */ if (count == 0) janet_panic("expected at least 1 thread"); @@ -404,13 +443,13 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) { JanetThread *thread = janet_getthread(items, i); if (thread->rx != NULL) rxs[realcount++] = thread->rx; } - status = janet_channel_select(realcount, rxs, &out); + status = janet_channel_select(realcount, rxs, &out, wait); if (rxs != rxs_stack) janet_sfree(rxs); } else { /* Get from one thread */ JanetThread *thread = janet_getthread(argv, 0); if (NULL == thread->rx) janet_panic("channel has closed"); - status = janet_channel_select(1, &thread->rx, &out); + status = janet_channel_select(1, &thread->rx, &out, wait); } if (status) { janet_panic("failed to receive message"); @@ -453,11 +492,12 @@ static const JanetReg threadlib_cfuns[] = { }, { "thread/receive", cfun_thread_receive, - JDOC("(thread/receive threads)\n\n" + JDOC("(thread/receive threads &opt timeout)\n\n" "Get a value sent to 1 or more threads. Will block if no value was sent to this thread " "yet. threads can also be an array or tuple of threads, in which case " "thread/receive will select on the first thread to return a value. Returns " - "the message sent to the thread.") + "the message sent to the thread. If a timeout (in seconds) is provided, failure " + "to get a message will throw an error after the timeout has elapsed.") }, { "thread/close", cfun_thread_close,