diff --git a/src/core/ev.c b/src/core/ev.c index 5f733c9e..09139b5f 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1948,13 +1948,17 @@ void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, in static volatile long PipeSerialNumber; #endif -int janet_make_pipe(JanetHandle handles[2], int keep_write_side) { +int janet_make_pipe(JanetHandle handles[2], int mode) { #ifdef JANET_WINDOWS /* * On windows, the built in CreatePipe function doesn't support overlapped IO * so we lift from the windows source code and modify for our own version. + * + * mode = 0: both sides non-blocking. + * mode = 1: only read side non-blocking: write side sent to subprocess + * mode = 2: only write side non-blocking: read side sent to subprocess */ - JanetHandle rhandle, whandle; + JanetHandle shandle, chandle; UCHAR PipeNameBuffer[MAX_PATH]; SECURITY_ATTRIBUTES saAttr; memset(&saAttr, 0, sizeof(saAttr)); @@ -1964,33 +1968,45 @@ int janet_make_pipe(JanetHandle handles[2], int keep_write_side) { "\\\\.\\Pipe\\JanetPipeFile.%08x.%08x", GetCurrentProcessId(), InterlockedIncrement(&PipeSerialNumber)); - rhandle = CreateNamedPipeA( + + /* server handle goes to subprocess */ + shandle = CreateNamedPipeA( PipeNameBuffer, - PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | (keep_write_side ? PIPE_NOWAIT : PIPE_WAIT), /* why does this work? */ - 1, /* Max number of pipes for duplication. */ + (mode == 2 ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND) | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 255, /* Max number of pipes for duplication. */ 4096, /* Out buffer size */ 4096, /* In buffer size */ 120 * 1000, /* Timeout in ms */ &saAttr); - if (!rhandle) return -1; - whandle = CreateFileA( + if (shandle == INVALID_HANDLE_VALUE) { + return -1; + } + + /* we keep client handle */ + chandle = CreateFileA( PipeNameBuffer, - GENERIC_WRITE, + (mode == 2 ? GENERIC_WRITE : GENERIC_READ), 0, &saAttr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL); - if (whandle == INVALID_HANDLE_VALUE) { - CloseHandle(rhandle); + + if (chandle == INVALID_HANDLE_VALUE) { + CloseHandle(shandle); return -1; } - handles[0] = rhandle; - handles[1] = whandle; + if (mode == 2) { + handles[0] = shandle; + handles[1] = chandle; + } else { + handles[0] = chandle; + handles[1] = shandle; + } return 0; #else - (void) keep_write_side; + (void) mode; if (pipe(handles)) return -1; if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error; if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error; diff --git a/src/core/io.c b/src/core/io.c index abaee148..c2725a60 100644 --- a/src/core/io.c +++ b/src/core/io.c @@ -264,20 +264,29 @@ static Janet cfun_io_fflush(int32_t argc, Janet *argv) { #define WEXITSTATUS(x) x #endif +/* For closing files from C API */ +int janet_file_close(JanetFile *file) { + int ret = 0; + if (!(file->flags & (JANET_FILE_NOT_CLOSEABLE | JANET_FILE_CLOSED))) { +#ifndef JANET_NO_PROCESSES + if (file->flags & JANET_FILE_PIPED) { + ret = pclose(file->file); + } else +#endif + { + ret = fclose(file->file); + } + file->flags |= JANET_FILE_CLOSED; + return ret; + } + return 0; +} + /* Cleanup a file */ static int cfun_io_gc(void *p, size_t len) { (void) len; JanetFile *iof = (JanetFile *)p; - if (!(iof->flags & (JANET_FILE_NOT_CLOSEABLE | JANET_FILE_CLOSED))) { - /* We can't panic inside a gc, so just ignore bad statuses here */ - if (iof->flags & JANET_FILE_PIPED) { -#ifndef JANET_NO_PROCESSES - pclose(iof->file); -#endif - } else { - fclose(iof->file); - } - } + janet_file_close(iof); return 0; } @@ -723,7 +732,7 @@ static const JanetReg io_cfuns[] = { { "file/temp", cfun_io_temp, JDOC("(file/temp)\n\n" - "Open an anonymous temporary file that is removed on close." + "Open an anonymous temporary file that is removed on close. " "Raises an error on failure.") }, { diff --git a/src/core/os.c b/src/core/os.c index c686516a..9690c221 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -322,6 +322,9 @@ static const JanetAbstractType ProcAT; #define JANET_PROC_WAITED 2 #define JANET_PROC_WAITING 4 #define JANET_PROC_ERROR_NONZERO 8 +#define JANET_PROC_OWNS_STDIN 16 +#define JANET_PROC_OWNS_STDOUT 32 +#define JANET_PROC_OWNS_STDERR 64 typedef struct { int flags; #ifdef JANET_WINDOWS @@ -509,6 +512,33 @@ static Janet os_proc_kill(int32_t argc, Janet *argv) { } } +static Janet os_proc_close(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetProc *proc = janet_getabstract(argv, 0, &ProcAT); +#ifdef JANET_EV + if (proc->flags & JANET_PROC_OWNS_STDIN) janet_stream_close(proc->in); + if (proc->flags & JANET_PROC_OWNS_STDOUT) janet_stream_close(proc->out); + if (proc->flags & JANET_PROC_OWNS_STDERR) janet_stream_close(proc->err); +#else + if (proc->flags & JANET_PROC_OWNS_STDIN) janet_file_close(proc->in); + if (proc->flags & JANET_PROC_OWNS_STDOUT) janet_file_close(proc->out); + if (proc->flags & JANET_PROC_OWNS_STDERR) janet_file_close(proc->err); +#endif + proc->in = NULL; + proc->out = NULL; + proc->err = NULL; + proc->flags &= ~(JANET_PROC_OWNS_STDIN | JANET_PROC_OWNS_STDOUT | JANET_PROC_OWNS_STDERR); + if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) { + return janet_wrap_nil(); + } +#ifdef JANET_EV + os_proc_wait_impl(proc); + return janet_wrap_nil(); +#else + return os_proc_wait_impl(proc); +#endif +} + static void swap_handles(JanetHandle *handles) { JanetHandle temp = handles[0]; handles[0] = handles[1]; @@ -533,7 +563,7 @@ static JanetHandle make_pipes(JanetHandle *handle, int reverse, int *errflag) { #ifdef JANET_EV /* non-blocking pipes */ - if (janet_make_pipe(handles, reverse)) goto error; + if (janet_make_pipe(handles, reverse ? 2 : 1)) goto error; if (reverse) swap_handles(handles); #ifdef JANET_WINDOWS if (!SetHandleInformation(handles[0], HANDLE_FLAG_INHERIT, 0)) goto error; @@ -571,6 +601,7 @@ error: static const JanetMethod proc_methods[] = { {"wait", os_proc_wait}, {"kill", os_proc_kill}, + {"close", os_proc_close}, /* dud methods for janet_proc_next */ {"in", NULL}, {"out", NULL}, @@ -720,6 +751,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { JanetHandle new_in = JANET_HANDLE_NONE, new_out = JANET_HANDLE_NONE, new_err = JANET_HANDLE_NONE; JanetHandle pipe_in = JANET_HANDLE_NONE, pipe_out = JANET_HANDLE_NONE, pipe_err = JANET_HANDLE_NONE; int pipe_errflag = 0; /* Track errors setting up pipes */ + int pipe_owner_flags = 0; /* Get optional redirections */ if (argc > 2) { @@ -729,16 +761,19 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { Janet maybe_stderr = janet_dictionary_get(tab.kvs, tab.cap, janet_ckeywordv("err")); if (janet_keyeq(maybe_stdin, "pipe")) { new_in = make_pipes(&pipe_in, 1, &pipe_errflag); + pipe_owner_flags |= JANET_PROC_OWNS_STDIN; } else if (!janet_checktype(maybe_stdin, JANET_NIL)) { new_in = janet_getjstream(&maybe_stdin, 0, &orig_in); } if (janet_keyeq(maybe_stdout, "pipe")) { new_out = make_pipes(&pipe_out, 0, &pipe_errflag); + pipe_owner_flags |= JANET_PROC_OWNS_STDOUT; } else if (!janet_checktype(maybe_stdout, JANET_NIL)) { new_out = janet_getjstream(&maybe_stdout, 0, &orig_out); } if (janet_keyeq(maybe_stderr, "pipe")) { new_err = make_pipes(&pipe_err, 0, &pipe_errflag); + pipe_owner_flags |= JANET_PROC_OWNS_STDERR; } else if (!janet_checktype(maybe_stderr, JANET_NIL)) { new_err = janet_getjstream(&maybe_stderr, 0, &orig_err); } @@ -770,6 +805,9 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { JanetBuffer *buf = os_exec_escape(exargs); if (buf->count > 8191) { + if (pipe_in != JANET_HANDLE_NONE) CloseHandle(pipe_in); + if (pipe_out != JANET_HANDLE_NONE) CloseHandle(pipe_out); + if (pipe_err != JANET_HANDLE_NONE) CloseHandle(pipe_err); janet_panic("command line string too long (max 8191 characters)"); } const char *path = (const char *) janet_unwrap_string(exargs.items[0]); @@ -801,10 +839,6 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { startupInfo.hStdError = (HANDLE) _get_osfhandle(2); } - /* Use _spawn family of functions. */ - /* Windows docs say do this before any spawns. */ - _flushall(); - int cp_failed = 0; if (!CreateProcess(janet_flag_at(flags, 1) ? NULL : path, (char *) buf->data, /* Single CLI argument */ @@ -906,7 +940,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) { proc->in = NULL; proc->out = NULL; proc->err = NULL; - proc->flags = 0; + proc->flags = pipe_owner_flags; if (janet_flag_at(flags, 2)) { proc->flags |= JANET_PROC_ERROR_NONZERO; } @@ -2055,6 +2089,12 @@ static const JanetReg os_cfuns[] = { "handle on windows. If wait is truthy, will wait for the process to finsih and " "returns the exit code. Otherwise, returns proc.") }, + { + "os/proc-close", os_proc_close, + JDOC("(os/proc-close proc)\n\n" + "Wait on a process if it has not been waited on, and close pipes created by `os/spawn` " + "if they have not been closed. Returns nil.") + }, #endif { "os/setenv", os_setenv, diff --git a/src/core/util.h b/src/core/util.h index c4af39a6..01d78ecb 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type; #ifdef JANET_EV void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); -int janet_make_pipe(JanetHandle handles[2], int keep_write_side); +int janet_make_pipe(JanetHandle handles[2], int mode); #endif #endif diff --git a/src/include/janet.h b/src/include/janet.h index 4f6781cf..feb276d3 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1778,6 +1778,7 @@ JANET_API FILE *janet_dynfile(const char *name, FILE *def); JANET_API JanetFile *janet_getjfile(const Janet *argv, int32_t n); JANET_API JanetAbstract janet_checkfile(Janet j); JANET_API FILE *janet_unwrapfile(Janet j, int32_t *flags); +JANET_API int janet_file_close(JanetFile *file); JANET_API int janet_cryptorand(uint8_t *out, size_t n); diff --git a/test/suite0009.janet b/test/suite0009.janet index 1a236f22..76455aed 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -1,4 +1,4 @@ -# Copyright (c) 2020 Calvin Rose & contributors +# Copyright (c) 2021 Calvin Rose & contributors # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to @@ -26,9 +26,10 @@ (def janet (dyn :executable)) (repeat 10 + (let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})] (os/proc-wait p) - (def x (:read (p :out) 1024)) + (def x (:read (p :out) :all)) (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn pre close.")) (let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})] @@ -37,9 +38,15 @@ (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn post close.")) (let [p (os/spawn [janet "-e" `(file/read stdin :line)`] :px {:in :pipe})] - (:write (p :in) "hello!") + (:write (p :in) "hello!\n") (assert-no-error "pipe stdin to process" (os/proc-wait p)))) +(let [p (os/spawn [janet "-e" `(print (file/read stdin :line))`] :px {:in :pipe :out :pipe})] + (:write (p :in) "hello!\n") + (def x (:read (p :out) 1024)) + (assert-no-error "pipe stdin to process 2" (os/proc-wait p)) + (assert (= "hello!" (string/trim x)) "round trip pipeline in process")) + # Parallel subprocesses (defn calc-1