Allow multiple simultaneous interrupts cleanly for #1262

Instead of setting a flag, each interrupt increments an atomic
counter. When the interrupt is finally handled, either by scheduling
code to run on the event loop or executing some out of band code, the
user must now decrement the interrupt counter with
janet_interpreter_interrupt_handled. While this counter is non-zero, the
event loop will not enter the interpreter. This changes the API a bit but
makes it possible and easy to handle signals without race conditions
or scheduler hacks, as the runtime can ensure that high priority code is
run before re-entering possibly blocking interpreter code again.

Also included is a new function janet_schedule_soon, which prepends to
the task queue instead of appending, allowing interrupt handler to skip
ahead of all other scheduled fibers.

Lastly, also update meson default options to include the
interpreter_interrupt code and raise a runtime error if os/sigaction
is used with interpreter interrupt but that build option is not enabled.
This commit is contained in:
Calvin Rose 2023-08-23 20:14:38 -05:00
parent 21eab7e9cc
commit ffd79c6097
7 changed files with 117 additions and 80 deletions

View File

@ -18,7 +18,7 @@ option('realpath', type : 'boolean', value : true)
option('simple_getline', type : 'boolean', value : false)
option('epoll', type : 'boolean', value : false)
option('kqueue', type : 'boolean', value : false)
option('interpreter_interrupt', type : 'boolean', value : false)
option('interpreter_interrupt', type : 'boolean', value : true)
option('ffi', type : 'boolean', value : true)
option('ffi_jit', type : 'boolean', value : true)

View File

