1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-25 01:37:19 +00:00

Add os/sigaction for signal handling.

Also improve interrupts to work better with busy loops
and signals.
This commit is contained in:
Calvin Rose 2023-08-19 13:13:22 -05:00
parent 2ac36a0572
commit f45571033c
5 changed files with 150 additions and 42 deletions

10
examples/sigaction.janet Normal file
View File

@ -0,0 +1,10 @@
(defn action []
(print "Handled SIGHUP!")
(flush))
(defn main [_]
# Set the interrupt-interpreter argument to `true` to allow
# interrupting the busy loop `(forever)`. By default, will not
# interrupt the interpreter.
(os/sigaction :hup action true)
(forever))

View File

@ -562,6 +562,7 @@ void janet_ev_init_common(void) {
janet_vm.tq_capacity = 0; janet_vm.tq_capacity = 0;
janet_table_init_raw(&janet_vm.threaded_abstracts, 0); janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
janet_table_init_raw(&janet_vm.active_tasks, 0); janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_table_init_raw(&janet_vm.signal_handlers, 0);
janet_rng_seed(&janet_vm.ev_rng, 0); janet_rng_seed(&janet_vm.ev_rng, 0);
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr); pthread_attr_init(&janet_vm.new_thread_attr);
@ -577,6 +578,7 @@ void janet_ev_deinit_common(void) {
janet_vm.listeners = NULL; janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks); janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers);
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr); pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif #endif
@ -1290,6 +1292,33 @@ int janet_loop_done(void) {
janet_vm.extra_listeners); janet_vm.extra_listeners);
} }
static void janet_loop1_poll(void) {
/* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
/* Drop timeouts that are no longer needed */
while ((has_timeout = peek_timeout(&to))) {
if (to.curr_fiber != NULL) {
if (!janet_fiber_can_resume(to.curr_fiber)) {
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber));
pop_timeout(0);
continue;
}
} else if (to.fiber->sched_id != to.sched_id) {
pop_timeout(0);
continue;
}
break;
}
/* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when);
}
}
}
JanetFiber *janet_loop1(void) { JanetFiber *janet_loop1(void) {
/* Schedule expired timers */ /* Schedule expired timers */
JanetTimeout to; JanetTimeout to;
@ -1347,30 +1376,7 @@ JanetFiber *janet_loop1(void) {
} }
} }
/* Poll for events */ janet_loop1_poll();
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
/* Drop timeouts that are no longer needed */
while ((has_timeout = peek_timeout(&to))) {
if (to.curr_fiber != NULL) {
if (!janet_fiber_can_resume(to.curr_fiber)) {
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber));
pop_timeout(0);
continue;
}
} else if (to.fiber->sched_id != to.sched_id) {
pop_timeout(0);
continue;
}
break;
}
/* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when);
}
}
/* No fiber was interrupted */ /* No fiber was interrupted */
return NULL; return NULL;
@ -1391,6 +1397,12 @@ void janet_loop(void) {
while (!janet_loop_done()) { while (!janet_loop_done()) {
JanetFiber *interrupted_fiber = janet_loop1(); JanetFiber *interrupted_fiber = janet_loop1();
if (NULL != interrupted_fiber) { if (NULL != interrupted_fiber) {
/* Allow an extra poll before rescheduling to allow posted events to be handled
* before entering a possibly infinite, blocking loop. */
Janet x = janet_wrap_fiber(interrupted_fiber);
janet_gcroot(x);
janet_loop1_poll();
janet_gcunroot(x);
janet_schedule(interrupted_fiber, janet_wrap_nil()); janet_schedule(interrupted_fiber, janet_wrap_nil());
} }
} }

View File

@ -706,6 +706,18 @@ static const struct keyword_signal signal_keywords[] = {
#endif #endif
{NULL, 0}, {NULL, 0},
}; };
static int get_signal_kw(const Janet *argv, int32_t n) {
JanetKeyword signal_kw = janet_getkeyword(argv, n);
const struct keyword_signal *ptr = signal_keywords;
while (ptr->keyword) {
if (!janet_cstrcmp(signal_kw, ptr->keyword)) {
return ptr->signal;
}
ptr++;
}
janet_panicf("undefined signal %v", argv[n]);
}
#endif #endif
JANET_CORE_FN(os_proc_kill, JANET_CORE_FN(os_proc_kill,
@ -731,18 +743,7 @@ JANET_CORE_FN(os_proc_kill,
#else #else
int signal = -1; int signal = -1;
if (argc == 3) { if (argc == 3) {
JanetKeyword signal_kw = janet_getkeyword(argv, 2); signal = get_signal_kw(argv, 2);
const struct keyword_signal *ptr = signal_keywords;
while (ptr->keyword) {
if (!janet_cstrcmp(signal_kw, ptr->keyword)) {
signal = ptr->signal;
break;
}
ptr++;
}
if (signal == -1) {
janet_panic("undefined signal");
}
} }
int status = kill(proc->pid, signal == -1 ? SIGKILL : signal); int status = kill(proc->pid, signal == -1 ? SIGKILL : signal);
if (status) { if (status) {
@ -803,6 +804,89 @@ static void close_handle(JanetHandle handle) {
#endif #endif
} }
#ifdef JANET_EV
#ifndef JANET_WINDOWS
static void janet_signal_callback(JanetEVGenericMessage msg) {
int sig = msg.tag;
Janet handlerv = janet_table_get(&janet_vm.signal_handlers, janet_wrap_integer(sig));
if (!janet_checktype(handlerv, JANET_FUNCTION)) {
/* Let another thread/process try to handle this */
sigset_t set;
sigemptyset(&set);
sigaddset(&set, sig);
sigprocmask(SIG_BLOCK, &set, NULL);
raise(sig);
return;
}
JanetFunction *handler = janet_unwrap_function(handlerv);
JanetFiber *fiber = janet_fiber(handler, 64, 0, NULL);
janet_schedule(fiber, janet_wrap_nil());
if (msg.argi) {
janet_ev_dec_refcount();
}
}
static void janet_signal_trampoline_no_interrupt(int sig) {
/* Do not interact with global janet state here except for janet_ev_post_event, unsafe! */
JanetEVGenericMessage msg;
memset(&msg, 0, sizeof(msg));
msg.tag = sig;
janet_ev_post_event(&janet_vm, janet_signal_callback, msg);
}
static void janet_signal_trampoline(int sig) {
/* Do not interact with global janet state here except for janet_ev_post_event, unsafe! */
JanetEVGenericMessage msg;
memset(&msg, 0, sizeof(msg));
msg.tag = sig;
msg.argi = 1;
janet_ev_post_event(&janet_vm, janet_signal_callback, msg);
janet_ev_inc_refcount();
janet_interpreter_interrupt(NULL);
}
#endif
JANET_CORE_FN(os_sigaction,
"(os/sigaction which &opt handler interrupt-interpreter)",
"Add a signal handler for a given action. Use nil for the `handler` argument to remove a signal handler.") {
janet_arity(argc, 1, 3);
#ifdef JANET_WINDOWS
janet_panic("unsupported on this platform");
#else
/* TODO - per thread signal masks */
int rc;
int sig = get_signal_kw(argv, 0);
JanetFunction *handler = janet_optfunction(argv, argc, 1, NULL);
int can_interrupt = janet_optboolean(argv, argc, 2, 0);
Janet oldhandler = janet_table_get(&janet_vm.signal_handlers, janet_wrap_integer(sig));
if (!janet_checktype(oldhandler, JANET_NIL)) {
janet_gcunroot(oldhandler);
}
if (NULL != handler) {
Janet handlerv = janet_wrap_function(handler);
janet_gcroot(handlerv);
janet_table_put(&janet_vm.signal_handlers, janet_wrap_integer(sig), handlerv);
} else {
janet_table_put(&janet_vm.signal_handlers, janet_wrap_integer(sig), janet_wrap_nil());
}
struct sigaction action;
sigset_t mask;
sigfillset(&mask);
memset(&action, 0, sizeof(action));
if (can_interrupt) {
action.sa_handler = janet_signal_trampoline;
} else {
action.sa_handler = janet_signal_trampoline_no_interrupt;
}
action.sa_mask = mask;
RETRY_EINTR(rc, sigaction(sig, &action, NULL));
return janet_wrap_nil();
#endif
}
#endif
/* Create piped file for os/execute and os/spawn. Need to be careful that we mark /* Create piped file for os/execute and os/spawn. Need to be careful that we mark
the error flag if we can't create pipe and don't leak handles. *handle will be cleaned the error flag if we can't create pipe and don't leak handles. *handle will be cleaned
up by the calling function. If everything goes well, *handle is owned by the calling function, up by the calling function. If everything goes well, *handle is owned by the calling function,
@ -2536,6 +2620,7 @@ void janet_lib_os(JanetTable *env) {
#ifdef JANET_EV #ifdef JANET_EV
JANET_CORE_REG("os/open", os_open), /* fs read and write */ JANET_CORE_REG("os/open", os_open), /* fs read and write */
JANET_CORE_REG("os/pipe", os_pipe), JANET_CORE_REG("os/pipe", os_pipe),
JANET_CORE_REG("os/sigaction", os_sigaction),
#endif #endif
#endif #endif
JANET_REG_END JANET_REG_END

View File

@ -89,7 +89,7 @@ struct JanetVM {
/* If this flag is true, suspend on function calls and backwards jumps. /* If this flag is true, suspend on function calls and backwards jumps.
* When this occurs, this flag will be reset to 0. */ * When this occurs, this flag will be reset to 0. */
int auto_suspend; volatile int auto_suspend;
/* The current running fiber on the current thread. /* The current running fiber on the current thread.
* Set and unset by functions in vm.c */ * Set and unset by functions in vm.c */
@ -160,6 +160,7 @@ struct JanetVM {
size_t extra_listeners; size_t extra_listeners;
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */ JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
JanetTable signal_handlers;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void **iocp; void **iocp;
#elif defined(JANET_EV_EPOLL) #elif defined(JANET_EV_EPOLL)

View File

@ -800,13 +800,13 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
VM_OP(JOP_JUMP) VM_OP(JOP_JUMP)
pc += DS; pc += DS;
vm_maybe_auto_suspend(DS < 0); vm_maybe_auto_suspend(DS <= 0);
vm_next(); vm_next();
VM_OP(JOP_JUMP_IF) VM_OP(JOP_JUMP_IF)
if (janet_truthy(stack[A])) { if (janet_truthy(stack[A])) {
pc += ES; pc += ES;
vm_maybe_auto_suspend(ES < 0); vm_maybe_auto_suspend(ES <= 0);
} else { } else {
pc++; pc++;
} }
@ -817,14 +817,14 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
pc++; pc++;
} else { } else {
pc += ES; pc += ES;
vm_maybe_auto_suspend(ES < 0); vm_maybe_auto_suspend(ES <= 0);
} }
vm_next(); vm_next();
VM_OP(JOP_JUMP_IF_NIL) VM_OP(JOP_JUMP_IF_NIL)
if (janet_checktype(stack[A], JANET_NIL)) { if (janet_checktype(stack[A], JANET_NIL)) {
pc += ES; pc += ES;
vm_maybe_auto_suspend(ES < 0); vm_maybe_auto_suspend(ES <= 0);
} else { } else {
pc++; pc++;
} }
@ -835,7 +835,7 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
pc++; pc++;
} else { } else {
pc += ES; pc += ES;
vm_maybe_auto_suspend(ES < 0); vm_maybe_auto_suspend(ES <= 0);
} }
vm_next(); vm_next();