Add utitities for interrupting the event loop.

janet_loop1_interrupt makes the event loop compatible
with safe interruptions for custom scheduling. Does this by exposing
custom events on the event loop. A custom event schedules a function pointer
to run in a way that can interrupt
epoll_wait/poll/GetQueuedCompletionStatus.
This commit is contained in:
bakpakin 2021-07-24 20:30:36 -05:00
parent 160dd830a0
commit 6f1695ecd4
5 changed files with 128 additions and 32 deletions

View File

@ -520,21 +520,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber) {
return janet_wrap_tuple(janet_tuple_n(tup, 2));
}
/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin);
JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel);
if (NULL == chan) {
if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) {
janet_stacktrace(fiber, res);
}
} else if (sig == JANET_SIGNAL_OK || (fiber->flags & (1 << sig))) {
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], fiber), 2);
}
}
/* Common init code */
void janet_ev_init_common(void) {
janet_q_init(&janet_vm.spawn);
@ -864,7 +849,14 @@ static Janet janet_chanat_next(void *p, Janet key) {
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
void janet_loop1(void) {
int janet_loop_done(void) {
return !(janet_vm.listener_count ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count ||
janet_vm.extra_listeners);
}
JanetFiber *janet_loop1(void) {
/* Schedule expired timers */
JanetTimeout to;
JanetTimestamp now = ts_now();
@ -899,7 +891,21 @@ void janet_loop1(void) {
while (janet_vm.spawn.head != janet_vm.spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
janet_q_pop(&janet_vm.spawn, &task, sizeof(task));
run_one(task.fiber, task.value, task.sig);
task.fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
JanetChannel *chan = (JanetChannel *)(task.fiber->supervisor_channel);
if (NULL == chan) {
if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD && sig != JANET_SIGNAL_INTERRUPT) {
janet_stacktrace(task.fiber, res);
}
} else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2);
}
if (sig == JANET_SIGNAL_INTERRUPT) {
/* On interrupts, return the interrupted fiber immediately */
return task.fiber;
}
}
/* Poll for events */
@ -916,11 +922,28 @@ void janet_loop1(void) {
janet_loop1_impl(has_timeout, to.when);
}
}
/* No fiber was interrupted */
return NULL;
}
/* Same as janet_interpreter_interrupt, but will also
* break out of the event loop if waiting for an event
* (say, waiting for ev/sleep to finish). Does this by pushing
* an empty event to the event loop. */
void janet_loop1_interrupt(JanetVM *vm) {
janet_interpreter_interrupt(vm);
JanetEVGenericMessage msg = {0};
JanetCallback cb = NULL;
janet_ev_post_event(vm, cb, msg);
}
void janet_loop(void) {
while (janet_vm.listener_count || (janet_vm.spawn.head != janet_vm.spawn.tail) || janet_vm.tq_count || janet_vm.extra_listeners) {
janet_loop1();
while (!janet_loop_done()) {
JanetFiber *interrupted_fiber = janet_loop1();
if (NULL != interrupted_fiber) {
janet_schedule(interrupted_fiber, janet_wrap_nil());
}
}
}
@ -945,8 +968,9 @@ static void janet_ev_setup_selfpipe(void) {
static void janet_ev_handle_selfpipe(void) {
JanetSelfPipeEvent response;
while (read(janet_vm.selfpipe[0], &response, sizeof(response)) > 0) {
response.cb(response.msg);
janet_ev_dec_refcount();
if (NULL != response.cb) {
response.cb(response.msg);
}
}
}
@ -1014,9 +1038,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
if (0 == completionKey) {
/* Custom event */
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
response->cb(response->msg);
if (NULL != response->cb) {
response->cb(response->msg);
}
janet_free(response);
janet_ev_dec_refcount();
} else {
/* Normal event */
JanetStream *stream = (JanetStream *) completionKey;
@ -1310,6 +1335,45 @@ void janet_ev_deinit(void) {
* End poll implementation
*/
/*
* Generic Callback system. Post a function pointer + data to the event loop (from another
* thread or even a signal handler). Allows posting events from another thread or signal handler.
*/
void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage msg) {
vm = vm ? vm : &janet_vm;
#ifdef JANET_WINDOWS
JanetHandle iocp = vm->iocp;
JanetSelfPipeEvent *event = janet_malloc(sizeof(JanetSelfPipeEvent));
if (NULL == event) {
JANET_OUT_OF_MEMORY;
}
event->msg = msg;
event->cb = cb;
janet_assert(PostQueuedCompletionStatus(iocp,
sizeof(JanetSelfPipeEvent),
0,
(LPOVERLAPPED) event),
"failed to post completion event");
#else
JanetSelfPipeEvent event;
event.msg = msg;
event.cb = cb;
int fd = vm->selfpipe;
/* handle a bit of back pressure before giving up. */
int tries = 4;
while (tries > 0) {
int status;
do {
status = write(fd, &event, sizeof(event));
} while (status == -1 && errno == EINTR);
if (status > 0) break;
sleep(1);
tries--;
}
janet_assert(tries > 0, "failed to write event to self-pipe");
#endif
}
/*
* Threaded calls
*/
@ -1391,6 +1455,7 @@ void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage ar
/* Default callback for janet_ev_threaded_await. */
void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
janet_ev_dec_refcount();
if (return_value.fiber == NULL) {
return;
}
@ -1808,8 +1873,8 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} else
#endif
{
/*
* File handles in IOCP need to specify this if they are writing to the
/*
* File handles in IOCP need to specify this if they are writing to the
* ends of files, like how this is used here.
* If the underlying resource doesn't support seeking
* byte offsets, they will be ignored

View File

@ -411,6 +411,7 @@ static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) {
/* Callback that is called in main thread when subroutine completes. */
static void janet_proc_wait_cb(JanetEVGenericMessage args) {
janet_ev_dec_refcount();
int status = args.argi;
JanetProc *proc = (JanetProc *) args.argp;
if (NULL != proc) {

View File

@ -56,9 +56,6 @@ void janet_vm_load(JanetVM *from) {
* exit the interpeter loop when convenient. You can optionally
* use NULL to interrupt the current VM when convenient */
void janet_interpreter_interrupt(JanetVM *vm) {
if (NULL == vm) {
janet_vm.auto_suspend = 1;
} else {
vm->auto_suspend = 1;
}
vm = vm ? vm : &janet_vm;
vm->auto_suspend = 1;
}

View File

@ -118,7 +118,7 @@
if ((COND) && janet_vm.auto_suspend) { \
janet_vm.auto_suspend = 0; \
fiber->flags |= (JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP); \
vm_return(JANET_SIGNAL_EVENT, janet_wrap_nil()); \
vm_return(JANET_SIGNAL_INTERRUPT, janet_wrap_nil()); \
} \
} while (0)
#endif

View File

@ -355,6 +355,7 @@ typedef enum {
} JanetSignal;
#define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9
#define JANET_SIGNAL_INTERRUPT JANET_SIGNAL_USER8
/* Fiber statuses - mostly corresponds to signals. */
typedef enum {
@ -1281,6 +1282,31 @@ extern JANET_API const JanetAbstractType janet_stream_type;
/* Run the event loop */
JANET_API void janet_loop(void);
/* Run the event loop, but allow for user scheduled interrupts triggered
* by janet_loop1_interrupt being called in library code, a signal handler, or
* another thread.
*
* Example:
*
* while (!janet_loop_done()) {
* // One turn of the event loop
* JanetFiber *interrupted_fiber = janet_loop1();
* // interrupted_fiber may be NULL
* // do some work here periodically...
* if (NULL != interrupted_fiber) {
* if (cancel_interrupted_fiber) {
* janet_cancel(interrupted_fiber, janet_cstringv("fiber was interrupted for [reason]"));
* } else {
* janet_schedule(interrupted_fiber, janet_wrap_nil());
* }
* }
* }
*
*/
JANET_API int janet_loop_done(void);
JANET_API JanetFiber *janet_loop1(void);
JANET_API void janet_loop1_interrupt(JanetVM *vm);
/* Wrapper around streams */
JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
JANET_API void janet_stream_close(JanetStream *stream);
@ -1344,13 +1370,20 @@ typedef struct {
/* Function pointer that is run in the thread pool */
typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments);
/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */
/* Handler for events posted to the event loop */
typedef void (*JanetCallback)(JanetEVGenericMessage return_value);
/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine (same as JanetCallback) */
typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value);
/* API calls for quickly offloading some work in C to a new thread or thread pool. */
JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb);
JANET_NO_RETURN JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp);
/* Post callback + userdata to an event loop. Takes the vm parameter to allow posting from other
* threads or signal handlers. Use NULL to post to the current thread. */
JANET_API void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage msg);
/* Callback used by janet_ev_threaded_await */
JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value);