From c831ecf5d26fbe2f7fa222cb1c284683b91cda8e Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 11:22:18 -0600 Subject: [PATCH] Working implementation of process waiting with threads. Does not require all sorts of signal handling code that is not thread-safe and can "steal processes". However, there is a much simpler way to add this functionality by creating a new stream and thread for each subprocess when it is waited on. This is perhaps _slightly_ less efficient but oh so much simpler, since we can reuse all of our concepts from streams and there is no need to implement a whole system around the selfpipe. --- src/core/ev.c | 51 ++++++++++++++++++------------------------- src/core/os.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++- src/core/util.h | 20 +++++++++++++++++ 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index d6fe845a..3fc8c89e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -880,7 +880,7 @@ void janet_loop(void) { } /* - * Signal handling code. + * Self-pipe handling code. */ #ifdef JANET_WINDOWS @@ -889,43 +889,34 @@ void janet_loop(void) { JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; -static void janet_ev_handle_signals(void) { - int sig = 0; - while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) { - switch (sig) { +/* Handle events from the self pipe inside the event loop */ +static void janet_ev_handle_selfpipe(void) { + JanetSelfPipeEvent ev; + while (read(janet_vm_selfpipe[0], &ev, sizeof(ev)) > 0) { + switch (ev.tag) { default: break; - case SIGCHLD: - { - int status = 0; - pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED); - /* invalid pid on failure will do no harm */ - janet_schedule_pid(pid, status); - } + case JANET_SELFPIPE_PROC: + janet_schedule_pid(ev.as.proc.pid, ev.as.proc.status); break; } } } -static void janet_sig_handler(int sig) { - int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig)); - if (result) { - /* Failed to handle signal. */ - ; - } - signal(sig, janet_sig_handler); -} - -static void janet_ev_setup_signals(void) { +static void janet_ev_setup_selfpipe(void) { if (-1 == pipe(janet_vm_selfpipe)) goto error; if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error; if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error; - signal(SIGCHLD, janet_sig_handler); return; error: JANET_EXIT("failed to initialize self pipe in event loop"); } +static void janet_ev_cleanup_selfpipe(void) { + close(janet_vm_selfpipe[0]); + close(janet_vm_selfpipe[1]); +} + #endif #ifdef JANET_WINDOWS @@ -1099,8 +1090,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { if (&janet_vm_timerfd == p) { /* Timer expired, ignore */; } else if (janet_vm_selfpipe == p) { - /* Signal */ - janet_ev_handle_signals(); + /* Self-pipe handling */ + janet_ev_handle_selfpipe(); } else { JanetStream *stream = p; int mask = events[i].events; @@ -1133,7 +1124,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); - janet_ev_setup_signals(); + janet_ev_setup_selfpipe(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); janet_vm_timer_enabled = 0; @@ -1154,6 +1145,7 @@ void janet_ev_deinit(void) { janet_ev_deinit_common(); close(janet_vm_epoll); close(janet_vm_timerfd); + janet_ev_cleanup_selfpipe(); janet_vm_epoll = 0; } @@ -1226,7 +1218,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Check selfpipe */ if (janet_vm_fds[0].revents & POLLIN) { janet_vm_fds[0].revents = 0; - janet_ev_handle_signals(); + janet_ev_handle_selfpipe(); } /* Step state machines */ @@ -1260,7 +1252,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; - janet_ev_setup_signals(); + janet_ev_setup_selfpipe(); janet_vm_fds = malloc(sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; @@ -1273,9 +1265,8 @@ void janet_ev_init(void) { void janet_ev_deinit(void) { janet_ev_deinit_common(); + janet_ev_cleanup_selfpipe(); free(janet_vm_fds); - close(janet_vm_selfpipe[0]); - close(janet_vm_selfpipe[1]); janet_vm_fds = NULL; } diff --git a/src/core/os.c b/src/core/os.c index 1f996732..4e0d22fb 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -328,7 +328,7 @@ typedef struct { HANDLE pHandle; HANDLE tHandle; #else - int pid; + pid_t pid; #endif int return_code; #ifdef JANET_EV @@ -349,6 +349,41 @@ JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL; JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0; JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0; +/* Structure used to initialize the thread used to call wait + * on child processes */ +typedef struct { + pid_t pid; + int write_pipe; +} JanetReaperInit; + +static void *janet_thread_waiter(void *ptr) { + JanetReaperInit *init = (JanetReaperInit *)ptr; + pid_t pid = init->pid; + int fd = init->write_pipe; + free(init); + for (;;) { + int status = 0; + pid_t which = 0; + do { + which = waitpid(pid, &status, 0); + } while (which == -1 && errno == EINTR); + if (which < 0) { + /* Error, could not wait or no children to wait on. */ + break; + } else { + JanetSelfPipeEvent ev; + ev.tag = JANET_SELFPIPE_PROC; + ev.as.proc.status = status; + ev.as.proc.pid = which; + if (write(fd, &ev, sizeof(ev)) < 0) { + /* TODO failed to handle signal. */ + fprintf(stderr, "failed to write event\n"); + } + } + } + return NULL; +} + /* Map pids to JanetProc to allow for lookup after a call to * waitpid. */ static void janet_add_waiting_proc(JanetProc *proc) { @@ -362,6 +397,26 @@ static void janet_add_waiting_proc(JanetProc *proc) { janet_vm_waiting_procs = newprocs; janet_vm_proc_cap = newcap; } + + /* Set proccess group for tracking purposes */ + pid_t pid = proc->pid; + JanetReaperInit *init = malloc(sizeof(JanetReaperInit)); + if (NULL == init) { + JANET_OUT_OF_MEMORY; + } + init->pid = pid; + init->write_pipe = janet_vm_selfpipe[1]; + pthread_attr_t attr; + pthread_t waiter_thread; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); + int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init); + pthread_attr_destroy(&attr); + if (err) { + janet_panicf("%s", strerror(err)); + } + pthread_detach(waiter_thread); + janet_vm_waiting_procs[janet_vm_proc_count++] = proc; janet_gcroot(janet_wrap_abstract(proc)); janet_ev_inc_refcount(); diff --git a/src/core/util.h b/src/core/util.h index b24fc8ce..24be73b4 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -147,6 +147,26 @@ void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); void janet_schedule_pid(pid_t pid, int status); + +/* Single message that is written to self pipe. This is used + * to communicate messages inside Janet to work with the event loop. + * Signals and threads will be the main users of this. */ +typedef struct { + enum { + JANET_SELFPIPE_PROC + } tag; + union { + struct { + int status; + pid_t pid; + } proc; + } as; +} JanetSelfPipeEvent; + +#ifndef JANET_WINDOWS +extern JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; +#endif + #endif #endif