diff --git a/src/core/ev.c b/src/core/ev.c index 42b599f1..5f733c9e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -2004,7 +2004,6 @@ error: static void janet_ev_go(JanetFiber *fiber, Janet value, JanetChannel *supervisor_channel) { fiber->supervisor_channel = supervisor_channel; - /* janet_channel_push(supervisor_channel, make_supervisor_event("new", fiber), 2); */ janet_schedule(fiber, value); } @@ -2031,6 +2030,17 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { return janet_wrap_fiber(fiber); } +static Janet cfun_ev_give_supervisor(int32_t argc, Janet *argv) { + janet_arity(argc, 1, -1); + JanetChannel *chan = janet_vm_root_fiber->supervisor_channel; + if (NULL != chan) { + if (janet_channel_push(chan, janet_wrap_tuple(janet_tuple_n(argv, argc)), 0)) { + janet_await(); + } + } + return janet_wrap_nil(); +} + JANET_NO_RETURN void janet_sleep_await(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); @@ -2139,6 +2149,13 @@ static const JanetReg ev_cfuns[] = { "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. " "If not provided, the new fiber will inherit the current supervisor.") }, + { + "ev/give-supervisor", cfun_ev_give_supervisor, + JDOC("(ev/give-supervsior tag & payload)\n\n" + "Send a message to the current supervior channel if there is one. The message will be a " + "tuple of all of the arguments combined into a single message, where the first element is tag. " + "By convention, tag should be a keyword indicating the type of message. Returns nil.") + }, { "ev/sleep", cfun_ev_sleep, JDOC("(ev/sleep sec)\n\n" diff --git a/src/core/fiber.c b/src/core/fiber.c index 03576021..e124873f 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -602,7 +602,10 @@ static const JanetReg fiber_cfuns[] = { "Create a new fiber with function body func. Can optionally " "take a set of signals to block from the current parent fiber " "when called. The mask is specified as a keyword where each character " - "is used to indicate a signal to block. The default sigmask is :y. " + "is used to indicate a signal to block. If the ev module is enabled, and " + "this fiber is used as an argument to `ev/go`, these \"blocked\" signals " + "will result in messages being sent to the supervisor channel. " + "The default sigmask is :y. " "For example,\n\n" " (fiber/new myfun :e123)\n\n" "blocks error signals and user signals 1, 2 and 3. The signals are " diff --git a/src/core/os.c b/src/core/os.c index 510d9b24..e6e53ca5 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -322,6 +322,9 @@ static const JanetAbstractType ProcAT; #define JANET_PROC_WAITED 2 #define JANET_PROC_WAITING 4 #define JANET_PROC_ERROR_NONZERO 8 +#define JANET_PROC_OWNS_STDIN 16 +#define JANET_PROC_OWNS_STDOUT 32 +#define JANET_PROC_OWNS_STDERR 64 typedef struct { int flags; #ifdef JANET_WINDOWS @@ -889,12 +892,9 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { } /* Wait for child */ + os_execute_cleanup(envp, child_argv); if (status) { - os_execute_cleanup(envp, child_argv); janet_panicf("%p: %s", argv[0], strerror(errno)); - } else { - /* Wait to complete */ - os_execute_cleanup(envp, child_argv); } #endif @@ -909,24 +909,24 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { proc->in = NULL; proc->out = NULL; proc->err = NULL; - if (new_in != JANET_HANDLE_NONE) { - proc->in = get_stdio_for_handle(new_in, orig_in, 1); - if (NULL == proc->in) janet_panic("failed to construct proc"); - } - if (new_out != JANET_HANDLE_NONE) { - proc->out = get_stdio_for_handle(new_out, orig_out, 0); - if (NULL == proc->out) janet_panic("failed to construct proc"); - } - if (new_err != JANET_HANDLE_NONE) { - proc->err = get_stdio_for_handle(new_err, orig_err, 0); - if (NULL == proc->err) janet_panic("failed to construct proc"); - } proc->flags = 0; if (janet_flag_at(flags, 2)) { proc->flags |= JANET_PROC_ERROR_NONZERO; } - if (is_spawn) { + /* Only set up pointers to stdin, stdout, and stderr if os/spawn. */ + if (new_in != JANET_HANDLE_NONE) { + proc->in = get_stdio_for_handle(new_in, orig_in, 1); + if (NULL == proc->in) janet_panic("failed to construct proc"); + } + if (new_out != JANET_HANDLE_NONE) { + proc->out = get_stdio_for_handle(new_out, orig_out, 0); + if (NULL == proc->out) janet_panic("failed to construct proc"); + } + if (new_err != JANET_HANDLE_NONE) { + proc->err = get_stdio_for_handle(new_err, orig_err, 0); + if (NULL == proc->err) janet_panic("failed to construct proc"); + } return janet_wrap_abstract(proc); } else { #ifdef JANET_EV diff --git a/test/suite0009.janet b/test/suite0009.janet index df4fb2ac..1a236f22 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -74,6 +74,25 @@ (calc-2 "(+ 9 10 11 12)")) @[10 26 42]) "parallel subprocesses 2") +# File piping + +(assert-no-error "file writing 1" + (with [f (file/temp)] + (os/execute [janet "-e" `(repeat 20 (print :hello))`] :p {:out f}))) + +(assert-no-error "file writing 2" + (with [f (file/open "unique.txt" :w)] + (os/execute [janet "-e" `(repeat 20 (print :hello))`] :p {:out f}) + (file/flush f))) + +# Issue #593 +(assert-no-error "file writing 3" + (def outfile (file/open "unique.txt" :w)) + (os/execute [janet "-e" "(pp (seq [i :range (1 10)] i))"] :p {:out outfile}) + (file/flush outfile) + (file/close outfile) + (os/rm "unique.txt")) + # ev/gather (assert (deep= @[1 2 3] (ev/gather 1 2 3)) "ev/gather 1")