From cbdea8f331092afff18b1ecb44eeed393ad3779a Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 29 Nov 2020 15:36:21 -0600 Subject: [PATCH 01/10] Make os/execute cooperate with ev module. os/execute, os/proc-wait do not block (currently posix only). This uses the self-pipe trick to turn signals into a pollable entity. --- src/core/ev.c | 121 +++++++++++++++++++++++++++---- src/core/os.c | 167 +++++++++++++++++++++++++++++++++++-------- src/core/state.h | 2 + src/core/util.h | 1 + src/core/vm.c | 2 +- src/include/janet.h | 3 + test/suite0009.janet | 3 +- 7 files changed, 252 insertions(+), 47 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index bad6a932..94736642 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -48,6 +48,7 @@ #include #include #include +#include #ifdef JANET_EV_EPOLL #include #include @@ -147,6 +148,7 @@ JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng; JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL; JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0; JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0; +JANET_THREAD_LOCAL size_t janet_vm_extra_listeners = 0; /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -491,6 +493,14 @@ void janet_addtimeout(double sec) { add_timeout(to); } +void janet_ev_inc_refcount(void) { + janet_vm_extra_listeners++; +} + +void janet_ev_dec_refcount(void) { + janet_vm_extra_listeners--; +} + /* Channels */ typedef struct { @@ -774,14 +784,16 @@ void janet_loop1(void) { } } } + /* Run scheduled fibers */ while (janet_vm_spawn.head != janet_vm_spawn.tail) { JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; janet_q_pop(&janet_vm_spawn, &task, sizeof(task)); run_one(task.fiber, task.value, task.sig); } + /* Poll for events */ - if (janet_vm_listener_count || janet_vm_tq_count) { + if (janet_vm_listener_count || janet_vm_tq_count || janet_vm_extra_listeners) { JanetTimeout to; memset(&to, 0, sizeof(to)); int has_timeout; @@ -790,18 +802,67 @@ void janet_loop1(void) { pop_timeout(0); } /* Run polling implementation only if pending timeouts or pending events */ - if (janet_vm_tq_count || janet_vm_listener_count) { + if (janet_vm_tq_count || janet_vm_listener_count || janet_vm_extra_listeners) { janet_loop1_impl(has_timeout, to.when); } } } void janet_loop(void) { - while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { + while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count || janet_vm_extra_listeners) { janet_loop1(); } } +/* + * Signal handling code. + */ + +#ifdef JANET_WINDOWS + +#else + +JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; + +static void janet_ev_handle_signals(void) { + int sig = 0; + while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) { + switch (sig) { + default: + break; + case SIGCHLD: + { + int status = 0; + pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED); + /* invalid pid on failure will do no harm */ + janet_schedule_pid(pid, status); + } + break; + } + } +} + +static void janet_sig_handler(int sig) { + int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig)); + if (result) { + /* Failed to handle signal. */ + ; + } + signal(sig, janet_sig_handler); +} + +static void janet_ev_setup_signals(void) { + if (-1 == pipe(janet_vm_selfpipe)) goto error; + if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error; + if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error; + signal(SIGCHLD, janet_sig_handler); + return; +error: + JANET_EXIT("failed to initialize self pipe in event loop"); +} + +#endif + #ifdef JANET_WINDOWS JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; @@ -969,8 +1030,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Step state machines */ for (int i = 0; i < ready; i++) { - JanetStream *stream = events[i].data.ptr; - if (NULL != stream) { /* If NULL, is a timeout */ + void *p = events[i].data.ptr; + if (&janet_vm_timerfd == p) { + /* Timer expired, ignore */; + } else if (janet_vm_selfpipe == p) { + /* Signal */ + janet_ev_handle_signals(); + } else { + JanetStream *stream = p; int mask = events[i].events; JanetListenerState *state = stream->state; state->event = events + i; @@ -1001,14 +1068,18 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); + janet_ev_setup_signals(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); janet_vm_timer_enabled = 0; if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error; struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = NULL; + ev.data.ptr = &janet_vm_timerfd; if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = janet_vm_selfpipe; + if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_selfpipe[0], &ev)) goto error; return; error: JANET_EXIT("failed to initialize event loop"); @@ -1054,7 +1125,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); size_t newsize = janet_vm_listener_cap; if (newsize > oldsize) { - janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd)); + janet_vm_fds = realloc(janet_vm_fds, (newsize + 1) * sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; } @@ -1063,12 +1134,12 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in ev.fd = stream->handle; ev.events = make_poll_events(state->stream->_mask); ev.revents = 0; - janet_vm_fds[state->_index] = ev; + janet_vm_fds[state->_index + 1] = ev; return state; } static void janet_unlisten(JanetListenerState *state) { - janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1]; + janet_vm_fds[state->_index + 1] = janet_vm_fds[janet_vm_listener_count]; janet_unlisten_impl(state); } @@ -1081,19 +1152,25 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { JanetTimestamp now = ts_now(); to = now > timeout ? 0 : (int)(timeout - now); } - ready = poll(janet_vm_fds, janet_vm_listener_count, to); + ready = poll(janet_vm_fds, janet_vm_listener_count + 1, to); } while (ready == -1 && errno == EINTR); if (ready == -1) { JANET_EXIT("failed to poll events"); } + /* Check selfpipe */ + if (janet_vm_fds[0].revents & POLLIN) { + janet_vm_fds[0].revents = 0; + janet_ev_handle_signals(); + } + /* Step state machines */ for (size_t i = 0; i < janet_vm_listener_count; i++) { - struct pollfd *pfd = janet_vm_fds + i; + struct pollfd *pfd = janet_vm_fds + i + 1; /* Skip fds where nothing interesting happened */ JanetListenerState *state = janet_vm_listeners[i]; /* Normal event */ - int mask = janet_vm_fds[i].revents; + int mask = pfd->revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; @@ -1118,12 +1195,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; + janet_ev_setup_signals(); + janet_vm_fds = malloc(sizeof(struct pollfd)); + if (NULL == janet_vm_fds) { + JANET_OUT_OF_MEMORY; + } + janet_vm_fds[0].fd = janet_vm_selfpipe[0]; + janet_vm_fds[0].events = POLLIN; + janet_vm_fds[0].revents = 0; return; } void janet_ev_deinit(void) { janet_ev_deinit_common(); free(janet_vm_fds); + close(janet_vm_selfpipe[0]); + close(janet_vm_selfpipe[1]); janet_vm_fds = NULL; } @@ -1691,9 +1778,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { return janet_wrap_fiber(fiber); } -static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - double sec = janet_getnumber(argv, 0); +JANET_NO_RETURN void janet_sleep_await(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = janet_vm_root_fiber; @@ -1703,6 +1788,12 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { janet_await(); } +static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + double sec = janet_getnumber(argv, 0); + janet_sleep_await(sec); +} + static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetFiber *fiber = janet_getfiber(argv, 0); diff --git a/src/core/os.c b/src/core/os.c index 07697fc9..64539727 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -185,6 +185,7 @@ static Janet os_exit(int32_t argc, Janet *argv) { #ifndef JANET_REDUCED_OS #ifndef JANET_NO_PROCESSES + /* Get env for os_execute */ static char **os_execute_env(int32_t argc, const Janet *argv) { char **envp = NULL; @@ -319,6 +320,8 @@ static JanetBuffer *os_exec_escape(JanetView args) { static const JanetAbstractType ProcAT; #define JANET_PROC_CLOSED 1 #define JANET_PROC_WAITED 2 +#define JANET_PROC_WAITING 4 +#define JANET_PROC_ERROR_NONZERO 8 typedef struct { int flags; #ifdef JANET_WINDOWS @@ -332,6 +335,7 @@ typedef struct { JanetStream *in; JanetStream *out; JanetStream *err; + JanetFiber *fiber; #else JanetFile *in; JanetFile *out; @@ -339,9 +343,82 @@ typedef struct { #endif } JanetProc; +#ifdef JANET_EV + +JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL; +JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0; +JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0; + +/* Map pids to JanetProc to allow for lookup after a call to + * waitpid. */ +static void janet_add_waiting_proc(JanetProc *proc) { + if (janet_vm_proc_count == janet_vm_proc_cap) { + size_t newcap = (janet_vm_proc_count + 1) * 2; + if (newcap < 16) newcap = 16; + JanetProc **newprocs = realloc(janet_vm_waiting_procs, sizeof(JanetProc *) * newcap); + if (NULL == newprocs) { + JANET_OUT_OF_MEMORY; + } + janet_vm_waiting_procs = newprocs; + janet_vm_proc_cap = newcap; + } + janet_vm_waiting_procs[janet_vm_proc_count++] = proc; + janet_gcroot(janet_wrap_abstract(proc)); + janet_ev_inc_refcount(); +} + +static void janet_remove_waiting_proc(JanetProc *proc) { + for (size_t i = 0; i < janet_vm_proc_count; i++) { + if (janet_vm_waiting_procs[i] == proc) { + janet_vm_waiting_procs[i] = janet_vm_waiting_procs[--janet_vm_proc_count]; + janet_gcunroot(janet_wrap_abstract(proc)); + janet_ev_dec_refcount(); + return; + } + } +} + +static JanetProc *janet_lookup_proc(pid_t pid) { + for (size_t i = 0; i < janet_vm_proc_count; i++) { + if (janet_vm_waiting_procs[i]->pid == pid) { + return janet_vm_waiting_procs[i]; + } + } + return NULL; +} + +void janet_schedule_pid(pid_t pid, int status) { + /* Use POSIX shell semantics for interpreting signals */ + if (WIFEXITED(status)) { + status = WEXITSTATUS(status); + } else if (WIFSTOPPED(status)) { + status = WSTOPSIG(status) + 128; + } else { + status = WTERMSIG(status) + 128; + } + JanetProc *proc = janet_lookup_proc(pid); + if (NULL == proc) return; + proc->return_code = (int32_t) status; + proc->flags |= JANET_PROC_WAITED; + proc->flags &= ~JANET_PROC_WAITING; + if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { + JanetString s = janet_formatc("command failed with non-zero exit code %d", status); + janet_cancel(proc->fiber, janet_wrap_string(s)); + } else { + janet_schedule(proc->fiber, janet_wrap_integer(status)); + } + janet_remove_waiting_proc(proc); +} +#endif + static int janet_proc_gc(void *p, size_t s) { (void) s; JanetProc *proc = (JanetProc *) p; +#ifdef JANET_EV + if (proc->flags & JANET_PROC_WAITING) { + janet_remove_waiting_proc(proc); + } +#endif #ifdef JANET_WINDOWS if (!(proc->flags & JANET_PROC_CLOSED)) { CloseHandle(proc->pHandle); @@ -364,13 +441,27 @@ static int janet_proc_mark(void *p, size_t s) { if (NULL != proc->in) janet_mark(janet_wrap_abstract(proc->in)); if (NULL != proc->out) janet_mark(janet_wrap_abstract(proc->out)); if (NULL != proc->err) janet_mark(janet_wrap_abstract(proc->err)); +#ifdef JANET_EV + if (NULL != proc->fiber) janet_mark(janet_wrap_fiber(proc->fiber)); +#endif return 0; } +#ifdef JANET_EV +JANET_NO_RETURN +#endif static Janet os_proc_wait_impl(JanetProc *proc) { - if (proc->flags & JANET_PROC_WAITED) { - janet_panicf("cannot wait on process that has already finished"); + if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) { + janet_panicf("cannot wait twice on a process"); } +#ifdef JANET_EV + /* Event loop implementation */ + proc->fiber = janet_root_fiber(); + proc->flags |= JANET_PROC_WAITING; + janet_add_waiting_proc(proc); + janet_await(); +#else + /* Non evented implementation */ proc->flags |= JANET_PROC_WAITED; int status = 0; #ifdef JANET_WINDOWS @@ -386,6 +477,7 @@ static Janet os_proc_wait_impl(JanetProc *proc) { #endif proc->return_code = (int32_t) status; return janet_wrap_integer(proc->return_code); +#endif } static Janet os_proc_wait(int32_t argc, Janet *argv) { @@ -575,7 +667,7 @@ static JanetFile *get_stdio_for_handle(JanetHandle handle, void *orig, int iswri } #endif -static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { +static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { janet_arity(argc, 1, 3); /* Get flags */ @@ -713,7 +805,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { tHandle = processInfo.hThread; /* Wait and cleanup immedaitely */ - if (!is_async) { + if (!is_spawn) { DWORD code; WaitForSingleObject(pHandle, INFINITE); GetExitCodeProcess(pHandle, &code); @@ -781,45 +873,42 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { if (status) { os_execute_cleanup(envp, child_argv); janet_panicf("%p: %s", argv[0], strerror(errno)); - } else if (is_async) { + } else if (is_spawn) { /* Get process handle */ os_execute_cleanup(envp, child_argv); } else { /* Wait to complete */ - waitpid(pid, &status, 0); os_execute_cleanup(envp, child_argv); - /* Use POSIX shell semantics for interpreting signals */ - if (WIFEXITED(status)) { - status = WEXITSTATUS(status); - } else if (WIFSTOPPED(status)) { - status = WSTOPSIG(status) + 128; - } else { - status = WTERMSIG(status) + 128; - } } #endif - if (is_async) { - JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); - proc->return_code = -1; + JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); + proc->return_code = -1; #ifdef JANET_WINDOWS - proc->pHandle = pHandle; - proc->tHandle = tHandle; + proc->pHandle = pHandle; + proc->tHandle = tHandle; #else - proc->pid = pid; + proc->pid = pid; #endif - proc->in = get_stdio_for_handle(new_in, orig_in, 0); - proc->out = get_stdio_for_handle(new_out, orig_out, 1); - proc->err = get_stdio_for_handle(new_err, orig_err, 1); - proc->flags = 0; - if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { - janet_panic("failed to construct proc"); - } + proc->in = get_stdio_for_handle(new_in, orig_in, 0); + proc->out = get_stdio_for_handle(new_out, orig_out, 1); + proc->err = get_stdio_for_handle(new_err, orig_err, 1); + proc->flags = 0; + if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { + janet_panic("failed to construct proc"); + } + if (janet_flag_at(flags, 2)) { + proc->flags |= JANET_PROC_ERROR_NONZERO; + } + + if (is_spawn) { return janet_wrap_abstract(proc); - } else if (janet_flag_at(flags, 2) && status) { - janet_panicf("command failed with non-zero exit code %d", status); } else { - return janet_wrap_integer(status); +#ifdef JANET_EV + os_proc_wait_impl(proc); +#else + return os_proc_wait_impl(proc); +#endif } } @@ -2069,6 +2158,17 @@ static const JanetReg os_cfuns[] = { {NULL, NULL, NULL} }; +void janet_os_deinit(void) { +#ifndef JANET_NO_PROCESSES +#ifdef JANET_EV + free(janet_vm_waiting_procs); + janet_vm_waiting_procs = NULL; + janet_vm_proc_count = 0; + janet_vm_proc_cap = 0; +#endif +#endif +} + /* Module entry point */ void janet_lib_os(JanetTable *env) { #if !defined(JANET_REDUCED_OS) && defined(JANET_WINDOWS) && defined(JANET_THREADS) @@ -2078,6 +2178,13 @@ void janet_lib_os(JanetTable *env) { InitializeCriticalSection(&env_lock); env_lock_initialized = 1; } +#endif +#ifndef JANET_NO_PROCESSES +#ifdef JANET_EV + janet_vm_waiting_procs = NULL; + janet_vm_proc_count = 0; + janet_vm_proc_cap = 0; +#endif #endif janet_core_cfuns(env, NULL, os_cfuns); } diff --git a/src/core/state.h b/src/core/state.h index a36f5e38..0d1ce450 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -111,4 +111,6 @@ void janet_ev_init(void); void janet_ev_deinit(void); #endif +void janet_os_deinit(void); + #endif /* JANET_STATE_H_defined */ diff --git a/src/core/util.h b/src/core/util.h index c3f2fbfd..b24fc8ce 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,6 +146,7 @@ extern const JanetAbstractType janet_address_type; void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); +void janet_schedule_pid(pid_t pid, int status); #endif #endif diff --git a/src/core/vm.c b/src/core/vm.c index 5f61a18b..a99a6625 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1478,7 +1478,6 @@ int janet_init(void) { janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; janet_vm_stackn = 0; - /* Threads */ #ifdef JANET_THREADS janet_threads_init(); #endif @@ -1506,6 +1505,7 @@ void janet_deinit(void) { free(janet_vm_traversal_base); janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; + janet_os_deinit(); #ifdef JANET_THREADS janet_threads_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 3d4c8f21..02ca54c7 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1279,10 +1279,13 @@ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener be /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); +JANET_NO_RETURN JANET_API void janet_sleep_await(double sec); /* For use inside listeners - adds a timeout to the current fiber, such that * it will be resumed after sec seconds if no other event schedules the current fiber. */ JANET_API void janet_addtimeout(double sec); +JANET_API void janet_ev_inc_refcount(void); +JANET_API void janet_ev_dec_refcount(void); /* Get last error from a an IO operation */ JANET_API Janet janet_ev_lasterr(void); diff --git a/test/suite0009.janet b/test/suite0009.janet index d9afc1e6..4262a914 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -40,7 +40,7 @@ # or else the first read can fail. Might be a strange windows # "bug", but needs further investigating. Otherwise, `build_win test` # can sometimes fail on windows, leading to flaky testing. -(ev/sleep 0.2) +(ev/sleep 0.3) (defn test-echo [msg] (with [conn (net/connect "127.0.0.1" "8000")] @@ -59,6 +59,7 @@ (var pipe-counter 0) (def chan (ev/chan 10)) (let [[reader writer] (os/pipe)] + (ev/sleep 0.3) (ev/spawn (while (ev/read reader 3) (++ pipe-counter)) From d457aa5951d739ab16c2d871dcdffd18c003249a Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Wed, 30 Dec 2020 10:22:45 -0600 Subject: [PATCH 02/10] Deprecate file/popen. os/spawn is the prefered way of creating a subprocess and communicating with it. --- CHANGELOG.md | 1 + src/core/io.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d42e893..9c33e830 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? +- Deprecate `file/popen` in favor of `os/spawn`. - Add `:all` keyword to `ev/read` and `net/read` to make them more like `file/read`. However, we do not provide any `:line` option as that requires buffering. - Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C, diff --git a/src/core/io.c b/src/core/io.c index 47376290..8bdb3202 100644 --- a/src/core/io.c +++ b/src/core/io.c @@ -777,7 +777,7 @@ static const JanetReg io_cfuns[] = { #ifndef JANET_NO_PROCESSES { "file/popen", cfun_io_popen, - JDOC("(file/popen command &opt mode)\n\n" + JDOC("(file/popen command &opt mode) (DEPRECATED for os/spawn)\n\n" "Open a file that is backed by a process. The file must be opened in either " "the :r (read) or the :w (write) mode. In :r mode, the stdout of the " "process can be read from the file. In :w mode, the stdin of the process " From c831ecf5d26fbe2f7fa222cb1c284683b91cda8e Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 11:22:18 -0600 Subject: [PATCH 03/10] Working implementation of process waiting with threads. Does not require all sorts of signal handling code that is not thread-safe and can "steal processes". However, there is a much simpler way to add this functionality by creating a new stream and thread for each subprocess when it is waited on. This is perhaps _slightly_ less efficient but oh so much simpler, since we can reuse all of our concepts from streams and there is no need to implement a whole system around the selfpipe. --- src/core/ev.c | 51 ++++++++++++++++++------------------------- src/core/os.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++- src/core/util.h | 20 +++++++++++++++++ 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index d6fe845a..3fc8c89e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -880,7 +880,7 @@ void janet_loop(void) { } /* - * Signal handling code. + * Self-pipe handling code. */ #ifdef JANET_WINDOWS @@ -889,43 +889,34 @@ void janet_loop(void) { JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; -static void janet_ev_handle_signals(void) { - int sig = 0; - while (read(janet_vm_selfpipe[0], &sig, sizeof(sig)) > 0) { - switch (sig) { +/* Handle events from the self pipe inside the event loop */ +static void janet_ev_handle_selfpipe(void) { + JanetSelfPipeEvent ev; + while (read(janet_vm_selfpipe[0], &ev, sizeof(ev)) > 0) { + switch (ev.tag) { default: break; - case SIGCHLD: - { - int status = 0; - pid_t pid = waitpid(-1, &status, WNOHANG | WUNTRACED); - /* invalid pid on failure will do no harm */ - janet_schedule_pid(pid, status); - } + case JANET_SELFPIPE_PROC: + janet_schedule_pid(ev.as.proc.pid, ev.as.proc.status); break; } } } -static void janet_sig_handler(int sig) { - int result = write(janet_vm_selfpipe[1], &sig, sizeof(sig)); - if (result) { - /* Failed to handle signal. */ - ; - } - signal(sig, janet_sig_handler); -} - -static void janet_ev_setup_signals(void) { +static void janet_ev_setup_selfpipe(void) { if (-1 == pipe(janet_vm_selfpipe)) goto error; if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error; if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error; - signal(SIGCHLD, janet_sig_handler); return; error: JANET_EXIT("failed to initialize self pipe in event loop"); } +static void janet_ev_cleanup_selfpipe(void) { + close(janet_vm_selfpipe[0]); + close(janet_vm_selfpipe[1]); +} + #endif #ifdef JANET_WINDOWS @@ -1099,8 +1090,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { if (&janet_vm_timerfd == p) { /* Timer expired, ignore */; } else if (janet_vm_selfpipe == p) { - /* Signal */ - janet_ev_handle_signals(); + /* Self-pipe handling */ + janet_ev_handle_selfpipe(); } else { JanetStream *stream = p; int mask = events[i].events; @@ -1133,7 +1124,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); - janet_ev_setup_signals(); + janet_ev_setup_selfpipe(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); janet_vm_timer_enabled = 0; @@ -1154,6 +1145,7 @@ void janet_ev_deinit(void) { janet_ev_deinit_common(); close(janet_vm_epoll); close(janet_vm_timerfd); + janet_ev_cleanup_selfpipe(); janet_vm_epoll = 0; } @@ -1226,7 +1218,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Check selfpipe */ if (janet_vm_fds[0].revents & POLLIN) { janet_vm_fds[0].revents = 0; - janet_ev_handle_signals(); + janet_ev_handle_selfpipe(); } /* Step state machines */ @@ -1260,7 +1252,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; - janet_ev_setup_signals(); + janet_ev_setup_selfpipe(); janet_vm_fds = malloc(sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; @@ -1273,9 +1265,8 @@ void janet_ev_init(void) { void janet_ev_deinit(void) { janet_ev_deinit_common(); + janet_ev_cleanup_selfpipe(); free(janet_vm_fds); - close(janet_vm_selfpipe[0]); - close(janet_vm_selfpipe[1]); janet_vm_fds = NULL; } diff --git a/src/core/os.c b/src/core/os.c index 1f996732..4e0d22fb 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -328,7 +328,7 @@ typedef struct { HANDLE pHandle; HANDLE tHandle; #else - int pid; + pid_t pid; #endif int return_code; #ifdef JANET_EV @@ -349,6 +349,41 @@ JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL; JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0; JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0; +/* Structure used to initialize the thread used to call wait + * on child processes */ +typedef struct { + pid_t pid; + int write_pipe; +} JanetReaperInit; + +static void *janet_thread_waiter(void *ptr) { + JanetReaperInit *init = (JanetReaperInit *)ptr; + pid_t pid = init->pid; + int fd = init->write_pipe; + free(init); + for (;;) { + int status = 0; + pid_t which = 0; + do { + which = waitpid(pid, &status, 0); + } while (which == -1 && errno == EINTR); + if (which < 0) { + /* Error, could not wait or no children to wait on. */ + break; + } else { + JanetSelfPipeEvent ev; + ev.tag = JANET_SELFPIPE_PROC; + ev.as.proc.status = status; + ev.as.proc.pid = which; + if (write(fd, &ev, sizeof(ev)) < 0) { + /* TODO failed to handle signal. */ + fprintf(stderr, "failed to write event\n"); + } + } + } + return NULL; +} + /* Map pids to JanetProc to allow for lookup after a call to * waitpid. */ static void janet_add_waiting_proc(JanetProc *proc) { @@ -362,6 +397,26 @@ static void janet_add_waiting_proc(JanetProc *proc) { janet_vm_waiting_procs = newprocs; janet_vm_proc_cap = newcap; } + + /* Set proccess group for tracking purposes */ + pid_t pid = proc->pid; + JanetReaperInit *init = malloc(sizeof(JanetReaperInit)); + if (NULL == init) { + JANET_OUT_OF_MEMORY; + } + init->pid = pid; + init->write_pipe = janet_vm_selfpipe[1]; + pthread_attr_t attr; + pthread_t waiter_thread; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); + int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init); + pthread_attr_destroy(&attr); + if (err) { + janet_panicf("%s", strerror(err)); + } + pthread_detach(waiter_thread); + janet_vm_waiting_procs[janet_vm_proc_count++] = proc; janet_gcroot(janet_wrap_abstract(proc)); janet_ev_inc_refcount(); diff --git a/src/core/util.h b/src/core/util.h index b24fc8ce..24be73b4 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -147,6 +147,26 @@ void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); void janet_schedule_pid(pid_t pid, int status); + +/* Single message that is written to self pipe. This is used + * to communicate messages inside Janet to work with the event loop. + * Signals and threads will be the main users of this. */ +typedef struct { + enum { + JANET_SELFPIPE_PROC + } tag; + union { + struct { + int status; + pid_t pid; + } proc; + } as; +} JanetSelfPipeEvent; + +#ifndef JANET_WINDOWS +extern JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; +#endif + #endif #endif From 788f91a36ff32db533a8415376a54c15e618287a Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 11:52:12 -0600 Subject: [PATCH 04/10] Remove unneeded book keeping for sub processes. Since we are not using signals we no longer need some bookkeeping. --- examples/async-execute.janet | 22 ++++++++++ src/core/ev.c | 2 +- src/core/os.c | 78 ++++++------------------------------ src/core/util.h | 4 +- 4 files changed, 37 insertions(+), 69 deletions(-) create mode 100644 examples/async-execute.janet diff --git a/examples/async-execute.janet b/examples/async-execute.janet new file mode 100644 index 00000000..662ba0df --- /dev/null +++ b/examples/async-execute.janet @@ -0,0 +1,22 @@ +(defn dowork [name n] + (print name " starting work...") + (os/execute ["sleep" (string n)] :p) + (print name " finished work!")) + +# Will be done in parallel +(print "starting group A") +(ev/call dowork "A 2" 2) +(ev/call dowork "A 1" 1) +(ev/call dowork "A 3" 3) + +(ev/sleep 4) + +# Will also be done in parallel +(print "starting group B") +(ev/call dowork "B 2" 2) +(ev/call dowork "B 1" 1) +(ev/call dowork "B 3" 3) + +(ev/sleep 4) + +(print "all work done") diff --git a/src/core/ev.c b/src/core/ev.c index 3fc8c89e..4809273a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -897,7 +897,7 @@ static void janet_ev_handle_selfpipe(void) { default: break; case JANET_SELFPIPE_PROC: - janet_schedule_pid(ev.as.proc.pid, ev.as.proc.status); + janet_schedule_proc(ev.as.proc.proc, ev.as.proc.status); break; } } diff --git a/src/core/os.c b/src/core/os.c index 4e0d22fb..41cbf4e9 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -345,21 +345,18 @@ typedef struct { #ifdef JANET_EV -JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL; -JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0; -JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0; - /* Structure used to initialize the thread used to call wait * on child processes */ typedef struct { - pid_t pid; int write_pipe; + JanetProc *proc; } JanetReaperInit; static void *janet_thread_waiter(void *ptr) { JanetReaperInit *init = (JanetReaperInit *)ptr; - pid_t pid = init->pid; + JanetProc *proc = (JanetProc *) init->proc; int fd = init->write_pipe; + pid_t pid = proc->pid; free(init); for (;;) { int status = 0; @@ -374,7 +371,7 @@ static void *janet_thread_waiter(void *ptr) { JanetSelfPipeEvent ev; ev.tag = JANET_SELFPIPE_PROC; ev.as.proc.status = status; - ev.as.proc.pid = which; + ev.as.proc.proc = proc; if (write(fd, &ev, sizeof(ev)) < 0) { /* TODO failed to handle signal. */ fprintf(stderr, "failed to write event\n"); @@ -387,62 +384,30 @@ static void *janet_thread_waiter(void *ptr) { /* Map pids to JanetProc to allow for lookup after a call to * waitpid. */ static void janet_add_waiting_proc(JanetProc *proc) { - if (janet_vm_proc_count == janet_vm_proc_cap) { - size_t newcap = (janet_vm_proc_count + 1) * 2; - if (newcap < 16) newcap = 16; - JanetProc **newprocs = realloc(janet_vm_waiting_procs, sizeof(JanetProc *) * newcap); - if (NULL == newprocs) { - JANET_OUT_OF_MEMORY; - } - janet_vm_waiting_procs = newprocs; - janet_vm_proc_cap = newcap; - } - - /* Set proccess group for tracking purposes */ - pid_t pid = proc->pid; JanetReaperInit *init = malloc(sizeof(JanetReaperInit)); if (NULL == init) { JANET_OUT_OF_MEMORY; } - init->pid = pid; init->write_pipe = janet_vm_selfpipe[1]; + init->proc = proc; pthread_attr_t attr; pthread_t waiter_thread; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init); pthread_attr_destroy(&attr); - if (err) { - janet_panicf("%s", strerror(err)); - } + if (err) janet_panicf("%s", strerror(err)); pthread_detach(waiter_thread); - - janet_vm_waiting_procs[janet_vm_proc_count++] = proc; janet_gcroot(janet_wrap_abstract(proc)); janet_ev_inc_refcount(); } static void janet_remove_waiting_proc(JanetProc *proc) { - for (size_t i = 0; i < janet_vm_proc_count; i++) { - if (janet_vm_waiting_procs[i] == proc) { - janet_vm_waiting_procs[i] = janet_vm_waiting_procs[--janet_vm_proc_count]; - janet_gcunroot(janet_wrap_abstract(proc)); - janet_ev_dec_refcount(); - return; - } - } + janet_gcunroot(janet_wrap_abstract(proc)); + janet_ev_dec_refcount(); } -static JanetProc *janet_lookup_proc(pid_t pid) { - for (size_t i = 0; i < janet_vm_proc_count; i++) { - if (janet_vm_waiting_procs[i]->pid == pid) { - return janet_vm_waiting_procs[i]; - } - } - return NULL; -} - -void janet_schedule_pid(pid_t pid, int status) { +void janet_schedule_proc(void *ptr, int status) { /* Use POSIX shell semantics for interpreting signals */ if (WIFEXITED(status)) { status = WEXITSTATUS(status); @@ -451,29 +416,24 @@ void janet_schedule_pid(pid_t pid, int status) { } else { status = WTERMSIG(status) + 128; } - JanetProc *proc = janet_lookup_proc(pid); + JanetProc *proc = (JanetProc *)ptr; if (NULL == proc) return; proc->return_code = (int32_t) status; proc->flags |= JANET_PROC_WAITED; proc->flags &= ~JANET_PROC_WAITING; + janet_remove_waiting_proc(proc); if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { JanetString s = janet_formatc("command failed with non-zero exit code %d", status); janet_cancel(proc->fiber, janet_wrap_string(s)); } else { janet_schedule(proc->fiber, janet_wrap_integer(status)); } - janet_remove_waiting_proc(proc); } #endif static int janet_proc_gc(void *p, size_t s) { (void) s; JanetProc *proc = (JanetProc *) p; -#ifdef JANET_EV - if (proc->flags & JANET_PROC_WAITING) { - janet_remove_waiting_proc(proc); - } -#endif #ifdef JANET_WINDOWS if (!(proc->flags & JANET_PROC_CLOSED)) { CloseHandle(proc->pHandle); @@ -2213,16 +2173,7 @@ static const JanetReg os_cfuns[] = { {NULL, NULL, NULL} }; -void janet_os_deinit(void) { -#ifndef JANET_NO_PROCESSES -#ifdef JANET_EV - free(janet_vm_waiting_procs); - janet_vm_waiting_procs = NULL; - janet_vm_proc_count = 0; - janet_vm_proc_cap = 0; -#endif -#endif -} +void janet_os_deinit(void) {} /* Module entry point */ void janet_lib_os(JanetTable *env) { @@ -2235,11 +2186,6 @@ void janet_lib_os(JanetTable *env) { } #endif #ifndef JANET_NO_PROCESSES -#ifdef JANET_EV - janet_vm_waiting_procs = NULL; - janet_vm_proc_count = 0; - janet_vm_proc_cap = 0; -#endif #endif janet_core_cfuns(env, NULL, os_cfuns); } diff --git a/src/core/util.h b/src/core/util.h index 24be73b4..ec94de17 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type; void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); -void janet_schedule_pid(pid_t pid, int status); +void janet_schedule_proc(void *proc, int status); /* Single message that is written to self pipe. This is used * to communicate messages inside Janet to work with the event loop. @@ -158,7 +158,7 @@ typedef struct { union { struct { int status; - pid_t pid; + void *proc; } proc; } as; } JanetSelfPipeEvent; From 0a1c93b86926b35f309a4c6e34db1d62fbf75218 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 16:12:42 -0600 Subject: [PATCH 05/10] Add ev api for making threaded calls. Easy way to make arbitrary functions in C async. --- examples/async-execute.janet | 2 +- src/core/ev.c | 120 +++++++++++++++++++++++++++--- src/core/os.c | 137 +++++++++++++---------------------- src/core/state.h | 2 - src/core/util.h | 21 ------ src/core/vm.c | 1 - src/include/janet.h | 42 +++++++++++ 7 files changed, 204 insertions(+), 121 deletions(-) diff --git a/examples/async-execute.janet b/examples/async-execute.janet index 662ba0df..3cc22c3d 100644 --- a/examples/async-execute.janet +++ b/examples/async-execute.janet @@ -1,6 +1,6 @@ (defn dowork [name n] (print name " starting work...") - (os/execute ["sleep" (string n)] :p) + (os/execute [(dyn :executable) "-e" (string "(os/sleep " n ")")]) (print name " finished work!")) # Will be done in parallel diff --git a/src/core/ev.c b/src/core/ev.c index 4809273a..1bdaed45 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -33,6 +33,7 @@ /* Includes */ #include +#include #ifdef JANET_WINDOWS #include #include @@ -883,6 +884,21 @@ void janet_loop(void) { * Self-pipe handling code. */ +/* Structure used to initialize threads in the thread pool. */ +typedef struct { + int write_pipe; + JanetEVGenericMessage msg; + JanetThreadedSubroutine subr; + JanetThreadedCallback cb; +} JanetEVThreadInit; + +/* Wrap return value by pairing it with the callback used to handle it + * in the main thread */ +typedef struct { + JanetEVGenericMessage msg; + JanetThreadedCallback cb; +} JanetSelfPipeEvent; + #ifdef JANET_WINDOWS #else @@ -891,15 +907,10 @@ JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; /* Handle events from the self pipe inside the event loop */ static void janet_ev_handle_selfpipe(void) { - JanetSelfPipeEvent ev; - while (read(janet_vm_selfpipe[0], &ev, sizeof(ev)) > 0) { - switch (ev.tag) { - default: - break; - case JANET_SELFPIPE_PROC: - janet_schedule_proc(ev.as.proc.proc, ev.as.proc.status); - break; - } + JanetSelfPipeEvent response; + while (read(janet_vm_selfpipe[0], &response, sizeof(response)) > 0) { + response.cb(response.msg); + janet_ev_dec_refcount(); } } @@ -919,6 +930,97 @@ static void janet_ev_cleanup_selfpipe(void) { #endif +static void *janet_thread_body(void *ptr) { + JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; + int fd = init->write_pipe; + JanetEVGenericMessage msg = init->msg; + JanetThreadedSubroutine subr = init->subr; + JanetThreadedCallback cb = init->cb; + free(init); + + JanetSelfPipeEvent response; + response.msg = subr(msg); + response.cb = cb; + + /* TODO - implement for windows */ + if (write(fd, &response, sizeof(response)) < 0) { + /* TODO failed to handle signal. */ + fprintf(stderr, "failed to write response\n"); + } + + return NULL; +} + +void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) { + JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit)); + if (NULL == init) { + JANET_OUT_OF_MEMORY; + } + init->write_pipe = janet_vm_selfpipe[1]; + init->msg = arguments; + init->subr = fp; + init->cb = cb; + + /* Create thread - TODO thread pool? */ + pthread_attr_t attr; + pthread_t waiter_thread; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); + int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init); + pthread_attr_destroy(&attr); + if (err) { + free(init); + janet_panicf("%s", strerror(err)); + } + pthread_detach(waiter_thread); + + /* Increment ev refcount so we don't quit while waiting for a subprocess */ + janet_ev_inc_refcount(); +} + +/* Default callback for janet_ev_threaded_await. */ +void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { + switch (return_value.tag) { + default: + case JANET_EV_TCTAG_NIL: + janet_schedule(return_value.fiber, janet_wrap_nil()); + break; + case JANET_EV_TCTAG_INTEGER: + janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi)); + break; + case JANET_EV_TCTAG_STRING: + case JANET_EV_TCTAG_STRINGF: + janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_KEYWORD: + janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + case JANET_EV_TCTAG_ERR_STRING: + case JANET_EV_TCTAG_ERR_STRINGF: + janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_ERR_KEYWORD: + janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + } + janet_gcunroot(janet_wrap_fiber(return_value.fiber)); +} + + +/* Convenience method for common case */ +void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { + JanetEVGenericMessage arguments; + arguments.tag = tag; + arguments.argi = argi; + arguments.argp = argp; + arguments.fiber = janet_root_fiber(); + janet_gcroot(janet_wrap_fiber(arguments.fiber)); + janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback); + janet_await(); +} + #ifdef JANET_WINDOWS JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; diff --git a/src/core/os.c b/src/core/os.c index 41cbf4e9..0dee4dc7 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -335,7 +335,6 @@ typedef struct { JanetStream *in; JanetStream *out; JanetStream *err; - JanetFiber *fiber; #else JanetFile *in; JanetFile *out; @@ -345,69 +344,22 @@ typedef struct { #ifdef JANET_EV -/* Structure used to initialize the thread used to call wait - * on child processes */ -typedef struct { - int write_pipe; - JanetProc *proc; -} JanetReaperInit; - -static void *janet_thread_waiter(void *ptr) { - JanetReaperInit *init = (JanetReaperInit *)ptr; - JanetProc *proc = (JanetProc *) init->proc; - int fd = init->write_pipe; - pid_t pid = proc->pid; - free(init); - for (;;) { - int status = 0; - pid_t which = 0; - do { - which = waitpid(pid, &status, 0); - } while (which == -1 && errno == EINTR); - if (which < 0) { - /* Error, could not wait or no children to wait on. */ - break; - } else { - JanetSelfPipeEvent ev; - ev.tag = JANET_SELFPIPE_PROC; - ev.as.proc.status = status; - ev.as.proc.proc = proc; - if (write(fd, &ev, sizeof(ev)) < 0) { - /* TODO failed to handle signal. */ - fprintf(stderr, "failed to write event\n"); - } - } - } - return NULL; +/* Function that is called in separate thread to wait on a pid */ +static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { + JanetProc *proc = (JanetProc *) args.argp; + pid_t result; + int status = 0; + do { + result = waitpid(proc->pid, &status, 0); + } while (result == -1 && errno == EINTR); + args.argi = status; + return args; } -/* Map pids to JanetProc to allow for lookup after a call to - * waitpid. */ -static void janet_add_waiting_proc(JanetProc *proc) { - JanetReaperInit *init = malloc(sizeof(JanetReaperInit)); - if (NULL == init) { - JANET_OUT_OF_MEMORY; - } - init->write_pipe = janet_vm_selfpipe[1]; - init->proc = proc; - pthread_attr_t attr; - pthread_t waiter_thread; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); - int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init); - pthread_attr_destroy(&attr); - if (err) janet_panicf("%s", strerror(err)); - pthread_detach(waiter_thread); - janet_gcroot(janet_wrap_abstract(proc)); - janet_ev_inc_refcount(); -} - -static void janet_remove_waiting_proc(JanetProc *proc) { - janet_gcunroot(janet_wrap_abstract(proc)); - janet_ev_dec_refcount(); -} - -void janet_schedule_proc(void *ptr, int status) { +/* Callback that is called in main thread when subroutine completes. */ +static void janet_proc_wait_cb(JanetEVGenericMessage args) { + int status = args.argi; + JanetProc *proc = (JanetProc *) args.argp; /* Use POSIX shell semantics for interpreting signals */ if (WIFEXITED(status)) { status = WEXITSTATUS(status); @@ -416,19 +368,21 @@ void janet_schedule_proc(void *ptr, int status) { } else { status = WTERMSIG(status) + 128; } - JanetProc *proc = (JanetProc *)ptr; - if (NULL == proc) return; - proc->return_code = (int32_t) status; - proc->flags |= JANET_PROC_WAITED; - proc->flags &= ~JANET_PROC_WAITING; - janet_remove_waiting_proc(proc); - if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { - JanetString s = janet_formatc("command failed with non-zero exit code %d", status); - janet_cancel(proc->fiber, janet_wrap_string(s)); - } else { - janet_schedule(proc->fiber, janet_wrap_integer(status)); + if (NULL != proc) { + proc->return_code = (int32_t) status; + proc->flags |= JANET_PROC_WAITED; + proc->flags &= ~JANET_PROC_WAITING; + janet_gcunroot(janet_wrap_abstract(proc)); + janet_gcunroot(janet_wrap_fiber(args.fiber)); + if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { + JanetString s = janet_formatc("command failed with non-zero exit code %d", status); + janet_cancel(args.fiber, janet_wrap_string(s)); + } else { + janet_schedule(args.fiber, janet_wrap_integer(status)); + } } } + #endif static int janet_proc_gc(void *p, size_t s) { @@ -456,9 +410,6 @@ static int janet_proc_mark(void *p, size_t s) { if (NULL != proc->in) janet_mark(janet_wrap_abstract(proc->in)); if (NULL != proc->out) janet_mark(janet_wrap_abstract(proc->out)); if (NULL != proc->err) janet_mark(janet_wrap_abstract(proc->err)); -#ifdef JANET_EV - if (NULL != proc->fiber) janet_mark(janet_wrap_fiber(proc->fiber)); -#endif return 0; } @@ -470,10 +421,15 @@ static Janet os_proc_wait_impl(JanetProc *proc) { janet_panicf("cannot wait twice on a process"); } #ifdef JANET_EV - /* Event loop implementation */ - proc->fiber = janet_root_fiber(); + /* Event loop implementation - threaded call */ proc->flags |= JANET_PROC_WAITING; - janet_add_waiting_proc(proc); + JanetEVGenericMessage targs; + memset(&targs, 0, sizeof(targs)); + targs.argp = proc; + targs.fiber = janet_root_fiber(); + janet_gcroot(janet_wrap_abstract(proc)); + janet_gcroot(janet_wrap_fiber(targs.fiber)); + janet_ev_threaded_call(janet_proc_wait_subr, targs, janet_proc_wait_cb); janet_await(); #else /* Non evented implementation */ @@ -905,13 +861,22 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { #else proc->pid = pid; #endif - proc->in = get_stdio_for_handle(new_in, orig_in, 0); - proc->out = get_stdio_for_handle(new_out, orig_out, 1); - proc->err = get_stdio_for_handle(new_err, orig_err, 1); - proc->flags = 0; - if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { - janet_panic("failed to construct proc"); + proc->in = NULL; + proc->out = NULL; + proc->err = NULL; + if (new_in != JANET_HANDLE_NONE) { + proc->in = get_stdio_for_handle(new_in, orig_in, 0); + if (NULL == proc->in) janet_panic("failed to construct proc"); } + if (new_out != JANET_HANDLE_NONE) { + proc->out = get_stdio_for_handle(new_out, orig_out, 1); + if (NULL == proc->out) janet_panic("failed to construct proc"); + } + if (new_err != JANET_HANDLE_NONE) { + proc->err = get_stdio_for_handle(new_err, orig_err, 1); + if (NULL == proc->err) janet_panic("failed to construct proc"); + } + proc->flags = 0; if (janet_flag_at(flags, 2)) { proc->flags |= JANET_PROC_ERROR_NONZERO; } @@ -2173,8 +2138,6 @@ static const JanetReg os_cfuns[] = { {NULL, NULL, NULL} }; -void janet_os_deinit(void) {} - /* Module entry point */ void janet_lib_os(JanetTable *env) { #if !defined(JANET_REDUCED_OS) && defined(JANET_WINDOWS) && defined(JANET_THREADS) diff --git a/src/core/state.h b/src/core/state.h index 0d1ce450..a36f5e38 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -111,6 +111,4 @@ void janet_ev_init(void); void janet_ev_deinit(void); #endif -void janet_os_deinit(void); - #endif /* JANET_STATE_H_defined */ diff --git a/src/core/util.h b/src/core/util.h index ec94de17..c3f2fbfd 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,27 +146,6 @@ extern const JanetAbstractType janet_address_type; void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); int janet_make_pipe(JanetHandle handles[2]); -void janet_schedule_proc(void *proc, int status); - -/* Single message that is written to self pipe. This is used - * to communicate messages inside Janet to work with the event loop. - * Signals and threads will be the main users of this. */ -typedef struct { - enum { - JANET_SELFPIPE_PROC - } tag; - union { - struct { - int status; - void *proc; - } proc; - } as; -} JanetSelfPipeEvent; - -#ifndef JANET_WINDOWS -extern JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; -#endif - #endif #endif diff --git a/src/core/vm.c b/src/core/vm.c index a99a6625..2056090d 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1505,7 +1505,6 @@ void janet_deinit(void) { free(janet_vm_traversal_base); janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; - janet_os_deinit(); #ifdef JANET_THREADS janet_threads_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 02ca54c7..01dd341f 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1290,6 +1290,48 @@ JANET_API void janet_ev_dec_refcount(void); /* Get last error from a an IO operation */ JANET_API Janet janet_ev_lasterr(void); +/* Async service for calling a function or syscall in a background thread. This is not + * as efficient in the slightest as using Streams but can be used for arbitrary blocking + * functions and syscalls. */ + +/* Used to pass data between the main thread and worker threads for simple tasks. + * We could just use a pointer but this prevents malloc/free in the common case + * of only a handful of arguments. */ +typedef struct { + int tag; + int argi; + void *argp; + JanetFiber *fiber; +} JanetEVGenericMessage; + +/* How to resume or cancel after a threaded call. Not exhaustive of the possible + * ways one might want to resume after returning from a threaded call, but should + * cover most of the common cases. For something more complicated, such as resuming + * with an abstract type or a struct, one should use janet_ev_threaded_call instead + * of janet_ev_threaded_await with a custom callback. */ + +#define JANET_EV_TCTAG_NIL 0 /* resume with nil */ +#define JANET_EV_TCTAG_INTEGER 1 /* resume with janet_wrap_integer(argi) */ +#define JANET_EV_TCTAG_STRING 2 /* resume with janet_cstringv((const char *) argp) */ +#define JANET_EV_TCTAG_STRINGF 3 /* resume with janet_cstringv((const char *) argp), then call free on argp. */ +#define JANET_EV_TCTAG_KEYWORD 4 /* resume with janet_ckeywordv((const char *) argp) */ +#define JANET_EV_TCTAG_ERR_STRING 5 /* cancel with janet_cstringv((const char *) argp) */ +#define JANET_EV_TCTAG_ERR_STRINGF 6 /* cancel with janet_cstringv((const char *) argp), then call free on argp. */ +#define JANET_EV_TCTAG_ERR_KEYWORD 7 /* cancel with janet_ckeywordv((const char *) argp) */ + +/* Function pointer that is run in the thread pool */ +typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments); + +/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */ +typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value); + +/* API calls for quickly offloading some work in C to a new thread or thread pool. */ +JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb); +JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp); + +/* Callback used by janet_ev_threaded_await */ +JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value); + /* Read async from a stream */ JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); From 05166b36739b1be03cb675bfe072375b34b69931 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 16:19:41 -0600 Subject: [PATCH 06/10] Fix proc getter bug. --- src/core/ev.c | 4 ---- src/core/os.c | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 1bdaed45..0939857a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -962,12 +962,8 @@ void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage ar init->cb = cb; /* Create thread - TODO thread pool? */ - pthread_attr_t attr; pthread_t waiter_thread; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init); - pthread_attr_destroy(&attr); if (err) { free(init); janet_panicf("%s", strerror(err)); diff --git a/src/core/os.c b/src/core/os.c index 0dee4dc7..6d35b977 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -544,7 +544,7 @@ static int janet_proc_get(void *p, Janet key, Janet *out) { return 1; } if (janet_keyeq(key, "err")) { - *out = (NULL == proc->out) ? janet_wrap_nil() : janet_wrap_abstract(proc->err); + *out = (NULL == proc->err) ? janet_wrap_nil() : janet_wrap_abstract(proc->err); return 1; } if ((-1 != proc->return_code) && janet_keyeq(key, "return-code")) { From 61c65f3df1b53b9a949bb41070f509f2716a7d04 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 31 Dec 2020 16:30:54 -0600 Subject: [PATCH 07/10] Fix valgrind warning. --- src/core/ev.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/ev.c b/src/core/ev.c index 0939857a..c5570cf0 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -522,6 +522,7 @@ void janet_ev_init_common(void) { /* Common deinit code */ void janet_ev_deinit_common(void) { janet_q_deinit(&janet_vm_spawn); + free(janet_vm_tq); free(janet_vm_listeners); janet_vm_listeners = NULL; } From 1c7ed8ca4848295ac03af7af9ae2252735b15618 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 3 Jan 2021 11:04:21 -0600 Subject: [PATCH 08/10] Use PostQueuedCompletionStatus for threaded calls on windows. Ore efficient than using a self pipe. --- src/core/ev.c | 268 ++++++++++++++++++++++++++++++-------------------- src/core/os.c | 2 +- 2 files changed, 160 insertions(+), 110 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index c5570cf0..438d1b7e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -31,13 +31,12 @@ #ifdef JANET_EV -/* Includes */ #include -#include #ifdef JANET_WINDOWS #include #include #else +#include #include #include #include @@ -885,14 +884,6 @@ void janet_loop(void) { * Self-pipe handling code. */ -/* Structure used to initialize threads in the thread pool. */ -typedef struct { - int write_pipe; - JanetEVGenericMessage msg; - JanetThreadedSubroutine subr; - JanetThreadedCallback cb; -} JanetEVThreadInit; - /* Wrap return value by pairing it with the callback used to handle it * in the main thread */ typedef struct { @@ -900,11 +891,29 @@ typedef struct { JanetThreadedCallback cb; } JanetSelfPipeEvent; +/* Structure used to initialize threads in the thread pool + * (same head structure as self pipe event)*/ +typedef struct { + JanetEVGenericMessage msg; + JanetThreadedCallback cb; + JanetThreadedSubroutine subr; + JanetHandle write_pipe; +} JanetEVThreadInit; + #ifdef JANET_WINDOWS +/* On windows, use PostQueuedCompletionStatus instead for + * custom events */ + #else -JANET_THREAD_LOCAL int janet_vm_selfpipe[2]; +static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2]; + +static void janet_ev_setup_selfpipe(void) { + if (janet_make_pipe(janet_vm_selfpipe)) { + JANET_EXIT("failed to initialize self pipe in event loop"); + } +} /* Handle events from the self pipe inside the event loop */ static void janet_ev_handle_selfpipe(void) { @@ -915,15 +924,6 @@ static void janet_ev_handle_selfpipe(void) { } } -static void janet_ev_setup_selfpipe(void) { - if (-1 == pipe(janet_vm_selfpipe)) goto error; - if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error; - if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error; - return; -error: - JANET_EXIT("failed to initialize self pipe in event loop"); -} - static void janet_ev_cleanup_selfpipe(void) { close(janet_vm_selfpipe[0]); close(janet_vm_selfpipe[1]); @@ -931,93 +931,6 @@ static void janet_ev_cleanup_selfpipe(void) { #endif -static void *janet_thread_body(void *ptr) { - JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; - int fd = init->write_pipe; - JanetEVGenericMessage msg = init->msg; - JanetThreadedSubroutine subr = init->subr; - JanetThreadedCallback cb = init->cb; - free(init); - - JanetSelfPipeEvent response; - response.msg = subr(msg); - response.cb = cb; - - /* TODO - implement for windows */ - if (write(fd, &response, sizeof(response)) < 0) { - /* TODO failed to handle signal. */ - fprintf(stderr, "failed to write response\n"); - } - - return NULL; -} - -void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) { - JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit)); - if (NULL == init) { - JANET_OUT_OF_MEMORY; - } - init->write_pipe = janet_vm_selfpipe[1]; - init->msg = arguments; - init->subr = fp; - init->cb = cb; - - /* Create thread - TODO thread pool? */ - pthread_t waiter_thread; - int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init); - if (err) { - free(init); - janet_panicf("%s", strerror(err)); - } - pthread_detach(waiter_thread); - - /* Increment ev refcount so we don't quit while waiting for a subprocess */ - janet_ev_inc_refcount(); -} - -/* Default callback for janet_ev_threaded_await. */ -void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { - switch (return_value.tag) { - default: - case JANET_EV_TCTAG_NIL: - janet_schedule(return_value.fiber, janet_wrap_nil()); - break; - case JANET_EV_TCTAG_INTEGER: - janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi)); - break; - case JANET_EV_TCTAG_STRING: - case JANET_EV_TCTAG_STRINGF: - janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp)); - if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); - break; - case JANET_EV_TCTAG_KEYWORD: - janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); - break; - case JANET_EV_TCTAG_ERR_STRING: - case JANET_EV_TCTAG_ERR_STRINGF: - janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp)); - if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); - break; - case JANET_EV_TCTAG_ERR_KEYWORD: - janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); - break; - } - janet_gcunroot(janet_wrap_fiber(return_value.fiber)); -} - - -/* Convenience method for common case */ -void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { - JanetEVGenericMessage arguments; - arguments.tag = tag; - arguments.argi = argi; - arguments.argp = argp; - arguments.fiber = janet_root_fiber(); - janet_gcroot(janet_wrap_fiber(arguments.fiber)); - janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback); - janet_await(); -} - #ifdef JANET_WINDOWS JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; @@ -1077,6 +990,12 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { if (!has_timeout) { /* queue emptied */ } + } else if (NULL == completionKey) { + /* Custom event */ + JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); + response->cb(response->msg); + free(response); + janet_ev_dec_refcount(); } else { /* Normal event */ JanetStream *stream = (JanetStream *) completionKey; @@ -1371,9 +1290,134 @@ void janet_ev_deinit(void) { #endif -/* C API helpers for reading and writing from streams. +/* + * End poll implementation + */ + +/* + * Threaded calls + */ + +#ifdef JANET_WINDOWS +static DWORD WINAPI janet_thread_body(LPVOID ptr) { + JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; + JanetEVGenericMessage msg = init->msg; + JanetThreadedSubroutine subr = init->subr; + JanetThreadedCallback cb = init->cb; + JanetHandle iocp = init->write_pipe; + /* Reuse memory from thread init for returning data */ + init->msg = subr(msg); + init->cb = cb; + janet_assert(PostQueuedCompletionStatus(iocp, sizeof(JanetSelfPipeEvent), NULL, init), + "failed to post completion event"); + return 0; +} +#else +static void *janet_thread_body(void *ptr) { + JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; + JanetEVGenericMessage msg = init->msg; + JanetThreadedSubroutine subr = init->subr; + JanetThreadedCallback cb = init->cb; + int fd = init->write_pipe; + free(init); + JanetSelfPipeEvent response; + response.msg = subr(msg); + response.cb = cb; + /* handle a bit of back pressure before giving up. */ + int tries = 4; + while (tries > 0) { + int status; + do { + status = write(fd, &response, sizeof(response)); + } while (status == -1 && errno == EINTR); + if (status > 0) break; + sleep(1); + tries--; + } + return NULL; +} +#endif + +void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) { + JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit)); + if (NULL == init) { + JANET_OUT_OF_MEMORY; + } + init->msg = arguments; + init->subr = fp; + init->cb = cb; + +#ifdef JANET_WINDOWS + init->write_pipe = janet_vm_iocp; + HANDLE thread_handle = CreateThread(NULL, 0, janet_thread_body, init, 0, NULL); + if (NULL == thread_handle) { + free(init); + janet_panic("failed to create thread"); + } + CloseHandle(thread_handle); /* detach from thread */ +#else + init->write_pipe = janet_vm_selfpipe[1]; + pthread_t waiter_thread; + int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init); + if (err) { + free(init); + janet_panicf("%s", strerror(err)); + } + pthread_detach(waiter_thread); +#endif + + /* Increment ev refcount so we don't quit while waiting for a subprocess */ + janet_ev_inc_refcount(); +} + +/* Default callback for janet_ev_threaded_await. */ +void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { + switch (return_value.tag) { + default: + case JANET_EV_TCTAG_NIL: + janet_schedule(return_value.fiber, janet_wrap_nil()); + break; + case JANET_EV_TCTAG_INTEGER: + janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi)); + break; + case JANET_EV_TCTAG_STRING: + case JANET_EV_TCTAG_STRINGF: + janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_KEYWORD: + janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + case JANET_EV_TCTAG_ERR_STRING: + case JANET_EV_TCTAG_ERR_STRINGF: + janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_ERR_KEYWORD: + janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + } + janet_gcunroot(janet_wrap_fiber(return_value.fiber)); +} + + +/* Convenience method for common case */ +void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { + JanetEVGenericMessage arguments; + arguments.tag = tag; + arguments.argi = argi; + arguments.argp = argp; + arguments.fiber = janet_root_fiber(); + janet_gcroot(janet_wrap_fiber(arguments.fiber)); + janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback); + janet_await(); +} + +/* + * C API helpers for reading and writing from streams. * There is some networking code in here as well as generic - * reading and writing primitives. */ + * reading and writing primitives. + */ void janet_stream_flags(JanetStream *stream, uint32_t flags) { if (stream->flags & JANET_STREAM_CLOSED) { @@ -1907,7 +1951,13 @@ int janet_make_pipe(JanetHandle handles[2]) { return 0; #else if (pipe(handles)) return -1; + if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error; + if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error; return 0; +error: + close(handles[0]); + close(handles[1]); + return -1; #endif } diff --git a/src/core/os.c b/src/core/os.c index 6d35b977..bb1b1837 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2020 Calvin Rose +* Copyright (c) 2021 Calvin Rose * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to From 47bb7fd21b44b6bd5d1386f873133208eec0e420 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 3 Jan 2021 11:21:44 -0600 Subject: [PATCH 09/10] Begin implementing async subproccesses for windows. --- src/core/ev.c | 7 +++++-- src/core/os.c | 31 ++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 438d1b7e..71114791 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -990,7 +990,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { if (!has_timeout) { /* queue emptied */ } - } else if (NULL == completionKey) { + } else if (0 == completionKey) { /* Custom event */ JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); response->cb(response->msg); @@ -1308,7 +1308,10 @@ static DWORD WINAPI janet_thread_body(LPVOID ptr) { /* Reuse memory from thread init for returning data */ init->msg = subr(msg); init->cb = cb; - janet_assert(PostQueuedCompletionStatus(iocp, sizeof(JanetSelfPipeEvent), NULL, init), + janet_assert(PostQueuedCompletionStatus(iocp, + sizeof(JanetSelfPipeEvent), + 0, + (LPOVERLAPPED) init), "failed to post completion event"); return 0; } diff --git a/src/core/os.c b/src/core/os.c index bb1b1837..58adc668 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -344,6 +344,17 @@ typedef struct { #ifdef JANET_EV +#ifdef JANET_WINDOWS + +static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { + JanetProc *proc = (JanetProc *) args.argp; + WaitForSingleObject(proc->pHandle, INFINITE); + GetExitCodeProcess(proc->pHandle, &args.argi); + return args; +} + +#else /* windows check */ + /* Function that is called in separate thread to wait on a pid */ static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { JanetProc *proc = (JanetProc *) args.argp; @@ -352,14 +363,6 @@ static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { do { result = waitpid(proc->pid, &status, 0); } while (result == -1 && errno == EINTR); - args.argi = status; - return args; -} - -/* Callback that is called in main thread when subroutine completes. */ -static void janet_proc_wait_cb(JanetEVGenericMessage args) { - int status = args.argi; - JanetProc *proc = (JanetProc *) args.argp; /* Use POSIX shell semantics for interpreting signals */ if (WIFEXITED(status)) { status = WEXITSTATUS(status); @@ -368,6 +371,16 @@ static void janet_proc_wait_cb(JanetEVGenericMessage args) { } else { status = WTERMSIG(status) + 128; } + args.argi = status; + return args; +} + +#endif /* End windows check */ + +/* Callback that is called in main thread when subroutine completes. */ +static void janet_proc_wait_cb(JanetEVGenericMessage args) { + int status = args.argi; + JanetProc *proc = (JanetProc *) args.argp; if (NULL != proc) { proc->return_code = (int32_t) status; proc->flags |= JANET_PROC_WAITED; @@ -383,7 +396,7 @@ static void janet_proc_wait_cb(JanetEVGenericMessage args) { } } -#endif +#endif /* End ev check */ static int janet_proc_gc(void *p, size_t s) { (void) s; From 9760cf1f4e5d64e2775e5c010742176a336383a6 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 3 Jan 2021 11:47:29 -0600 Subject: [PATCH 10/10] Fix MSVC warning. --- src/core/os.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/core/os.c b/src/core/os.c index 58adc668..d6c55a1c 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -427,9 +427,11 @@ static int janet_proc_mark(void *p, size_t s) { } #ifdef JANET_EV -JANET_NO_RETURN +static JANET_NO_RETURN void +#else +static Janet #endif -static Janet os_proc_wait_impl(JanetProc *proc) { +os_proc_wait_impl(JanetProc *proc) { if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) { janet_panicf("cannot wait twice on a process"); }