diff --git a/src/core/ev.c b/src/core/ev.c index bad6a932..94736642 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -48,6 +48,7 @@ #include #include #include +#include #ifdef JANET_EV_EPOLL #include #include @@ -147,6 +148,7 @@ JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng; JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL; JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0; JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0; +JANET_THREAD_LOCAL size_t janet_vm_extra_listeners = 0; /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -491,6 +493,14 @@ void janet_addtimeout(double sec) { add_timeout(to); } +void janet_ev_inc_refcount(void) { + janet_vm_extra_listeners++; +} + +void janet_ev_dec_refcount(void) { + janet_vm_extra_listeners--; +} + /* Channels */ typedef struct { @@ -774,14 +784,16 @@ void janet_loop1(void) { } } } + /* Run scheduled fibers */ while (janet_vm_spawn.head != janet_vm_spawn.tail) { JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; janet_q_pop(&janet_vm_spawn, &task, sizeof(task)); run_one(task.fiber, task.value, task.sig); } + /* Poll for events */ - if (janet_vm_listener_count || janet_vm_tq_count) { + if (janet_vm_listener_count || janet_vm_tq_count || janet_vm_extra_listeners) { JanetTimeout to; memset(&to, 0, sizeof(to)); int has_timeout; @@ -790,18 +802,67 @@ void janet_loop1(void) { pop_timeout(0); } /* Run polling implementation only if pending timeouts or pending events */ - if (janet_vm_tq_count || janet_vm_listener_count) { + if (janet_vm_tq_count || janet_vm_listener_count || janet_vm_extra_listeners) { janet_loop1_impl(has_timeout, to.when); } } } void janet_loop(void) { - while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { + while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count || janet_vm_extra_listeners) { janet_loop1(); } } +/* + * Signal handling code. + */ + +#ifdef JANET_WINDOWS + +#else + +JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; + +static void janet_ev_handle_signals(void) { + int sig = 0; + while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) { + switch (sig) { + default: + break; + case SIGCHLD: + { + int status = 0; + pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED); + /* invalid pid on failure will do no harm */ + janet_schedule_pid(pid, status); + } + break; + } + } +} + +static void janet_sig_handler(int sig) { + int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig)); + if (result) { + /* Failed to handle signal. */ + ; + } + signal(sig, janet_sig_handler); +} + +static void janet_ev_setup_signals(void) { + if (-1 == pipe(janet_vm_selfpipe)) goto error; + if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error; + if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error; + signal(SIGCHLD, janet_sig_handler); + return; +error: + JANET_EXIT("failed to initialize self pipe in event loop"); +} + +#endif + #ifdef JANET_WINDOWS JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; @@ -969,8 +1030,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Step state machines */ for (int i = 0; i < ready; i++) { - JanetStream *stream = events[i].data.ptr; - if (NULL != stream) { /* If NULL, is a timeout */ + void *p = events[i].data.ptr; + if (&janet_vm_timerfd == p) { + /* Timer expired, ignore */; + } else if (janet_vm_selfpipe == p) { + /* Signal */ + janet_ev_handle_signals(); + } else { + JanetStream *stream = p; int mask = events[i].events; JanetListenerState *state = stream->state; state->event = events + i; @@ -1001,14 +1068,18 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); + janet_ev_setup_signals(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); janet_vm_timer_enabled = 0; if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error; struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = NULL; + ev.data.ptr = &janet_vm_timerfd; if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = janet_vm_selfpipe; + if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_selfpipe[0], &ev)) goto error; return; error: JANET_EXIT("failed to initialize event loop"); @@ -1054,7 +1125,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); size_t newsize = janet_vm_listener_cap; if (newsize > oldsize) { - janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd)); + janet_vm_fds = realloc(janet_vm_fds, (newsize + 1) * sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; } @@ -1063,12 +1134,12 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in ev.fd = stream->handle; ev.events = make_poll_events(state->stream->_mask); ev.revents = 0; - janet_vm_fds[state->_index] = ev; + janet_vm_fds[state->_index + 1] = ev; return state; } static void janet_unlisten(JanetListenerState *state) { - janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1]; + janet_vm_fds[state->_index + 1] = janet_vm_fds[janet_vm_listener_count]; janet_unlisten_impl(state); } @@ -1081,19 +1152,25 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { JanetTimestamp now = ts_now(); to = now > timeout ? 0 : (int)(timeout - now); } - ready = poll(janet_vm_fds, janet_vm_listener_count, to); + ready = poll(janet_vm_fds, janet_vm_listener_count + 1, to); } while (ready == -1 && errno == EINTR); if (ready == -1) { JANET_EXIT("failed to poll events"); } + /* Check selfpipe */ + if (janet_vm_fds[0].revents & POLLIN) { + janet_vm_fds[0].revents = 0; + janet_ev_handle_signals(); + } + /* Step state machines */ for (size_t i = 0; i < janet_vm_listener_count; i++) { - struct pollfd *pfd = janet_vm_fds + i; + struct pollfd *pfd = janet_vm_fds + i + 1; /* Skip fds where nothing interesting happened */ JanetListenerState *state = janet_vm_listeners[i]; /* Normal event */ - int mask = janet_vm_fds[i].revents; + int mask = pfd->revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; @@ -1118,12 +1195,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; + janet_ev_setup_signals(); + janet_vm_fds = malloc(sizeof(struct pollfd)); + if (NULL == janet_vm_fds) { + JANET_OUT_OF_MEMORY; + } + janet_vm_fds[0].fd = janet_vm_selfpipe[0]; + janet_vm_fds[0].events = POLLIN; + janet_vm_fds[0].revents = 0; return; } void janet_ev_deinit(void) { janet_ev_deinit_common(); free(janet_vm_fds); + close(janet_vm_selfpipe[0]); + close(janet_vm_selfpipe[1]); janet_vm_fds = NULL; } @@ -1691,9 +1778,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { return janet_wrap_fiber(fiber); } -static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - double sec = janet_getnumber(argv, 0); +JANET_NO_RETURN void janet_sleep_await(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = janet_vm_root_fiber; @@ -1703,6 +1788,12 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { janet_await(); } +static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + double sec = janet_getnumber(argv, 0); + janet_sleep_await(sec); +} + static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetFiber *fiber = janet_getfiber(argv, 0); diff --git a/src/core/os.c b/src/core/os.c index 07697fc9..64539727 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -185,6 +185,7 @@ static Janet os_exit(int32_t argc, Janet *argv) { #ifndef JANET_REDUCED_OS #ifndef JANET_NO_PROCESSES + /* Get env for os_execute */ static char **os_execute_env(int32_t argc, const Janet *argv) { char **envp = NULL; @@ -319,6 +320,8 @@ static JanetBuffer *os_exec_escape(JanetView args) { static const JanetAbstractType ProcAT; #define JANET_PROC_CLOSED 1 #define JANET_PROC_WAITED 2 +#define JANET_PROC_WAITING 4 +#define JANET_PROC_ERROR_NONZERO 8 typedef struct { int flags; #ifdef JANET_WINDOWS @@ -332,6 +335,7 @@ typedef struct { JanetStream *in; JanetStream *out; JanetStream *err; + JanetFiber *fiber; #else JanetFile *in; JanetFile *out; @@ -339,9 +343,82 @@ typedef struct { #endif } JanetProc; +#ifdef JANET_EV + +JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL; +JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0; +JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0; + +/* Map pids to JanetProc to allow for lookup after a call to + * waitpid. */ +static void janet_add_waiting_proc(JanetProc *proc) { + if (janet_vm_proc_count == janet_vm_proc_cap) { + size_t newcap = (janet_vm_proc_count + 1) * 2; + if (newcap < 16) newcap = 16; + JanetProc **newprocs = realloc(janet_vm_waiting_procs, sizeof(JanetProc *) * newcap); + if (NULL == newprocs) { + JANET_OUT_OF_MEMORY; + } + janet_vm_waiting_procs = newprocs; + janet_vm_proc_cap = newcap; + } + janet_vm_waiting_procs[janet_vm_proc_count++] = proc; + janet_gcroot(janet_wrap_abstract(proc)); + janet_ev_inc_refcount(); +} + +static void janet_remove_waiting_proc(JanetProc *proc) { + for (size_t i = 0; i < janet_vm_proc_count; i++) { + if (janet_vm_waiting_procs[i] == proc) { + janet_vm_waiting_procs[i] = janet_vm_waiting_procs[--janet_vm_proc_count]; + janet_gcunroot(janet_wrap_abstract(proc)); + janet_ev_dec_refcount(); + return; + } + } +} + +static JanetProc *janet_lookup_proc(pid_t pid) { + for (size_t i = 0; i < janet_vm_proc_count; i++) { + if (janet_vm_waiting_procs[i]->pid == pid) { + return janet_vm_waiting_procs[i]; + } + } + return NULL; +} + +void janet_schedule_pid(pid_t pid, int status) { + /* Use POSIX shell semantics for interpreting signals */ + if (WIFEXITED(status)) { + status = WEXITSTATUS(status); + } else if (WIFSTOPPED(status)) { + status = WSTOPSIG(status) + 128; + } else { + status = WTERMSIG(status) + 128; + } + JanetProc *proc = janet_lookup_proc(pid); + if (NULL == proc) return; + proc->return_code = (int32_t) status; + proc->flags |= JANET_PROC_WAITED; + proc->flags &= ~JANET_PROC_WAITING; + if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { + JanetString s = janet_formatc("command failed with non-zero exit code %d", status); + janet_cancel(proc->fiber, janet_wrap_string(s)); + } else { + janet_schedule(proc->fiber, janet_wrap_integer(status)); + } + janet_remove_waiting_proc(proc); +} +#endif + static int janet_proc_gc(void *p, size_t s) { (void) s; JanetProc *proc = (JanetProc *) p; +#ifdef JANET_EV + if (proc->flags & JANET_PROC_WAITING) { + janet_remove_waiting_proc(proc); + } +#endif #ifdef JANET_WINDOWS if (!(proc->flags & JANET_PROC_CLOSED)) { CloseHandle(proc->pHandle); @@ -364,13 +441,27 @@ static int janet_proc_mark(void *p, size_t s) { if (NULL != proc->in) janet_mark(janet_wrap_abstract(proc->in)); if (NULL != proc->out) janet_mark(janet_wrap_abstract(proc->out)); if (NULL != proc->err) janet_mark(janet_wrap_abstract(proc->err)); +#ifdef JANET_EV + if (NULL != proc->fiber) janet_mark(janet_wrap_fiber(proc->fiber)); +#endif return 0; } +#ifdef JANET_EV +JANET_NO_RETURN +#endif static Janet os_proc_wait_impl(JanetProc *proc) { - if (proc->flags & JANET_PROC_WAITED) { - janet_panicf("cannot wait on process that has already finished"); + if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) { + janet_panicf("cannot wait twice on a process"); } +#ifdef JANET_EV + /* Event loop implementation */ + proc->fiber = janet_root_fiber(); + proc->flags |= JANET_PROC_WAITING; + janet_add_waiting_proc(proc); + janet_await(); +#else + /* Non evented implementation */ proc->flags |= JANET_PROC_WAITED; int status = 0; #ifdef JANET_WINDOWS @@ -386,6 +477,7 @@ static Janet os_proc_wait_impl(JanetProc *proc) { #endif proc->return_code = (int32_t) status; return janet_wrap_integer(proc->return_code); +#endif } static Janet os_proc_wait(int32_t argc, Janet *argv) { @@ -575,7 +667,7 @@ static JanetFile *get_stdio_for_handle(JanetHandle handle, void *orig, int iswri } #endif -static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { +static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { janet_arity(argc, 1, 3); /* Get flags */ @@ -713,7 +805,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { tHandle = processInfo.hThread; /* Wait and cleanup immedaitely */ - if (!is_async) { + if (!is_spawn) { DWORD code; WaitForSingleObject(pHandle, INFINITE); GetExitCodeProcess(pHandle, &code); @@ -781,45 +873,42 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { if (status) { os_execute_cleanup(envp, child_argv); janet_panicf("%p: %s", argv[0], strerror(errno)); - } else if (is_async) { + } else if (is_spawn) { /* Get process handle */ os_execute_cleanup(envp, child_argv); } else { /* Wait to complete */ - waitpid(pid, &status, 0); os_execute_cleanup(envp, child_argv); - /* Use POSIX shell semantics for interpreting signals */ - if (WIFEXITED(status)) { - status = WEXITSTATUS(status); - } else if (WIFSTOPPED(status)) { - status = WSTOPSIG(status) + 128; - } else { - status = WTERMSIG(status) + 128; - } } #endif - if (is_async) { - JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); - proc->return_code = -1; + JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); + proc->return_code = -1; #ifdef JANET_WINDOWS - proc->pHandle = pHandle; - proc->tHandle = tHandle; + proc->pHandle = pHandle; + proc->tHandle = tHandle; #else - proc->pid = pid; + proc->pid = pid; #endif - proc->in = get_stdio_for_handle(new_in, orig_in, 0); - proc->out = get_stdio_for_handle(new_out, orig_out, 1); - proc->err = get_stdio_for_handle(new_err, orig_err, 1); - proc->flags = 0; - if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { - janet_panic("failed to construct proc"); - } + proc->in = get_stdio_for_handle(new_in, orig_in, 0); + proc->out = get_stdio_for_handle(new_out, orig_out, 1); + proc->err = get_stdio_for_handle(new_err, orig_err, 1); + proc->flags = 0; + if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { + janet_panic("failed to construct proc"); + } + if (janet_flag_at(flags, 2)) { + proc->flags |= JANET_PROC_ERROR_NONZERO; + } + + if (is_spawn) { return janet_wrap_abstract(proc); - } else if (janet_flag_at(flags, 2) && status) { - janet_panicf("command failed with non-zero exit code %d", status); } else { - return janet_wrap_integer(status); +#ifdef JANET_EV + os_proc_wait_impl(proc); +#else + return os_proc_wait_impl(proc); +#endif } } @@ -2069,6 +2158,17 @@ static const JanetReg os_cfuns[] = { {NULL, NULL, NULL} }; +void janet_os_deinit(void) { +#ifndef JANET_NO_PROCESSES +#ifdef JANET_EV + free(janet_vm_waiting_procs); + janet_vm_waiting_procs = NULL; + janet_vm_proc_count = 0; + janet_vm_proc_cap = 0; +#endif +#endif +} + /* Module entry point */ void janet_lib_os(JanetTable *env) { #if !defined(JANET_REDUCED_OS) && defined(JANET_WINDOWS) && defined(JANET_THREADS) @@ -2078,6 +2178,13 @@ void janet_lib_os(JanetTable *env) { InitializeCriticalSection(&env_lock); env_lock_initialized = 1; } +#endif +#ifndef JANET_NO_PROCESSES +#ifdef JANET_EV + janet_vm_waiting_procs = NULL; + janet_vm_proc_count = 0; + janet_vm_proc_cap = 0; +#endif #endif janet_core_cfuns(env, NULL, os_cfuns); } diff --git a/src/core/state.h b/src/core/state.h index a36f5e38..0d1ce450 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -111,4 +111,6 @@ void janet_ev_init(void); void janet_ev_deinit(void); #endif +void janet_os_deinit(void); + #endif /* JANET_STATE_H_defined */ diff --git a/src/core/util.h b/src/core/util.h index c3f2fbfd..b24fc8ce 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,6 +146,7 @@ extern const JanetAbstractType janet_address_type; void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); +void janet_schedule_pid(pid_t pid, int status); #endif #endif diff --git a/src/core/vm.c b/src/core/vm.c index 5f61a18b..a99a6625 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1478,7 +1478,6 @@ int janet_init(void) { janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; janet_vm_stackn = 0; - /* Threads */ #ifdef JANET_THREADS janet_threads_init(); #endif @@ -1506,6 +1505,7 @@ void janet_deinit(void) { free(janet_vm_traversal_base); janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; + janet_os_deinit(); #ifdef JANET_THREADS janet_threads_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 3d4c8f21..02ca54c7 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1279,10 +1279,13 @@ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener be /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); +JANET_NO_RETURN JANET_API void janet_sleep_await(double sec); /* For use inside listeners - adds a timeout to the current fiber, such that * it will be resumed after sec seconds if no other event schedules the current fiber. */ JANET_API void janet_addtimeout(double sec); +JANET_API void janet_ev_inc_refcount(void); +JANET_API void janet_ev_dec_refcount(void); /* Get last error from a an IO operation */ JANET_API Janet janet_ev_lasterr(void); diff --git a/test/suite0009.janet b/test/suite0009.janet index d9afc1e6..4262a914 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -40,7 +40,7 @@ # or else the first read can fail. Might be a strange windows # "bug", but needs further investigating. Otherwise, `build_win test` # can sometimes fail on windows, leading to flaky testing. -(ev/sleep 0.2) +(ev/sleep 0.3) (defn test-echo [msg] (with [conn (net/connect "127.0.0.1" "8000")] @@ -59,6 +59,7 @@ (var pipe-counter 0) (def chan (ev/chan 10)) (let [[reader writer] (os/pipe)] + (ev/sleep 0.3) (ev/spawn (while (ev/read reader 3) (++ pipe-counter))