diff --git a/meson_options.txt b/meson_options.txt index e9f88c73..91293fa2 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -18,7 +18,7 @@ option('realpath', type : 'boolean', value : true) option('simple_getline', type : 'boolean', value : false) option('epoll', type : 'boolean', value : false) option('kqueue', type : 'boolean', value : false) -option('interpreter_interrupt', type : 'boolean', value : false) +option('interpreter_interrupt', type : 'boolean', value : true) option('ffi', type : 'boolean', value : true) option('ffi_jit', type : 'boolean', value : true) diff --git a/src/core/ev.c b/src/core/ev.c index 5f999764..38fc42e1 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -127,7 +127,7 @@ static int32_t janet_q_count(JanetQueue *q) { : (q->tail - q->head); } -static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) { +static int janet_q_maybe_resize(JanetQueue *q, size_t itemsize) { int32_t count = janet_q_count(q); /* Resize if needed */ if (count + 1 >= q->capacity) { @@ -151,11 +151,27 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) { } q->capacity = newcap; } + return 0; +} + +static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) { + if (janet_q_maybe_resize(q, itemsize)) return 1; memcpy((char *) q->data + itemsize * q->tail, item, itemsize); q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0; return 0; } +static int janet_q_push_head(JanetQueue *q, void *item, size_t itemsize) { + if (janet_q_maybe_resize(q, itemsize)) return 1; + int32_t newhead = q->head - 1; + if (newhead < 0) { + newhead += q->capacity; + } + memcpy((char *) q->data + itemsize * newhead, item, itemsize); + q->head = newhead; + return 0; +} + static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) { if (q->head == q->tail) return 1; memcpy(out, (char *) q->data + itemsize * q->head, itemsize); @@ -468,7 +484,7 @@ const JanetAbstractType janet_stream_type = { }; /* Register a fiber to resume with value */ -void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { +static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal sig, int soon) { if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return; if (!(fiber->gc.flags & JANET_FIBER_FLAG_ROOT)) { Janet task_element = janet_wrap_fiber(fiber); @@ -477,7 +493,19 @@ void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { JanetTask t = { fiber, value, sig, ++fiber->sched_id }; fiber->gc.flags |= JANET_FIBER_FLAG_ROOT; if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED; - janet_q_push(&janet_vm.spawn, &t, sizeof(t)); + if (soon) { + janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)); + } else { + janet_q_push(&janet_vm.spawn, &t, sizeof(t)); + } +} + +void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { + janet_schedule_general(fiber, value, sig, 0); +} + +void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig) { + janet_schedule_general(fiber, value, sig, 1); } void janet_cancel(JanetFiber *fiber, Janet value) { @@ -1300,7 +1328,64 @@ int janet_loop_done(void) { janet_vm.extra_listeners); } -static void janet_loop1_poll(void) { +JanetFiber *janet_loop1(void) { + /* Schedule expired timers */ + JanetTimeout to; + JanetTimestamp now = ts_now(); + while (peek_timeout(&to) && to.when <= now) { + pop_timeout(0); + if (to.curr_fiber != NULL) { + if (janet_fiber_can_resume(to.curr_fiber)) { + janet_cancel(to.fiber, janet_cstringv("deadline expired")); + } + } else { + /* This is a timeout (for a function call, not a whole fiber) */ + if (to.fiber->sched_id == to.sched_id) { + if (to.is_error) { + janet_cancel(to.fiber, janet_cstringv("timeout")); + } else { + janet_schedule(to.fiber, janet_wrap_nil()); + } + } + } + } + + /* Run scheduled fibers unless interrupts need to be handled. */ + while (janet_vm.spawn.head != janet_vm.spawn.tail) { + /* Don't run until all interrupts have been marked as handled by calling janet_interpreter_interrupt_handled */ + if (janet_vm.auto_suspend) break; + JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0}; + janet_q_pop(&janet_vm.spawn, &task, sizeof(task)); + if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount(); + task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED); + if (task.expected_sched_id != task.fiber->sched_id) continue; + Janet res; + JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); + if (!janet_fiber_can_resume(task.fiber)) { + janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(task.fiber)); + } + void *sv = task.fiber->supervisor_channel; + int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT; + if (is_suspended) { + task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED; + janet_ev_inc_refcount(); + } + if (NULL == sv) { + if (!is_suspended) { + janet_stacktrace_ext(task.fiber, res, ""); + } + } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) { + JanetChannel *chan = janet_channel_unwrap(sv); + janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], + task.fiber, chan->is_threaded), 2); + } else if (!is_suspended) { + janet_stacktrace_ext(task.fiber, res, ""); + } + if (sig == JANET_SIGNAL_INTERRUPT) { + return task.fiber; + } + } + /* Poll for events */ if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) { JanetTimeout to; @@ -1325,66 +1410,6 @@ static void janet_loop1_poll(void) { janet_loop1_impl(has_timeout, to.when); } } -} - -JanetFiber *janet_loop1(void) { - /* Schedule expired timers */ - JanetTimeout to; - JanetTimestamp now = ts_now(); - while (peek_timeout(&to) && to.when <= now) { - pop_timeout(0); - if (to.curr_fiber != NULL) { - if (janet_fiber_can_resume(to.curr_fiber)) { - janet_cancel(to.fiber, janet_cstringv("deadline expired")); - } - } else { - /* This is a timeout (for a function call, not a whole fiber) */ - if (to.fiber->sched_id == to.sched_id) { - if (to.is_error) { - janet_cancel(to.fiber, janet_cstringv("timeout")); - } else { - janet_schedule(to.fiber, janet_wrap_nil()); - } - } - } - } - - /* Run scheduled fibers */ - while (janet_vm.spawn.head != janet_vm.spawn.tail) { - JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0}; - janet_q_pop(&janet_vm.spawn, &task, sizeof(task)); - if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount(); - task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED); - if (task.expected_sched_id != task.fiber->sched_id) continue; - Janet res; - JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); - if (!janet_fiber_can_resume(task.fiber)) { - janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(task.fiber)); - } - void *sv = task.fiber->supervisor_channel; - int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT; - if (is_suspended) { - task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED; - janet_ev_inc_refcount(); - } - if (NULL == sv) { - if (!is_suspended) { - janet_stacktrace_ext(task.fiber, res, ""); - } - } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) { - JanetChannel *chan = janet_channel_unwrap(sv); - janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], - task.fiber, chan->is_threaded), 2); - } else if (!is_suspended) { - janet_stacktrace_ext(task.fiber, res, ""); - } - if (sig == JANET_SIGNAL_INTERRUPT) { - /* On interrupts, return the interrupted fiber immediately */ - return task.fiber; - } - } - - janet_loop1_poll(); /* No fiber was interrupted */ return NULL; @@ -1405,12 +1430,6 @@ void janet_loop(void) { while (!janet_loop_done()) { JanetFiber *interrupted_fiber = janet_loop1(); if (NULL != interrupted_fiber) { - /* Allow an extra poll before rescheduling to allow posted events to be handled - * before entering a possibly infinite, blocking loop. */ - Janet x = janet_wrap_fiber(interrupted_fiber); - janet_gcroot(x); - janet_loop1_poll(); - janet_gcunroot(x); janet_schedule(interrupted_fiber, janet_wrap_nil()); } } diff --git a/src/core/os.c b/src/core/os.c index 2d0d7803..3038ab79 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -809,6 +809,7 @@ static void close_handle(JanetHandle handle) { #ifndef JANET_WINDOWS static void janet_signal_callback(JanetEVGenericMessage msg) { int sig = msg.tag; + if (msg.argi) janet_interpreter_interrupt_handled(NULL); Janet handlerv = janet_table_get(&janet_vm.signal_handlers, janet_wrap_integer(sig)); if (!janet_checktype(handlerv, JANET_FUNCTION)) { /* Let another thread/process try to handle this */ @@ -825,11 +826,8 @@ static void janet_signal_callback(JanetEVGenericMessage msg) { } JanetFunction *handler = janet_unwrap_function(handlerv); JanetFiber *fiber = janet_fiber(handler, 64, 0, NULL); - janet_schedule(fiber, janet_wrap_nil()); - if (msg.argi) { - janet_vm.auto_suspend = 0; /* Undo interrupt if it wasn't needed. */ - janet_ev_dec_refcount(); - } + janet_schedule_soon(fiber, janet_wrap_nil(), JANET_SIGNAL_OK); + janet_ev_dec_refcount(); } static void janet_signal_trampoline_no_interrupt(int sig) { @@ -838,6 +836,7 @@ static void janet_signal_trampoline_no_interrupt(int sig) { memset(&msg, 0, sizeof(msg)); msg.tag = sig; janet_ev_post_event(&janet_vm, janet_signal_callback, msg); + janet_ev_inc_refcount(); } static void janet_signal_trampoline(int sig) { @@ -846,9 +845,9 @@ static void janet_signal_trampoline(int sig) { memset(&msg, 0, sizeof(msg)); msg.tag = sig; msg.argi = 1; + janet_interpreter_interrupt(NULL); janet_ev_post_event(&janet_vm, janet_signal_callback, msg); janet_ev_inc_refcount(); - janet_interpreter_interrupt(NULL); } #endif @@ -881,7 +880,11 @@ JANET_CORE_FN(os_sigaction, sigfillset(&mask); memset(&action, 0, sizeof(action)); if (can_interrupt) { +#ifdef JANET_NO_INTERPRETER_INTERRUPT + janet_panic("interpreter interrupt not enabled"); +#else action.sa_handler = janet_signal_trampoline; +#endif } else { action.sa_handler = janet_signal_trampoline_no_interrupt; } diff --git a/src/core/state.c b/src/core/state.c index eab0b710..4c976dad 100644 --- a/src/core/state.c +++ b/src/core/state.c @@ -24,6 +24,7 @@ #include "features.h" #include #include "state.h" +#include "util.h" #endif JANET_THREAD_LOCAL JanetVM janet_vm; @@ -57,5 +58,18 @@ void janet_vm_load(JanetVM *from) { * use NULL to interrupt the current VM when convenient */ void janet_interpreter_interrupt(JanetVM *vm) { vm = vm ? vm : &janet_vm; - vm->auto_suspend = 1; +#ifdef JANET_WINDOWS + InterlockedIncrement(&vm->auto_suspend); +#else + __atomic_add_fetch(&vm->auto_suspend, 1, __ATOMIC_RELAXED); +#endif +} + +void janet_interpreter_interrupt_handled(JanetVM *vm) { + vm = vm ? vm : &janet_vm; +#ifdef JANET_WINDOWS + InterlockedDecrement(&vm->auto_suspend); +#else + __atomic_add_fetch(&vm->auto_suspend, -1, __ATOMIC_RELAXED); +#endif } diff --git a/src/core/state.h b/src/core/state.h index c7b3534b..a4312768 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -89,7 +89,7 @@ struct JanetVM { /* If this flag is true, suspend on function calls and backwards jumps. * When this occurs, this flag will be reset to 0. */ - volatile int auto_suspend; + volatile int32_t auto_suspend; /* The current running fiber on the current thread. * Set and unset by functions in vm.c */ diff --git a/src/core/vm.c b/src/core/vm.c index 7c2fc47d..6645cfb5 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -116,7 +116,6 @@ #else #define vm_maybe_auto_suspend(COND) do { \ if ((COND) && janet_vm.auto_suspend) { \ - janet_vm.auto_suspend = 0; \ fiber->flags |= (JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP); \ vm_return(JANET_SIGNAL_INTERRUPT, janet_wrap_nil()); \ } \ diff --git a/src/include/janet.h b/src/include/janet.h index 1ca534ce..37717351 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1388,6 +1388,7 @@ JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags); JANET_API void janet_schedule(JanetFiber *fiber, Janet value); JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); +JANET_API void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig); /* Start a state machine listening for events from a stream */ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user); @@ -1799,6 +1800,7 @@ JANET_API void janet_vm_free(JanetVM *vm); JANET_API void janet_vm_save(JanetVM *into); JANET_API void janet_vm_load(JanetVM *from); JANET_API void janet_interpreter_interrupt(JanetVM *vm); +JANET_API void janet_interpreter_interrupt_handled(JanetVM *vm); JANET_API JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out); JANET_API JanetSignal janet_continue_signal(JanetFiber *fiber, Janet in, Janet *out, JanetSignal sig); JANET_API JanetSignal janet_pcall(JanetFunction *fun, int32_t argn, const Janet *argv, Janet *out, JanetFiber **f);