diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cda5516..c0bb7709 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? - Add `dflt` argument to find-index. +- Deprecate `file/popen` in favor of `os/spawn`. - Add `:all` keyword to `ev/read` and `net/read` to make them more like `file/read`. However, we do not provide any `:line` option as that requires buffering. - Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C, diff --git a/examples/async-execute.janet b/examples/async-execute.janet new file mode 100644 index 00000000..3cc22c3d --- /dev/null +++ b/examples/async-execute.janet @@ -0,0 +1,22 @@ +(defn dowork [name n] + (print name " starting work...") + (os/execute [(dyn :executable) "-e" (string "(os/sleep " n ")")]) + (print name " finished work!")) + +# Will be done in parallel +(print "starting group A") +(ev/call dowork "A 2" 2) +(ev/call dowork "A 1" 1) +(ev/call dowork "A 3" 3) + +(ev/sleep 4) + +# Will also be done in parallel +(print "starting group B") +(ev/call dowork "B 2" 2) +(ev/call dowork "B 1" 1) +(ev/call dowork "B 3" 3) + +(ev/sleep 4) + +(print "all work done") diff --git a/src/core/ev.c b/src/core/ev.c index 1ebbf912..71114791 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -31,12 +31,12 @@ #ifdef JANET_EV -/* Includes */ #include #ifdef JANET_WINDOWS #include #include #else +#include #include #include #include @@ -48,6 +48,7 @@ #include #include #include +#include #ifdef JANET_EV_EPOLL #include #include @@ -148,6 +149,7 @@ JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng; JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL; JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0; JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0; +JANET_THREAD_LOCAL size_t janet_vm_extra_listeners = 0; /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -519,6 +521,7 @@ void janet_ev_init_common(void) { /* Common deinit code */ void janet_ev_deinit_common(void) { janet_q_deinit(&janet_vm_spawn); + free(janet_vm_tq); free(janet_vm_listeners); janet_vm_listeners = NULL; } @@ -540,6 +543,14 @@ void janet_addtimeout(double sec) { add_timeout(to); } +void janet_ev_inc_refcount(void) { + janet_vm_extra_listeners++; +} + +void janet_ev_dec_refcount(void) { + janet_vm_extra_listeners--; +} + /* Channels */ typedef struct { @@ -839,14 +850,16 @@ void janet_loop1(void) { } } } + /* Run scheduled fibers */ while (janet_vm_spawn.head != janet_vm_spawn.tail) { JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; janet_q_pop(&janet_vm_spawn, &task, sizeof(task)); run_one(task.fiber, task.value, task.sig); } + /* Poll for events */ - if (janet_vm_listener_count || janet_vm_tq_count) { + if (janet_vm_listener_count || janet_vm_tq_count || janet_vm_extra_listeners) { JanetTimeout to; memset(&to, 0, sizeof(to)); int has_timeout; @@ -855,18 +868,69 @@ void janet_loop1(void) { pop_timeout(0); } /* Run polling implementation only if pending timeouts or pending events */ - if (janet_vm_tq_count || janet_vm_listener_count) { + if (janet_vm_tq_count || janet_vm_listener_count || janet_vm_extra_listeners) { janet_loop1_impl(has_timeout, to.when); } } } void janet_loop(void) { - while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { + while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count || janet_vm_extra_listeners) { janet_loop1(); } } +/* + * Self-pipe handling code. + */ + +/* Wrap return value by pairing it with the callback used to handle it + * in the main thread */ +typedef struct { + JanetEVGenericMessage msg; + JanetThreadedCallback cb; +} JanetSelfPipeEvent; + +/* Structure used to initialize threads in the thread pool + * (same head structure as self pipe event)*/ +typedef struct { + JanetEVGenericMessage msg; + JanetThreadedCallback cb; + JanetThreadedSubroutine subr; + JanetHandle write_pipe; +} JanetEVThreadInit; + +#ifdef JANET_WINDOWS + +/* On windows, use PostQueuedCompletionStatus instead for + * custom events */ + +#else + +static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2]; + +static void janet_ev_setup_selfpipe(void) { + if (janet_make_pipe(janet_vm_selfpipe)) { + JANET_EXIT("failed to initialize self pipe in event loop"); + } +} + +/* Handle events from the self pipe inside the event loop */ +static void janet_ev_handle_selfpipe(void) { + JanetSelfPipeEvent response; + while (read(janet_vm_selfpipe[0], &response, sizeof(response)) > 0) { + response.cb(response.msg); + janet_ev_dec_refcount(); + } +} + +static void janet_ev_cleanup_selfpipe(void) { + close(janet_vm_selfpipe[0]); + close(janet_vm_selfpipe[1]); +} + +#endif + #ifdef JANET_WINDOWS JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; @@ -926,6 +990,12 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { if (!has_timeout) { /* queue emptied */ } + } else if (0 == completionKey) { + /* Custom event */ + JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); + response->cb(response->msg); + free(response); + janet_ev_dec_refcount(); } else { /* Normal event */ JanetStream *stream = (JanetStream *) completionKey; @@ -1034,8 +1104,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Step state machines */ for (int i = 0; i < ready; i++) { - JanetStream *stream = events[i].data.ptr; - if (NULL != stream) { /* If NULL, is a timeout */ + void *p = events[i].data.ptr; + if (&janet_vm_timerfd == p) { + /* Timer expired, ignore */; + } else if (janet_vm_selfpipe == p) { + /* Self-pipe handling */ + janet_ev_handle_selfpipe(); + } else { + JanetStream *stream = p; int mask = events[i].events; JanetListenerState *state = stream->state; state->event = events + i; @@ -1066,14 +1142,18 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); + janet_ev_setup_selfpipe(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); janet_vm_timer_enabled = 0; if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error; struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = NULL; + ev.data.ptr = &janet_vm_timerfd; if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = janet_vm_selfpipe; + if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_selfpipe[0], &ev)) goto error; return; error: JANET_EXIT("failed to initialize event loop"); @@ -1083,6 +1163,7 @@ void janet_ev_deinit(void) { janet_ev_deinit_common(); close(janet_vm_epoll); close(janet_vm_timerfd); + janet_ev_cleanup_selfpipe(); janet_vm_epoll = 0; } @@ -1119,7 +1200,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); size_t newsize = janet_vm_listener_cap; if (newsize > oldsize) { - janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd)); + janet_vm_fds = realloc(janet_vm_fds, (newsize + 1) * sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; } @@ -1128,12 +1209,12 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in ev.fd = stream->handle; ev.events = make_poll_events(state->stream->_mask); ev.revents = 0; - janet_vm_fds[state->_index] = ev; + janet_vm_fds[state->_index + 1] = ev; return state; } static void janet_unlisten(JanetListenerState *state) { - janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1]; + janet_vm_fds[state->_index + 1] = janet_vm_fds[janet_vm_listener_count]; janet_unlisten_impl(state); } @@ -1146,19 +1227,25 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { JanetTimestamp now = ts_now(); to = now > timeout ? 0 : (int)(timeout - now); } - ready = poll(janet_vm_fds, janet_vm_listener_count, to); + ready = poll(janet_vm_fds, janet_vm_listener_count + 1, to); } while (ready == -1 && errno == EINTR); if (ready == -1) { JANET_EXIT("failed to poll events"); } + /* Check selfpipe */ + if (janet_vm_fds[0].revents & POLLIN) { + janet_vm_fds[0].revents = 0; + janet_ev_handle_selfpipe(); + } + /* Step state machines */ for (size_t i = 0; i < janet_vm_listener_count; i++) { - struct pollfd *pfd = janet_vm_fds + i; + struct pollfd *pfd = janet_vm_fds + i + 1; /* Skip fds where nothing interesting happened */ JanetListenerState *state = janet_vm_listeners[i]; /* Normal event */ - int mask = janet_vm_fds[i].revents; + int mask = pfd->revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; @@ -1183,20 +1270,157 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; + janet_ev_setup_selfpipe(); + janet_vm_fds = malloc(sizeof(struct pollfd)); + if (NULL == janet_vm_fds) { + JANET_OUT_OF_MEMORY; + } + janet_vm_fds[0].fd = janet_vm_selfpipe[0]; + janet_vm_fds[0].events = POLLIN; + janet_vm_fds[0].revents = 0; return; } void janet_ev_deinit(void) { janet_ev_deinit_common(); + janet_ev_cleanup_selfpipe(); free(janet_vm_fds); janet_vm_fds = NULL; } #endif -/* C API helpers for reading and writing from streams. +/* + * End poll implementation + */ + +/* + * Threaded calls + */ + +#ifdef JANET_WINDOWS +static DWORD WINAPI janet_thread_body(LPVOID ptr) { + JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; + JanetEVGenericMessage msg = init->msg; + JanetThreadedSubroutine subr = init->subr; + JanetThreadedCallback cb = init->cb; + JanetHandle iocp = init->write_pipe; + /* Reuse memory from thread init for returning data */ + init->msg = subr(msg); + init->cb = cb; + janet_assert(PostQueuedCompletionStatus(iocp, + sizeof(JanetSelfPipeEvent), + 0, + (LPOVERLAPPED) init), + "failed to post completion event"); + return 0; +} +#else +static void *janet_thread_body(void *ptr) { + JanetEVThreadInit *init = (JanetEVThreadInit *)ptr; + JanetEVGenericMessage msg = init->msg; + JanetThreadedSubroutine subr = init->subr; + JanetThreadedCallback cb = init->cb; + int fd = init->write_pipe; + free(init); + JanetSelfPipeEvent response; + response.msg = subr(msg); + response.cb = cb; + /* handle a bit of back pressure before giving up. */ + int tries = 4; + while (tries > 0) { + int status; + do { + status = write(fd, &response, sizeof(response)); + } while (status == -1 && errno == EINTR); + if (status > 0) break; + sleep(1); + tries--; + } + return NULL; +} +#endif + +void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) { + JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit)); + if (NULL == init) { + JANET_OUT_OF_MEMORY; + } + init->msg = arguments; + init->subr = fp; + init->cb = cb; + +#ifdef JANET_WINDOWS + init->write_pipe = janet_vm_iocp; + HANDLE thread_handle = CreateThread(NULL, 0, janet_thread_body, init, 0, NULL); + if (NULL == thread_handle) { + free(init); + janet_panic("failed to create thread"); + } + CloseHandle(thread_handle); /* detach from thread */ +#else + init->write_pipe = janet_vm_selfpipe[1]; + pthread_t waiter_thread; + int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init); + if (err) { + free(init); + janet_panicf("%s", strerror(err)); + } + pthread_detach(waiter_thread); +#endif + + /* Increment ev refcount so we don't quit while waiting for a subprocess */ + janet_ev_inc_refcount(); +} + +/* Default callback for janet_ev_threaded_await. */ +void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { + switch (return_value.tag) { + default: + case JANET_EV_TCTAG_NIL: + janet_schedule(return_value.fiber, janet_wrap_nil()); + break; + case JANET_EV_TCTAG_INTEGER: + janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi)); + break; + case JANET_EV_TCTAG_STRING: + case JANET_EV_TCTAG_STRINGF: + janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_KEYWORD: + janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + case JANET_EV_TCTAG_ERR_STRING: + case JANET_EV_TCTAG_ERR_STRINGF: + janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp)); + if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp); + break; + case JANET_EV_TCTAG_ERR_KEYWORD: + janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp)); + break; + } + janet_gcunroot(janet_wrap_fiber(return_value.fiber)); +} + + +/* Convenience method for common case */ +void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { + JanetEVGenericMessage arguments; + arguments.tag = tag; + arguments.argi = argi; + arguments.argp = argp; + arguments.fiber = janet_root_fiber(); + janet_gcroot(janet_wrap_fiber(arguments.fiber)); + janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback); + janet_await(); +} + +/* + * C API helpers for reading and writing from streams. * There is some networking code in here as well as generic - * reading and writing primitives. */ + * reading and writing primitives. + */ void janet_stream_flags(JanetStream *stream, uint32_t flags) { if (stream->flags & JANET_STREAM_CLOSED) { @@ -1730,7 +1954,13 @@ int janet_make_pipe(JanetHandle handles[2]) { return 0; #else if (pipe(handles)) return -1; + if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error; + if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error; return 0; +error: + close(handles[0]); + close(handles[1]); + return -1; #endif } @@ -1755,9 +1985,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { return janet_wrap_fiber(fiber); } -static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - double sec = janet_getnumber(argv, 0); +JANET_NO_RETURN void janet_sleep_await(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = janet_vm_root_fiber; @@ -1768,6 +1996,12 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { janet_await(); } +static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + double sec = janet_getnumber(argv, 0); + janet_sleep_await(sec); +} + static Janet cfun_ev_deadline(int32_t argc, Janet *argv) { janet_arity(argc, 1, 3); double sec = janet_getnumber(argv, 0); diff --git a/src/core/io.c b/src/core/io.c index 47376290..8bdb3202 100644 --- a/src/core/io.c +++ b/src/core/io.c @@ -777,7 +777,7 @@ static const JanetReg io_cfuns[] = { #ifndef JANET_NO_PROCESSES { "file/popen", cfun_io_popen, - JDOC("(file/popen command &opt mode)\n\n" + JDOC("(file/popen command &opt mode) (DEPRECATED for os/spawn)\n\n" "Open a file that is backed by a process. The file must be opened in either " "the :r (read) or the :w (write) mode. In :r mode, the stdout of the " "process can be read from the file. In :w mode, the stdin of the process " diff --git a/src/core/os.c b/src/core/os.c index 363505f6..d6c55a1c 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2020 Calvin Rose +* Copyright (c) 2021 Calvin Rose * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -185,6 +185,7 @@ static Janet os_exit(int32_t argc, Janet *argv) { #ifndef JANET_REDUCED_OS #ifndef JANET_NO_PROCESSES + /* Get env for os_execute */ static char **os_execute_env(int32_t argc, const Janet *argv) { char **envp = NULL; @@ -319,13 +320,15 @@ static JanetBuffer *os_exec_escape(JanetView args) { static const JanetAbstractType ProcAT; #define JANET_PROC_CLOSED 1 #define JANET_PROC_WAITED 2 +#define JANET_PROC_WAITING 4 +#define JANET_PROC_ERROR_NONZERO 8 typedef struct { int flags; #ifdef JANET_WINDOWS HANDLE pHandle; HANDLE tHandle; #else - int pid; + pid_t pid; #endif int return_code; #ifdef JANET_EV @@ -339,6 +342,62 @@ typedef struct { #endif } JanetProc; +#ifdef JANET_EV + +#ifdef JANET_WINDOWS + +static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { + JanetProc *proc = (JanetProc *) args.argp; + WaitForSingleObject(proc->pHandle, INFINITE); + GetExitCodeProcess(proc->pHandle, &args.argi); + return args; +} + +#else /* windows check */ + +/* Function that is called in separate thread to wait on a pid */ +static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) { + JanetProc *proc = (JanetProc *) args.argp; + pid_t result; + int status = 0; + do { + result = waitpid(proc->pid, &status, 0); + } while (result == -1 && errno == EINTR); + /* Use POSIX shell semantics for interpreting signals */ + if (WIFEXITED(status)) { + status = WEXITSTATUS(status); + } else if (WIFSTOPPED(status)) { + status = WSTOPSIG(status) + 128; + } else { + status = WTERMSIG(status) + 128; + } + args.argi = status; + return args; +} + +#endif /* End windows check */ + +/* Callback that is called in main thread when subroutine completes. */ +static void janet_proc_wait_cb(JanetEVGenericMessage args) { + int status = args.argi; + JanetProc *proc = (JanetProc *) args.argp; + if (NULL != proc) { + proc->return_code = (int32_t) status; + proc->flags |= JANET_PROC_WAITED; + proc->flags &= ~JANET_PROC_WAITING; + janet_gcunroot(janet_wrap_abstract(proc)); + janet_gcunroot(janet_wrap_fiber(args.fiber)); + if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) { + JanetString s = janet_formatc("command failed with non-zero exit code %d", status); + janet_cancel(args.fiber, janet_wrap_string(s)); + } else { + janet_schedule(args.fiber, janet_wrap_integer(status)); + } + } +} + +#endif /* End ev check */ + static int janet_proc_gc(void *p, size_t s) { (void) s; JanetProc *proc = (JanetProc *) p; @@ -367,10 +426,28 @@ static int janet_proc_mark(void *p, size_t s) { return 0; } -static Janet os_proc_wait_impl(JanetProc *proc) { - if (proc->flags & JANET_PROC_WAITED) { - janet_panicf("cannot wait on process that has already finished"); +#ifdef JANET_EV +static JANET_NO_RETURN void +#else +static Janet +#endif +os_proc_wait_impl(JanetProc *proc) { + if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) { + janet_panicf("cannot wait twice on a process"); } +#ifdef JANET_EV + /* Event loop implementation - threaded call */ + proc->flags |= JANET_PROC_WAITING; + JanetEVGenericMessage targs; + memset(&targs, 0, sizeof(targs)); + targs.argp = proc; + targs.fiber = janet_root_fiber(); + janet_gcroot(janet_wrap_abstract(proc)); + janet_gcroot(janet_wrap_fiber(targs.fiber)); + janet_ev_threaded_call(janet_proc_wait_subr, targs, janet_proc_wait_cb); + janet_await(); +#else + /* Non evented implementation */ proc->flags |= JANET_PROC_WAITED; int status = 0; #ifdef JANET_WINDOWS @@ -386,6 +463,7 @@ static Janet os_proc_wait_impl(JanetProc *proc) { #endif proc->return_code = (int32_t) status; return janet_wrap_integer(proc->return_code); +#endif } static Janet os_proc_wait(int32_t argc, Janet *argv) { @@ -481,7 +559,7 @@ static int janet_proc_get(void *p, Janet key, Janet *out) { return 1; } if (janet_keyeq(key, "err")) { - *out = (NULL == proc->out) ? janet_wrap_nil() : janet_wrap_abstract(proc->err); + *out = (NULL == proc->err) ? janet_wrap_nil() : janet_wrap_abstract(proc->err); return 1; } if ((-1 != proc->return_code) && janet_keyeq(key, "return-code")) { @@ -575,7 +653,7 @@ static JanetFile *get_stdio_for_handle(JanetHandle handle, void *orig, int iswri } #endif -static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { +static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { janet_arity(argc, 1, 3); /* Get flags */ @@ -713,7 +791,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { tHandle = processInfo.hThread; /* Wait and cleanup immedaitely */ - if (!is_async) { + if (!is_spawn) { DWORD code; WaitForSingleObject(pHandle, INFINITE); GetExitCodeProcess(pHandle, &code); @@ -781,45 +859,51 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) { if (status) { os_execute_cleanup(envp, child_argv); janet_panicf("%p: %s", argv[0], strerror(errno)); - } else if (is_async) { + } else if (is_spawn) { /* Get process handle */ os_execute_cleanup(envp, child_argv); } else { /* Wait to complete */ - waitpid(pid, &status, 0); os_execute_cleanup(envp, child_argv); - /* Use POSIX shell semantics for interpreting signals */ - if (WIFEXITED(status)) { - status = WEXITSTATUS(status); - } else if (WIFSTOPPED(status)) { - status = WSTOPSIG(status) + 128; - } else { - status = WTERMSIG(status) + 128; - } } #endif - if (is_async) { - JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); - proc->return_code = -1; + JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc)); + proc->return_code = -1; #ifdef JANET_WINDOWS - proc->pHandle = pHandle; - proc->tHandle = tHandle; + proc->pHandle = pHandle; + proc->tHandle = tHandle; #else - proc->pid = pid; + proc->pid = pid; #endif - proc->in = get_stdio_for_handle(new_in, orig_in, 1); - proc->out = get_stdio_for_handle(new_out, orig_out, 0); - proc->err = get_stdio_for_handle(new_err, orig_err, 0); - proc->flags = 0; - if (proc->in == NULL || proc->out == NULL || proc->err == NULL) { - janet_panic("failed to construct proc"); - } + proc->in = NULL; + proc->out = NULL; + proc->err = NULL; + if (new_in != JANET_HANDLE_NONE) { + proc->in = get_stdio_for_handle(new_in, orig_in, 0); + if (NULL == proc->in) janet_panic("failed to construct proc"); + } + if (new_out != JANET_HANDLE_NONE) { + proc->out = get_stdio_for_handle(new_out, orig_out, 1); + if (NULL == proc->out) janet_panic("failed to construct proc"); + } + if (new_err != JANET_HANDLE_NONE) { + proc->err = get_stdio_for_handle(new_err, orig_err, 1); + if (NULL == proc->err) janet_panic("failed to construct proc"); + } + proc->flags = 0; + if (janet_flag_at(flags, 2)) { + proc->flags |= JANET_PROC_ERROR_NONZERO; + } + + if (is_spawn) { return janet_wrap_abstract(proc); - } else if (janet_flag_at(flags, 2) && status) { - janet_panicf("command failed with non-zero exit code %d", status); } else { - return janet_wrap_integer(status); +#ifdef JANET_EV + os_proc_wait_impl(proc); +#else + return os_proc_wait_impl(proc); +#endif } } @@ -2078,6 +2162,8 @@ void janet_lib_os(JanetTable *env) { InitializeCriticalSection(&env_lock); env_lock_initialized = 1; } +#endif +#ifndef JANET_NO_PROCESSES #endif janet_core_cfuns(env, NULL, os_cfuns); } diff --git a/src/core/vm.c b/src/core/vm.c index 5f61a18b..2056090d 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1478,7 +1478,6 @@ int janet_init(void) { janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; janet_vm_stackn = 0; - /* Threads */ #ifdef JANET_THREADS janet_threads_init(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 3d4c8f21..01dd341f 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1279,14 +1279,59 @@ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener be /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); +JANET_NO_RETURN JANET_API void janet_sleep_await(double sec); /* For use inside listeners - adds a timeout to the current fiber, such that * it will be resumed after sec seconds if no other event schedules the current fiber. */ JANET_API void janet_addtimeout(double sec); +JANET_API void janet_ev_inc_refcount(void); +JANET_API void janet_ev_dec_refcount(void); /* Get last error from a an IO operation */ JANET_API Janet janet_ev_lasterr(void); +/* Async service for calling a function or syscall in a background thread. This is not + * as efficient in the slightest as using Streams but can be used for arbitrary blocking + * functions and syscalls. */ + +/* Used to pass data between the main thread and worker threads for simple tasks. + * We could just use a pointer but this prevents malloc/free in the common case + * of only a handful of arguments. */ +typedef struct { + int tag; + int argi; + void *argp; + JanetFiber *fiber; +} JanetEVGenericMessage; + +/* How to resume or cancel after a threaded call. Not exhaustive of the possible + * ways one might want to resume after returning from a threaded call, but should + * cover most of the common cases. For something more complicated, such as resuming + * with an abstract type or a struct, one should use janet_ev_threaded_call instead + * of janet_ev_threaded_await with a custom callback. */ + +#define JANET_EV_TCTAG_NIL 0 /* resume with nil */ +#define JANET_EV_TCTAG_INTEGER 1 /* resume with janet_wrap_integer(argi) */ +#define JANET_EV_TCTAG_STRING 2 /* resume with janet_cstringv((const char *) argp) */ +#define JANET_EV_TCTAG_STRINGF 3 /* resume with janet_cstringv((const char *) argp), then call free on argp. */ +#define JANET_EV_TCTAG_KEYWORD 4 /* resume with janet_ckeywordv((const char *) argp) */ +#define JANET_EV_TCTAG_ERR_STRING 5 /* cancel with janet_cstringv((const char *) argp) */ +#define JANET_EV_TCTAG_ERR_STRINGF 6 /* cancel with janet_cstringv((const char *) argp), then call free on argp. */ +#define JANET_EV_TCTAG_ERR_KEYWORD 7 /* cancel with janet_ckeywordv((const char *) argp) */ + +/* Function pointer that is run in the thread pool */ +typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments); + +/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */ +typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value); + +/* API calls for quickly offloading some work in C to a new thread or thread pool. */ +JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb); +JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp); + +/* Callback used by janet_ev_threaded_await */ +JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value); + /* Read async from a stream */ JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); diff --git a/test/suite0009.janet b/test/suite0009.janet index d9afc1e6..4262a914 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -40,7 +40,7 @@ # or else the first read can fail. Might be a strange windows # "bug", but needs further investigating. Otherwise, `build_win test` # can sometimes fail on windows, leading to flaky testing. -(ev/sleep 0.2) +(ev/sleep 0.3) (defn test-echo [msg] (with [conn (net/connect "127.0.0.1" "8000")] @@ -59,6 +59,7 @@ (var pipe-counter 0) (def chan (ev/chan 10)) (let [[reader writer] (os/pipe)] + (ev/sleep 0.3) (ev/spawn (while (ev/read reader 3) (++ pipe-counter))