From 6f1695ecd4ad85562719288096a92ce32e43d6d4 Mon Sep 17 00:00:00 2001 From: bakpakin Date: Sat, 24 Jul 2021 20:30:36 -0500 Subject: [PATCH] Add utitities for interrupting the event loop. janet_loop1_interrupt makes the event loop compatible with safe interruptions for custom scheduling. Does this by exposing custom events on the event loop. A custom event schedules a function pointer to run in a way that can interrupt epoll_wait/poll/GetQueuedCompletionStatus. --- src/core/ev.c | 115 ++++++++++++++++++++++++++++++++++---------- src/core/os.c | 1 + src/core/state.c | 7 +-- src/core/vm.c | 2 +- src/include/janet.h | 35 +++++++++++++- 5 files changed, 128 insertions(+), 32 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 81d28b56..ecc945ad 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -520,21 +520,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber) { return janet_wrap_tuple(janet_tuple_n(tup, 2)); } -/* Run a top level task */ -static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { - fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; - Janet res; - JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin); - JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel); - if (NULL == chan) { - if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) { - janet_stacktrace(fiber, res); - } - } else if (sig == JANET_SIGNAL_OK || (fiber->flags & (1 << sig))) { - janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], fiber), 2); - } -} - /* Common init code */ void janet_ev_init_common(void) { janet_q_init(&janet_vm.spawn); @@ -864,7 +849,14 @@ static Janet janet_chanat_next(void *p, Janet key) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); -void janet_loop1(void) { +int janet_loop_done(void) { + return !(janet_vm.listener_count || + (janet_vm.spawn.head != janet_vm.spawn.tail) || + janet_vm.tq_count || + janet_vm.extra_listeners); +} + +JanetFiber *janet_loop1(void) { /* Schedule expired timers */ JanetTimeout to; JanetTimestamp now = ts_now(); @@ -899,7 +891,21 @@ void janet_loop1(void) { while (janet_vm.spawn.head != janet_vm.spawn.tail) { JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; janet_q_pop(&janet_vm.spawn, &task, sizeof(task)); - run_one(task.fiber, task.value, task.sig); + task.fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; + Janet res; + JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); + JanetChannel *chan = (JanetChannel *)(task.fiber->supervisor_channel); + if (NULL == chan) { + if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD && sig != JANET_SIGNAL_INTERRUPT) { + janet_stacktrace(task.fiber, res); + } + } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) { + janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2); + } + if (sig == JANET_SIGNAL_INTERRUPT) { + /* On interrupts, return the interrupted fiber immediately */ + return task.fiber; + } } /* Poll for events */ @@ -916,11 +922,28 @@ void janet_loop1(void) { janet_loop1_impl(has_timeout, to.when); } } + + /* No fiber was interrupted */ + return NULL; +} + +/* Same as janet_interpreter_interrupt, but will also + * break out of the event loop if waiting for an event + * (say, waiting for ev/sleep to finish). Does this by pushing + * an empty event to the event loop. */ +void janet_loop1_interrupt(JanetVM *vm) { + janet_interpreter_interrupt(vm); + JanetEVGenericMessage msg = {0}; + JanetCallback cb = NULL; + janet_ev_post_event(vm, cb, msg); } void janet_loop(void) { - while (janet_vm.listener_count || (janet_vm.spawn.head != janet_vm.spawn.tail) || janet_vm.tq_count || janet_vm.extra_listeners) { - janet_loop1(); + while (!janet_loop_done()) { + JanetFiber *interrupted_fiber = janet_loop1(); + if (NULL != interrupted_fiber) { + janet_schedule(interrupted_fiber, janet_wrap_nil()); + } } } @@ -945,8 +968,9 @@ static void janet_ev_setup_selfpipe(void) { static void janet_ev_handle_selfpipe(void) { JanetSelfPipeEvent response; while (read(janet_vm.selfpipe[0], &response, sizeof(response)) > 0) { - response.cb(response.msg); - janet_ev_dec_refcount(); + if (NULL != response.cb) { + response.cb(response.msg); + } } } @@ -1014,9 +1038,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { if (0 == completionKey) { /* Custom event */ JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); - response->cb(response->msg); + if (NULL != response->cb) { + response->cb(response->msg); + } janet_free(response); - janet_ev_dec_refcount(); } else { /* Normal event */ JanetStream *stream = (JanetStream *) completionKey; @@ -1310,6 +1335,45 @@ void janet_ev_deinit(void) { * End poll implementation */ +/* + * Generic Callback system. Post a function pointer + data to the event loop (from another + * thread or even a signal handler). Allows posting events from another thread or signal handler. + */ +void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage msg) { + vm = vm ? vm : &janet_vm; +#ifdef JANET_WINDOWS + JanetHandle iocp = vm->iocp; + JanetSelfPipeEvent *event = janet_malloc(sizeof(JanetSelfPipeEvent)); + if (NULL == event) { + JANET_OUT_OF_MEMORY; + } + event->msg = msg; + event->cb = cb; + janet_assert(PostQueuedCompletionStatus(iocp, + sizeof(JanetSelfPipeEvent), + 0, + (LPOVERLAPPED) event), + "failed to post completion event"); +#else + JanetSelfPipeEvent event; + event.msg = msg; + event.cb = cb; + int fd = vm->selfpipe; + /* handle a bit of back pressure before giving up. */ + int tries = 4; + while (tries > 0) { + int status; + do { + status = write(fd, &event, sizeof(event)); + } while (status == -1 && errno == EINTR); + if (status > 0) break; + sleep(1); + tries--; + } + janet_assert(tries > 0, "failed to write event to self-pipe"); +#endif +} + /* * Threaded calls */ @@ -1391,6 +1455,7 @@ void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage ar /* Default callback for janet_ev_threaded_await. */ void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { + janet_ev_dec_refcount(); if (return_value.fiber == NULL) { return; } @@ -1808,8 +1873,8 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) } else #endif { - /* - * File handles in IOCP need to specify this if they are writing to the + /* + * File handles in IOCP need to specify this if they are writing to the * ends of files, like how this is used here. * If the underlying resource doesn't support seeking * byte offsets, they will be ignored diff --git a/src/core/os.c b/src/core/os.c index 2bd7f3ed..47aba63e 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -411,6 +411,7 @@ static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { /* Callback that is called in main thread when subroutine completes. */ static void janet_proc_wait_cb(JanetEVGenericMessage args) { + janet_ev_dec_refcount(); int status = args.argi; JanetProc *proc = (JanetProc *) args.argp; if (NULL != proc) { diff --git a/src/core/state.c b/src/core/state.c index 027fa4a0..ffe9e660 100644 --- a/src/core/state.c +++ b/src/core/state.c @@ -56,9 +56,6 @@ void janet_vm_load(JanetVM *from) { * exit the interpeter loop when convenient. You can optionally * use NULL to interrupt the current VM when convenient */ void janet_interpreter_interrupt(JanetVM *vm) { - if (NULL == vm) { - janet_vm.auto_suspend = 1; - } else { - vm->auto_suspend = 1; - } + vm = vm ? vm : &janet_vm; + vm->auto_suspend = 1; } diff --git a/src/core/vm.c b/src/core/vm.c index 3a8db3ae..175aa0ae 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -118,7 +118,7 @@ if ((COND) && janet_vm.auto_suspend) { \ janet_vm.auto_suspend = 0; \ fiber->flags |= (JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP); \ - vm_return(JANET_SIGNAL_EVENT, janet_wrap_nil()); \ + vm_return(JANET_SIGNAL_INTERRUPT, janet_wrap_nil()); \ } \ } while (0) #endif diff --git a/src/include/janet.h b/src/include/janet.h index 61f0e7ad..09b46530 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -355,6 +355,7 @@ typedef enum { } JanetSignal; #define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9 +#define JANET_SIGNAL_INTERRUPT JANET_SIGNAL_USER8 /* Fiber statuses - mostly corresponds to signals. */ typedef enum { @@ -1281,6 +1282,31 @@ extern JANET_API const JanetAbstractType janet_stream_type; /* Run the event loop */ JANET_API void janet_loop(void); +/* Run the event loop, but allow for user scheduled interrupts triggered + * by janet_loop1_interrupt being called in library code, a signal handler, or + * another thread. + * + * Example: + * + * while (!janet_loop_done()) { + * // One turn of the event loop + * JanetFiber *interrupted_fiber = janet_loop1(); + * // interrupted_fiber may be NULL + * // do some work here periodically... + * if (NULL != interrupted_fiber) { + * if (cancel_interrupted_fiber) { + * janet_cancel(interrupted_fiber, janet_cstringv("fiber was interrupted for [reason]")); + * } else { + * janet_schedule(interrupted_fiber, janet_wrap_nil()); + * } + * } + * } + * + */ +JANET_API int janet_loop_done(void); +JANET_API JanetFiber *janet_loop1(void); +JANET_API void janet_loop1_interrupt(JanetVM *vm); + /* Wrapper around streams */ JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods); JANET_API void janet_stream_close(JanetStream *stream); @@ -1344,13 +1370,20 @@ typedef struct { /* Function pointer that is run in the thread pool */ typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments); -/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */ +/* Handler for events posted to the event loop */ +typedef void (*JanetCallback)(JanetEVGenericMessage return_value); + +/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine (same as JanetCallback) */ typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value); /* API calls for quickly offloading some work in C to a new thread or thread pool. */ JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb); JANET_NO_RETURN JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp); +/* Post callback + userdata to an event loop. Takes the vm parameter to allow posting from other + * threads or signal handlers. Use NULL to post to the current thread. */ +JANET_API void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage msg); + /* Callback used by janet_ev_threaded_await */ JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value);