@ -127,7 +127,7 @@ static int32_t janet_q_count(JanetQueue *q) {
: (q->tail - q->head);
}
static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
static int janet_q_maybe_resize(JanetQueue *q, size_t itemsize) {
int32_t count = janet_q_count(q);
/* Resize if needed */
if (count + 1 >= q->capacity) {
@ -151,11 +151,27 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
}
q->capacity = newcap;
}
return 0;
}
static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
if (janet_q_maybe_resize(q, itemsize)) return 1;
memcpy((char *) q->data + itemsize * q->tail, item, itemsize);
q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0;
return 0;
}
static int janet_q_push_head(JanetQueue *q, void *item, size_t itemsize) {
if (janet_q_maybe_resize(q, itemsize)) return 1;
int32_t newhead = q->head - 1;
if (newhead < 0) {
newhead += q->capacity;
}
memcpy((char *) q->data + itemsize * newhead, item, itemsize);
q->head = newhead;
return 0;
}
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
if (q->head == q->tail) return 1;
memcpy(out, (char *) q->data + itemsize * q->head, itemsize);
@ -468,7 +484,7 @@ const JanetAbstractType janet_stream_type = {
};
/* Register a fiber to resume with value */
void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal sig, int soon) {
if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return;
if (!(fiber->gc.flags & JANET_FIBER_FLAG_ROOT)) {
Janet task_element = janet_wrap_fiber(fiber);
@ -477,7 +493,19 @@ void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
JanetTask t = { fiber, value, sig, ++fiber->sched_id };
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
janet_q_push(&janet_vm.spawn, &t, sizeof(t));
if (soon) {
janet_q_push_head(&janet_vm.spawn, &t, sizeof(t));
} else {
janet_q_push(&janet_vm.spawn, &t, sizeof(t));
}
}
void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
janet_schedule_general(fiber, value, sig, 0);
}
void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig) {
janet_schedule_general(fiber, value, sig, 1);
}
void janet_cancel(JanetFiber *fiber, Janet value) {
@ -1300,7 +1328,64 @@ int janet_loop_done(void) {
janet_vm.extra_listeners);
}
static void janet_loop1_poll(void) {
JanetFiber *janet_loop1(void) {
/* Schedule expired timers */
JanetTimeout to;
JanetTimestamp now = ts_now();
while (peek_timeout(&to) && to.when <= now) {
pop_timeout(0);
if (to.curr_fiber != NULL) {
if (janet_fiber_can_resume(to.curr_fiber)) {
janet_cancel(to.fiber, janet_cstringv("deadline expired"));
}
} else {
/* This is a timeout (for a function call, not a whole fiber) */
if (to.fiber->sched_id == to.sched_id) {
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
}
}
}
/* Run scheduled fibers unless interrupts need to be handled. */
while (janet_vm.spawn.head != janet_vm.spawn.tail) {
/* Don't run until all interrupts have been marked as handled by calling janet_interpreter_interrupt_handled */
if (janet_vm.auto_suspend) break;
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0};
janet_q_pop(&janet_vm.spawn, &task, sizeof(task));
if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount();
task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED);
if (task.expected_sched_id != task.fiber->sched_id) continue;
Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
if (!janet_fiber_can_resume(task.fiber)) {
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(task.fiber));
}
void *sv = task.fiber->supervisor_channel;
int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT;
if (is_suspended) {
task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED;
janet_ev_inc_refcount();
}
if (NULL == sv) {
if (!is_suspended) {
janet_stacktrace_ext(task.fiber, res, "");
}
} else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
JanetChannel *chan = janet_channel_unwrap(sv);
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig],
task.fiber, chan->is_threaded), 2);
} else if (!is_suspended) {
janet_stacktrace_ext(task.fiber, res, "");
}
if (sig == JANET_SIGNAL_INTERRUPT) {
return task.fiber;
}
}
/* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to;
@ -1325,66 +1410,6 @@ static void janet_loop1_poll(void) {
janet_loop1_impl(has_timeout, to.when);
}
}
}
JanetFiber *janet_loop1(void) {
/* Schedule expired timers */
JanetTimeout to;
JanetTimestamp now = ts_now();
while (peek_timeout(&to) && to.when <= now) {
pop_timeout(0);
if (to.curr_fiber != NULL) {
if (janet_fiber_can_resume(to.curr_fiber)) {
janet_cancel(to.fiber, janet_cstringv("deadline expired"));
}
} else {
/* This is a timeout (for a function call, not a whole fiber) */
if (to.fiber->sched_id == to.sched_id) {
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
}
}
}
/* Run scheduled fibers */
while (janet_vm.spawn.head != janet_vm.spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0};
janet_q_pop(&janet_vm.spawn, &task, sizeof(task));
if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount();
task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED);
if (task.expected_sched_id != task.fiber->sched_id) continue;
Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
if (!janet_fiber_can_resume(task.fiber)) {
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(task.fiber));
}
void *sv = task.fiber->supervisor_channel;
int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT;
if (is_suspended) {
task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED;
janet_ev_inc_refcount();
}
if (NULL == sv) {
if (!is_suspended) {
janet_stacktrace_ext(task.fiber, res, "");
}
} else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
JanetChannel *chan = janet_channel_unwrap(sv);
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig],
task.fiber, chan->is_threaded), 2);
} else if (!is_suspended) {
janet_stacktrace_ext(task.fiber, res, "");
}
if (sig == JANET_SIGNAL_INTERRUPT) {
/* On interrupts, return the interrupted fiber immediately */
return task.fiber;
}
}
janet_loop1_poll();
/* No fiber was interrupted */
return NULL;
@ -1405,12 +1430,6 @@ void janet_loop(void) {
while (!janet_loop_done()) {
JanetFiber *interrupted_fiber = janet_loop1();
if (NULL != interrupted_fiber) {
/* Allow an extra poll before rescheduling to allow posted events to be handled
* before entering a possibly infinite, blocking loop. */
Janet x = janet_wrap_fiber(interrupted_fiber);
janet_gcroot(x);
janet_loop1_poll();
janet_gcunroot(x);
janet_schedule(interrupted_fiber, janet_wrap_nil());
}
}

View File

@ -809,6 +809,7 @@ static void close_handle(JanetHandle handle) {
#ifndef JANET_WINDOWS
static void janet_signal_callback(JanetEVGenericMessage msg) {
int sig = msg.tag;
if (msg.argi) janet_interpreter_interrupt_handled(NULL);
Janet handlerv = janet_table_get(&janet_vm.signal_handlers, janet_wrap_integer(sig));
if (!janet_checktype(handlerv, JANET_FUNCTION)) {
/* Let another thread/process try to handle this */
@ -825,11 +826,8 @@ static void janet_signal_callback(JanetEVGenericMessage msg) {
}
JanetFunction *handler = janet_unwrap_function(handlerv);
JanetFiber *fiber = janet_fiber(handler, 64, 0, NULL);
janet_schedule(fiber, janet_wrap_nil());
if (msg.argi) {
janet_vm.auto_suspend = 0; /* Undo interrupt if it wasn't needed. */
janet_ev_dec_refcount();
}
janet_schedule_soon(fiber, janet_wrap_nil(), JANET_SIGNAL_OK);
janet_ev_dec_refcount();
}
static void janet_signal_trampoline_no_interrupt(int sig) {
@ -838,6 +836,7 @@ static void janet_signal_trampoline_no_interrupt(int sig) {
memset(&msg, 0, sizeof(msg));
msg.tag = sig;
janet_ev_post_event(&janet_vm, janet_signal_callback, msg);
janet_ev_inc_refcount();
}
static void janet_signal_trampoline(int sig) {
@ -846,9 +845,9 @@ static void janet_signal_trampoline(int sig) {
memset(&msg, 0, sizeof(msg));
msg.tag = sig;
msg.argi = 1;
janet_interpreter_interrupt(NULL);
janet_ev_post_event(&janet_vm, janet_signal_callback, msg);
janet_ev_inc_refcount();
janet_interpreter_interrupt(NULL);
}
#endif
@ -881,7 +880,11 @@ JANET_CORE_FN(os_sigaction,
sigfillset(&mask);
memset(&action, 0, sizeof(action));
if (can_interrupt) {
#ifdef JANET_NO_INTERPRETER_INTERRUPT
janet_panic("interpreter interrupt not enabled");
#else
action.sa_handler = janet_signal_trampoline;
#endif
} else {
action.sa_handler = janet_signal_trampoline_no_interrupt;
}

View File

@ -24,6 +24,7 @@
#include "features.h"
#include <janet.h>
#include "state.h"
#include "util.h"
#endif
JANET_THREAD_LOCAL JanetVM janet_vm;
@ -57,5 +58,18 @@ void janet_vm_load(JanetVM *from) {
* use NULL to interrupt the current VM when convenient */
void janet_interpreter_interrupt(JanetVM *vm) {
vm = vm ? vm : &janet_vm;
vm->auto_suspend = 1;
#ifdef JANET_WINDOWS
InterlockedIncrement(&vm->auto_suspend);
#else
__atomic_add_fetch(&vm->auto_suspend, 1, __ATOMIC_RELAXED);
#endif
}
void janet_interpreter_interrupt_handled(JanetVM *vm) {
vm = vm ? vm : &janet_vm;
#ifdef JANET_WINDOWS
InterlockedDecrement(&vm->auto_suspend);
#else
__atomic_add_fetch(&vm->auto_suspend, -1, __ATOMIC_RELAXED);
#endif
}

View File

@ -89,7 +89,7 @@ struct JanetVM {
/* If this flag is true, suspend on function calls and backwards jumps.
* When this occurs, this flag will be reset to 0. */
volatile int auto_suspend;
volatile int32_t auto_suspend;
/* The current running fiber on the current thread.
* Set and unset by functions in vm.c */

View File

@ -116,7 +116,6 @@
#else
#define vm_maybe_auto_suspend(COND) do { \
if ((COND) && janet_vm.auto_suspend) { \
janet_vm.auto_suspend = 0; \
fiber->flags |= (JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP); \
vm_return(JANET_SIGNAL_INTERRUPT, janet_wrap_nil()); \
} \

View File

@ -1388,6 +1388,7 @@ JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags);
JANET_API void janet_schedule(JanetFiber *fiber, Janet value);
JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
JANET_API void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a stream */
JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user);
@ -1799,6 +1800,7 @@ JANET_API void janet_vm_free(JanetVM *vm);
JANET_API void janet_vm_save(JanetVM *into);
JANET_API void janet_vm_load(JanetVM *from);
JANET_API void janet_interpreter_interrupt(JanetVM *vm);
JANET_API void janet_interpreter_interrupt_handled(JanetVM *vm);
JANET_API JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out);
JANET_API JanetSignal janet_continue_signal(JanetFiber *fiber, Janet in, Janet *out, JanetSignal sig);
JANET_API JanetSignal janet_pcall(JanetFunction *fun, int32_t argn, const Janet *argv, Janet *out, JanetFiber **f);