Add os/proc-close to close all pipes associated with a subprocess.

This will not leak handles until the GC runs in most use cases.
This commit is contained in:
Calvin Rose 2021-01-16 15:11:07 -06:00
parent c79480342b
commit 462e74ef87
6 changed files with 108 additions and 35 deletions

View File

@ -1948,13 +1948,17 @@ void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, in
static volatile long PipeSerialNumber;
#endif
int janet_make_pipe(JanetHandle handles[2], int keep_write_side) {
int janet_make_pipe(JanetHandle handles[2], int mode) {
#ifdef JANET_WINDOWS
/*
* On windows, the built in CreatePipe function doesn't support overlapped IO
* so we lift from the windows source code and modify for our own version.
*
* mode = 0: both sides non-blocking.
* mode = 1: only read side non-blocking: write side sent to subprocess
* mode = 2: only write side non-blocking: read side sent to subprocess
*/
JanetHandle rhandle, whandle;
JanetHandle shandle, chandle;
UCHAR PipeNameBuffer[MAX_PATH];
SECURITY_ATTRIBUTES saAttr;
memset(&saAttr, 0, sizeof(saAttr));
@ -1964,33 +1968,45 @@ int janet_make_pipe(JanetHandle handles[2], int keep_write_side) {
"\\\\.\\Pipe\\JanetPipeFile.%08x.%08x",
GetCurrentProcessId(),
InterlockedIncrement(&PipeSerialNumber));
rhandle = CreateNamedPipeA(
/* server handle goes to subprocess */
shandle = CreateNamedPipeA(
PipeNameBuffer,
PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | (keep_write_side ? PIPE_NOWAIT : PIPE_WAIT), /* why does this work? */
1, /* Max number of pipes for duplication. */
(mode == 2 ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND) | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_WAIT,
255, /* Max number of pipes for duplication. */
4096, /* Out buffer size */
4096, /* In buffer size */
120 * 1000, /* Timeout in ms */
&saAttr);
if (!rhandle) return -1;
whandle = CreateFileA(
if (shandle == INVALID_HANDLE_VALUE) {
return -1;
}
/* we keep client handle */
chandle = CreateFileA(
PipeNameBuffer,
GENERIC_WRITE,
(mode == 2 ? GENERIC_WRITE : GENERIC_READ),
0,
&saAttr,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
NULL);
if (whandle == INVALID_HANDLE_VALUE) {
CloseHandle(rhandle);
if (chandle == INVALID_HANDLE_VALUE) {
CloseHandle(shandle);
return -1;
}
handles[0] = rhandle;
handles[1] = whandle;
if (mode == 2) {
handles[0] = shandle;
handles[1] = chandle;
} else {
handles[0] = chandle;
handles[1] = shandle;
}
return 0;
#else
(void) keep_write_side;
(void) mode;
if (pipe(handles)) return -1;
if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error;
if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error;

View File

@ -264,20 +264,29 @@ static Janet cfun_io_fflush(int32_t argc, Janet *argv) {
#define WEXITSTATUS(x) x
#endif
/* For closing files from C API */
int janet_file_close(JanetFile *file) {
int ret = 0;
if (!(file->flags & (JANET_FILE_NOT_CLOSEABLE | JANET_FILE_CLOSED))) {
#ifndef JANET_NO_PROCESSES
if (file->flags & JANET_FILE_PIPED) {
ret = pclose(file->file);
} else
#endif
{
ret = fclose(file->file);
}
file->flags |= JANET_FILE_CLOSED;
return ret;
}
return 0;
}
/* Cleanup a file */
static int cfun_io_gc(void *p, size_t len) {
(void) len;
JanetFile *iof = (JanetFile *)p;
if (!(iof->flags & (JANET_FILE_NOT_CLOSEABLE | JANET_FILE_CLOSED))) {
/* We can't panic inside a gc, so just ignore bad statuses here */
if (iof->flags & JANET_FILE_PIPED) {
#ifndef JANET_NO_PROCESSES
pclose(iof->file);
#endif
} else {
fclose(iof->file);
}
}
janet_file_close(iof);
return 0;
}
@ -723,7 +732,7 @@ static const JanetReg io_cfuns[] = {
{
"file/temp", cfun_io_temp,
JDOC("(file/temp)\n\n"
"Open an anonymous temporary file that is removed on close."
"Open an anonymous temporary file that is removed on close. "
"Raises an error on failure.")
},
{

View File

@ -322,6 +322,9 @@ static const JanetAbstractType ProcAT;
#define JANET_PROC_WAITED 2
#define JANET_PROC_WAITING 4
#define JANET_PROC_ERROR_NONZERO 8
#define JANET_PROC_OWNS_STDIN 16
#define JANET_PROC_OWNS_STDOUT 32
#define JANET_PROC_OWNS_STDERR 64
typedef struct {
int flags;
#ifdef JANET_WINDOWS
@ -509,6 +512,33 @@ static Janet os_proc_kill(int32_t argc, Janet *argv) {
}
}
static Janet os_proc_close(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetProc *proc = janet_getabstract(argv, 0, &ProcAT);
#ifdef JANET_EV
if (proc->flags & JANET_PROC_OWNS_STDIN) janet_stream_close(proc->in);
if (proc->flags & JANET_PROC_OWNS_STDOUT) janet_stream_close(proc->out);
if (proc->flags & JANET_PROC_OWNS_STDERR) janet_stream_close(proc->err);
#else
if (proc->flags & JANET_PROC_OWNS_STDIN) janet_file_close(proc->in);
if (proc->flags & JANET_PROC_OWNS_STDOUT) janet_file_close(proc->out);
if (proc->flags & JANET_PROC_OWNS_STDERR) janet_file_close(proc->err);
#endif
proc->in = NULL;
proc->out = NULL;
proc->err = NULL;
proc->flags &= ~(JANET_PROC_OWNS_STDIN | JANET_PROC_OWNS_STDOUT | JANET_PROC_OWNS_STDERR);
if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) {
return janet_wrap_nil();
}
#ifdef JANET_EV
os_proc_wait_impl(proc);
return janet_wrap_nil();
#else
return os_proc_wait_impl(proc);
#endif
}
static void swap_handles(JanetHandle *handles) {
JanetHandle temp = handles[0];
handles[0] = handles[1];
@ -533,7 +563,7 @@ static JanetHandle make_pipes(JanetHandle *handle, int reverse, int *errflag) {
#ifdef JANET_EV
/* non-blocking pipes */
if (janet_make_pipe(handles, reverse)) goto error;
if (janet_make_pipe(handles, reverse ? 2 : 1)) goto error;
if (reverse) swap_handles(handles);
#ifdef JANET_WINDOWS
if (!SetHandleInformation(handles[0], HANDLE_FLAG_INHERIT, 0)) goto error;
@ -571,6 +601,7 @@ error:
static const JanetMethod proc_methods[] = {
{"wait", os_proc_wait},
{"kill", os_proc_kill},
{"close", os_proc_close},
/* dud methods for janet_proc_next */
{"in", NULL},
{"out", NULL},
@ -720,6 +751,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
JanetHandle new_in = JANET_HANDLE_NONE, new_out = JANET_HANDLE_NONE, new_err = JANET_HANDLE_NONE;
JanetHandle pipe_in = JANET_HANDLE_NONE, pipe_out = JANET_HANDLE_NONE, pipe_err = JANET_HANDLE_NONE;
int pipe_errflag = 0; /* Track errors setting up pipes */
int pipe_owner_flags = 0;
/* Get optional redirections */
if (argc > 2) {
@ -729,16 +761,19 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
Janet maybe_stderr = janet_dictionary_get(tab.kvs, tab.cap, janet_ckeywordv("err"));
if (janet_keyeq(maybe_stdin, "pipe")) {
new_in = make_pipes(&pipe_in, 1, &pipe_errflag);
pipe_owner_flags |= JANET_PROC_OWNS_STDIN;
} else if (!janet_checktype(maybe_stdin, JANET_NIL)) {
new_in = janet_getjstream(&maybe_stdin, 0, &orig_in);
}
if (janet_keyeq(maybe_stdout, "pipe")) {
new_out = make_pipes(&pipe_out, 0, &pipe_errflag);
pipe_owner_flags |= JANET_PROC_OWNS_STDOUT;
} else if (!janet_checktype(maybe_stdout, JANET_NIL)) {
new_out = janet_getjstream(&maybe_stdout, 0, &orig_out);
}
if (janet_keyeq(maybe_stderr, "pipe")) {
new_err = make_pipes(&pipe_err, 0, &pipe_errflag);
pipe_owner_flags |= JANET_PROC_OWNS_STDERR;
} else if (!janet_checktype(maybe_stderr, JANET_NIL)) {
new_err = janet_getjstream(&maybe_stderr, 0, &orig_err);
}
@ -770,6 +805,9 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
JanetBuffer *buf = os_exec_escape(exargs);
if (buf->count > 8191) {
if (pipe_in != JANET_HANDLE_NONE) CloseHandle(pipe_in);
if (pipe_out != JANET_HANDLE_NONE) CloseHandle(pipe_out);
if (pipe_err != JANET_HANDLE_NONE) CloseHandle(pipe_err);
janet_panic("command line string too long (max 8191 characters)");
}
const char *path = (const char *) janet_unwrap_string(exargs.items[0]);
@ -801,10 +839,6 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
startupInfo.hStdError = (HANDLE) _get_osfhandle(2);
}
/* Use _spawn family of functions. */
/* Windows docs say do this before any spawns. */
_flushall();
int cp_failed = 0;
if (!CreateProcess(janet_flag_at(flags, 1) ? NULL : path,
(char *) buf->data, /* Single CLI argument */
@ -906,7 +940,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
proc->in = NULL;
proc->out = NULL;
proc->err = NULL;
proc->flags = 0;
proc->flags = pipe_owner_flags;
if (janet_flag_at(flags, 2)) {
proc->flags |= JANET_PROC_ERROR_NONZERO;
}
@ -2055,6 +2089,12 @@ static const JanetReg os_cfuns[] = {
"handle on windows. If wait is truthy, will wait for the process to finsih and "
"returns the exit code. Otherwise, returns proc.")
},
{
"os/proc-close", os_proc_close,
JDOC("(os/proc-close proc)\n\n"
"Wait on a process if it has not been waited on, and close pipes created by `os/spawn` "
"if they have not been closed. Returns nil.")
},
#endif
{
"os/setenv", os_setenv,

View File

@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type;
#ifdef JANET_EV
void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2], int keep_write_side);
int janet_make_pipe(JanetHandle handles[2], int mode);
#endif
#endif

View File

@ -1778,6 +1778,7 @@ JANET_API FILE *janet_dynfile(const char *name, FILE *def);
JANET_API JanetFile *janet_getjfile(const Janet *argv, int32_t n);
JANET_API JanetAbstract janet_checkfile(Janet j);
JANET_API FILE *janet_unwrapfile(Janet j, int32_t *flags);
JANET_API int janet_file_close(JanetFile *file);
JANET_API int janet_cryptorand(uint8_t *out, size_t n);

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020 Calvin Rose & contributors
# Copyright (c) 2021 Calvin Rose & contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
@ -26,9 +26,10 @@
(def janet (dyn :executable))
(repeat 10
(let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})]
(os/proc-wait p)
(def x (:read (p :out) 1024))
(def x (:read (p :out) :all))
(assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn pre close."))
(let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})]
@ -37,9 +38,15 @@
(assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn post close."))
(let [p (os/spawn [janet "-e" `(file/read stdin :line)`] :px {:in :pipe})]
(:write (p :in) "hello!")
(:write (p :in) "hello!\n")
(assert-no-error "pipe stdin to process" (os/proc-wait p))))
(let [p (os/spawn [janet "-e" `(print (file/read stdin :line))`] :px {:in :pipe :out :pipe})]
(:write (p :in) "hello!\n")
(def x (:read (p :out) 1024))
(assert-no-error "pipe stdin to process 2" (os/proc-wait p))
(assert (= "hello!" (string/trim x)) "round trip pipeline in process"))
# Parallel subprocesses
(defn calc-1