From 2ed186664febd3eb30e9ee04a235130fc8121978 Mon Sep 17 00:00:00 2001 From: Michael Camilleri Date: Tue, 25 Mar 2025 20:27:44 +0900 Subject: [PATCH] Support using a background thread to monitor deadlines --- src/core/ev.c | 108 +++++++++++++++++++++++++++++++++++++++----- src/core/state.h | 6 +++ test/suite-ev.janet | 4 ++ 3 files changed, 107 insertions(+), 11 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index e51fee80..ecbd39bc 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -112,6 +112,13 @@ typedef struct { JanetHandle write_pipe; } JanetEVThreadInit; +/* Structure used to initialize threads that run timeouts */ +typedef struct { + double sec; + JanetVM *vm; + JanetFiber *fiber; +} JanetThreadedTimeout; + #define JANET_MAX_Q_CAPACITY 0x7FFFFFF static void janet_q_init(JanetQueue *q) { @@ -1431,6 +1438,17 @@ JanetFiber *janet_loop1(void) { while ((has_timeout = peek_timeout(&to))) { if (to.curr_fiber != NULL) { if (!janet_fiber_can_resume(to.curr_fiber)) { + if (to.has_worker) { +#ifdef JANET_WINDOWS + QueueUserAPC(janet_timeout_stop, to.worker, 0); + WaitForSingleObject(to.worker, INFINITE); + CloseHandle(to.worker); +#else + pthread_cancel(to.worker); + void *res; + pthread_join(to.worker, &res); +#endif + } janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber)); pop_timeout(0); continue; @@ -3102,27 +3120,95 @@ JANET_CORE_FN(cfun_ev_sleep, janet_sleep_await(sec); } +#ifdef JANET_WINDOWS +static VOID CALLBACK janet_timeout_stop(ULONG_PTR ptr) { + UNREFERENCED_PARAMETER(ptr); + ExitThread(0); +} +#endif + +static void janet_timeout_cb(JanetEVGenericMessage msg) { + (void) msg; + janet_interpreter_interrupt_handled(&janet_vm); +} + +#ifdef JANET_WINDOWS +static DWORD WINAPI janet_timeout_body(LPVOID ptr) { + JanetThreadedTimeout *tto = (JanetThreadedTimeout *)ptr; + double sec = (tto->sec > 0) ? tto->sec : 0; + SleepEx((DWORD)(tto->sec * 1000), TRUE); + if (janet_fiber_can_resume(tto->fiber)) { + janet_interpreter_interrupt(tto->vm); + JanetEVGenericMessage msg = {0}; + janet_ev_post_event(tto->vm, janet_timeout_cb, msg); + } + return 0; +} +#else +static void *janet_timeout_body(void *ptr) { + JanetThreadedTimeout *tto = (JanetThreadedTimeout *)ptr; + struct timespec ts; + ts.tv_sec = (time_t) tto->sec; + ts.tv_nsec = (tto->sec <= UINT32_MAX) + ? (long)((tto->sec - ((uint32_t)tto->sec)) * 1000000000) + : 0; + nanosleep(&ts, &ts); + if (janet_fiber_can_resume(tto->fiber)) { + janet_interpreter_interrupt(tto->vm); + JanetEVGenericMessage msg = {0}; + janet_ev_post_event(tto->vm, janet_timeout_cb, msg); + } + return NULL; +} +#endif + JANET_CORE_FN(cfun_ev_deadline, - "(ev/deadline sec &opt tocancel tocheck)", - "Schedules the event loop to try to cancel the `tocancel` " - "task as with `ev/cancel`. After `sec` seconds, the event " - "loop will attempt cancellation of `tocancel` if the " - "`tocheck` fiber is resumable. `sec` is a number that can " - "have a fractional part. `tocancel` defaults to " - "`(fiber/root)`, but if specified, must be a task (root " - "fiber). `tocheck` defaults to `(fiber/current)`, but if " - "specified, should be a fiber. Returns `tocancel` " - "immediately.") { - janet_arity(argc, 1, 3); + "(ev/deadline sec &opt tocancel tocheck intr?)", + "Schedules the event loop to try to cancel the `tocancel` task as with `ev/cancel`. " + "After `sec` seconds, the event loop will attempt cancellation of `tocancel` if the " + "`tocheck` fiber is resumable. `sec` is a number that can have a fractional part. " + "`tocancel` defaults to `(fiber/root)`, but if specified, must be a task (root " + "fiber). `tocheck` defaults to `(fiber/current)`, but if specified, must be a fiber. " + "Returns `tocancel` immediately. If `interrupt?` is set to true, will create a " + "background thread to try to interrupt the VM if the timeout expires.") { + janet_arity(argc, 1, 4); double sec = janet_getnumber(argv, 0); JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber); JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm.fiber); + int use_interrupt = janet_optboolean(argv, argc, 3, 0); JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = tocancel; to.curr_fiber = tocheck; to.is_error = 0; to.sched_id = to.fiber->sched_id; + if (use_interrupt) { + JanetThreadedTimeout *tto = janet_malloc(sizeof(JanetThreadedTimeout)); + if (NULL == tto) { + JANET_OUT_OF_MEMORY; + } + tto->sec = sec; + tto->vm = &janet_vm; + tto->fiber = tocheck; +#ifdef JANET_WINDOWS + HANDLE worker = CreateThread(NULL, 0, janet_timeout_body, tto, 0, NULL); + if (NULL == worker) { + janet_free(tto); + janet_panic("failed to create thread"); + } +#else + pthread_t worker; + int err = pthread_create(&worker, NULL, janet_timeout_body, tto); + if (err) { + janet_free(tto); + janet_panicf("%s", janet_strerror(err)); + } +#endif + to.has_worker = 1; + to.worker = worker; + } else { + to.has_worker = 0; + } add_timeout(to); return janet_wrap_fiber(tocancel); } diff --git a/src/core/state.h b/src/core/state.h index 7fb42139..0b69d6e9 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -59,6 +59,12 @@ typedef struct { JanetFiber *curr_fiber; uint32_t sched_id; int is_error; + int has_worker; +#ifdef JANET_WINDOWS + HANDLE worker; +#else + pthread_t worker; +#endif } JanetTimeout; /* Registry table for C functions - contains metadata that can diff --git a/test/suite-ev.janet b/test/suite-ev.janet index ee14f468..cf040e2c 100644 --- a/test/suite-ev.janet +++ b/test/suite-ev.janet @@ -550,4 +550,8 @@ (ev/sleep 0.15) (assert (not terminated-normally) "early termination failure 3")) +(let [f (coro (forever :foo))] + (ev/deadline 0.01 nil f true) + (assert-error "deadline expired" (resume f))) + (end-suite)