From 9d231926147f4e83cc7eb82fd6614fbf5a876a1c Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 5 Dec 2020 10:32:34 -0600 Subject: [PATCH] Add ev/deadline and ev/with-deadline. This should be more useful than timeouts in real-world use cases. The deadline system is based on fibers and is target to much more coarse-grained (and therfor reliable) timeouts than things like ev/sleep and timeout arguments. --- src/boot/boot.janet | 11 ++++++++- src/core/ev.c | 57 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 0b867a62..4dc0fc94 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3019,7 +3019,16 @@ (defmacro ev/spawn "Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))." [& body] - ~(,ev/call (fn [] ,;body)))) + ~(,ev/call (fn [] ,;body))) + + (defmacro ev/with-deadline + `Run a body of code with a deadline, such that if the code does not complete before + the deadline is up, it will be canceled.` + [deadline & body] + (with-syms [f] + ~(let [,f (coro ,;body)] + (,ev/deadline ,deadline nil ,f) + (,resume ,f))))) (compwhen (dyn 'net/listen) (defn net/server diff --git a/src/core/ev.c b/src/core/ev.c index bad6a932..e270d92c 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -131,6 +131,7 @@ typedef struct JanetTimeout JanetTimeout; struct JanetTimeout { JanetTimestamp when; JanetFiber *fiber; + JanetFiber *curr_fiber; uint32_t sched_id; int is_error; }; @@ -433,6 +434,9 @@ void janet_ev_mark(void) { /* Pending timeouts */ for (size_t i = 0; i < janet_vm_tq_count; i++) { janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber)); + if (janet_vm_tq[i].curr_fiber != NULL) { + janet_mark(janet_wrap_fiber(janet_vm_tq[i].curr_fiber)); + } } /* Pending listeners */ @@ -486,6 +490,7 @@ void janet_addtimeout(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = fiber; + to.curr_fiber = NULL; to.sched_id = fiber->sched_id; to.is_error = 1; add_timeout(to); @@ -766,11 +771,27 @@ void janet_loop1(void) { JanetTimestamp now = ts_now(); while (peek_timeout(&to) && to.when <= now) { pop_timeout(0); - if (to.fiber->sched_id == to.sched_id) { - if (to.is_error) { - janet_cancel(to.fiber, janet_cstringv("timeout")); - } else { - janet_schedule(to.fiber, janet_wrap_nil()); + if (to.curr_fiber != NULL) { + /* This is a deadline (for a fiber, not a function call) */ + JanetFiberStatus s = janet_fiber_status(to.curr_fiber); + int isFinished = s == (JANET_STATUS_DEAD || + s == JANET_STATUS_ERROR || + s == JANET_STATUS_USER0 || + s == JANET_STATUS_USER1 || + s == JANET_STATUS_USER2 || + s == JANET_STATUS_USER3 || + s == JANET_STATUS_USER4); + if (!isFinished) { + janet_cancel(to.fiber, janet_cstringv("deadline expired")); + } + } else { + /* This is a timeout (for a function call, not a whole fiber) */ + if (to.fiber->sched_id == to.sched_id) { + if (to.is_error) { + janet_cancel(to.fiber, janet_cstringv("timeout")); + } else { + janet_schedule(to.fiber, janet_wrap_nil()); + } } } } @@ -786,7 +807,7 @@ void janet_loop1(void) { memset(&to, 0, sizeof(to)); int has_timeout; /* Drop timeouts that are no longer needed */ - while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) { + while ((has_timeout = peek_timeout(&to)) && (to.curr_fiber == NULL) && to.fiber->sched_id != to.sched_id) { pop_timeout(0); } /* Run polling implementation only if pending timeouts or pending events */ @@ -1699,10 +1720,26 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { to.fiber = janet_vm_root_fiber; to.is_error = 0; to.sched_id = to.fiber->sched_id; + to.curr_fiber = NULL; add_timeout(to); janet_await(); } +static Janet cfun_ev_deadline(int32_t argc, Janet *argv) { + janet_arity(argc, 1, 3); + double sec = janet_getnumber(argv, 0); + JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm_root_fiber); + JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm_fiber); + JanetTimeout to; + to.when = ts_delta(ts_now(), sec); + to.fiber = tocancel; + to.curr_fiber = tocheck; + to.is_error = 0; + to.sched_id = to.fiber->sched_id; + add_timeout(to); + return janet_wrap_fiber(tocancel); +} + static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetFiber *fiber = janet_getfiber(argv, 0); @@ -1776,6 +1813,14 @@ static const JanetReg ev_cfuns[] = { JDOC("(ev/sleep sec)\n\n" "Suspend the current fiber for sec seconds without blocking the event loop.") }, + { + "ev/deadline", cfun_ev_deadline, + JDOC("(ev/deadline sec &opt tocancel tocheck)\n\n" + "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, " + "`tocancel` will be canceled as with `ev/cancel`. " + "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and " + "`(fiber/current)` respectively. Returns `tocancel`.") + }, { "ev/chan", cfun_channel_new, JDOC("(ev/chan &opt capacity)\n\n"