Working implementation of process waiting with threads.

Does not require all sorts of signal handling code
that is not thread-safe and can "steal processes".

However, there is a much simpler way to add this functionality
by creating a new stream and thread for each subprocess when it is
waited on. This is perhaps _slightly_ less efficient but oh so much
simpler, since we can reuse all of our concepts from streams and there
is no need to implement a whole system around the selfpipe.
This commit is contained in:
Calvin Rose 2020-12-31 11:22:18 -06:00
parent 9e42ee153c
commit c831ecf5d2
3 changed files with 97 additions and 31 deletions

View File

@ -880,7 +880,7 @@ void janet_loop(void) {
}
/*
* Signal handling code.
* Self-pipe handling code.
*/
#ifdef JANET_WINDOWS
@ -889,43 +889,34 @@ void janet_loop(void) {
JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
static void janet_ev_handle_signals(void) {
int sig = 0;
while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) {
switch (sig) {
/* Handle events from the self pipe inside the event loop */
static void janet_ev_handle_selfpipe(void) {
JanetSelfPipeEvent ev;
while (read(janet_vm_selfpipe[0], &ev, sizeof(ev)) > 0) {
switch (ev.tag) {
default:
break;
case SIGCHLD:
{
int status = 0;
pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED);
/* invalid pid on failure will do no harm */
janet_schedule_pid(pid, status);
}
case JANET_SELFPIPE_PROC:
janet_schedule_pid(ev.as.proc.pid, ev.as.proc.status);
break;
}
}
}
static void janet_sig_handler(int sig) {
int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig));
if (result) {
/* Failed to handle signal. */
;
}
signal(sig, janet_sig_handler);
}
static void janet_ev_setup_signals(void) {
static void janet_ev_setup_selfpipe(void) {
if (-1 == pipe(janet_vm_selfpipe)) goto error;
if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error;
if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error;
signal(SIGCHLD, janet_sig_handler);
return;
error:
JANET_EXIT("failed to initialize self pipe in event loop");
}
static void janet_ev_cleanup_selfpipe(void) {
close(janet_vm_selfpipe[0]);
close(janet_vm_selfpipe[1]);
}
#endif
#ifdef JANET_WINDOWS
@ -1099,8 +1090,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (&janet_vm_timerfd == p) {
/* Timer expired, ignore */;
} else if (janet_vm_selfpipe == p) {
/* Signal */
janet_ev_handle_signals();
/* Self-pipe handling */
janet_ev_handle_selfpipe();
} else {
JanetStream *stream = p;
int mask = events[i].events;
@ -1133,7 +1124,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
void janet_ev_init(void) {
janet_ev_init_common();
janet_ev_setup_signals();
janet_ev_setup_selfpipe();
janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC);
janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
janet_vm_timer_enabled = 0;
@ -1154,6 +1145,7 @@ void janet_ev_deinit(void) {
janet_ev_deinit_common();
close(janet_vm_epoll);
close(janet_vm_timerfd);
janet_ev_cleanup_selfpipe();
janet_vm_epoll = 0;
}
@ -1226,7 +1218,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Check selfpipe */
if (janet_vm_fds[0].revents & POLLIN) {
janet_vm_fds[0].revents = 0;
janet_ev_handle_signals();
janet_ev_handle_selfpipe();
}
/* Step state machines */
@ -1260,7 +1252,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
void janet_ev_init(void) {
janet_ev_init_common();
janet_vm_fds = NULL;
janet_ev_setup_signals();
janet_ev_setup_selfpipe();
janet_vm_fds = malloc(sizeof(struct pollfd));
if (NULL == janet_vm_fds) {
JANET_OUT_OF_MEMORY;
@ -1273,9 +1265,8 @@ void janet_ev_init(void) {
void janet_ev_deinit(void) {
janet_ev_deinit_common();
janet_ev_cleanup_selfpipe();
free(janet_vm_fds);
close(janet_vm_selfpipe[0]);
close(janet_vm_selfpipe[1]);
janet_vm_fds = NULL;
}

View File

@ -328,7 +328,7 @@ typedef struct {
HANDLE pHandle;
HANDLE tHandle;
#else
int pid;
pid_t pid;
#endif
int return_code;
#ifdef JANET_EV
@ -349,6 +349,41 @@ JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL;
JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0;
/* Structure used to initialize the thread used to call wait
* on child processes */
typedef struct {
pid_t pid;
int write_pipe;
} JanetReaperInit;
static void *janet_thread_waiter(void *ptr) {
JanetReaperInit *init = (JanetReaperInit *)ptr;
pid_t pid = init->pid;
int fd = init->write_pipe;
free(init);
for (;;) {
int status = 0;
pid_t which = 0;
do {
which = waitpid(pid, &status, 0);
} while (which == -1 && errno == EINTR);
if (which < 0) {
/* Error, could not wait or no children to wait on. */
break;
} else {
JanetSelfPipeEvent ev;
ev.tag = JANET_SELFPIPE_PROC;
ev.as.proc.status = status;
ev.as.proc.pid = which;
if (write(fd, &ev, sizeof(ev)) < 0) {
/* TODO failed to handle signal. */
fprintf(stderr, "failed to write event\n");
}
}
}
return NULL;
}
/* Map pids to JanetProc to allow for lookup after a call to
* waitpid. */
static void janet_add_waiting_proc(JanetProc *proc) {
@ -362,6 +397,26 @@ static void janet_add_waiting_proc(JanetProc *proc) {
janet_vm_waiting_procs = newprocs;
janet_vm_proc_cap = newcap;
}
/* Set proccess group for tracking purposes */
pid_t pid = proc->pid;
JanetReaperInit *init = malloc(sizeof(JanetReaperInit));
if (NULL == init) {
JANET_OUT_OF_MEMORY;
}
init->pid = pid;
init->write_pipe = janet_vm_selfpipe[1];
pthread_attr_t attr;
pthread_t waiter_thread;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init);
pthread_attr_destroy(&attr);
if (err) {
janet_panicf("%s", strerror(err));
}
pthread_detach(waiter_thread);
janet_vm_waiting_procs[janet_vm_proc_count++] = proc;
janet_gcroot(janet_wrap_abstract(proc));
janet_ev_inc_refcount();

View File

@ -147,6 +147,26 @@ void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2]);
void janet_schedule_pid(pid_t pid, int status);
/* Single message that is written to self pipe. This is used
* to communicate messages inside Janet to work with the event loop.
* Signals and threads will be the main users of this. */
typedef struct {
enum {
JANET_SELFPIPE_PROC
} tag;
union {
struct {
int status;
pid_t pid;
} proc;
} as;
} JanetSelfPipeEvent;
#ifndef JANET_WINDOWS
extern JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
#endif
#endif
#endif