diff --git a/src/core/ev.c b/src/core/ev.c index fbfb6d7a..0691d219 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1560,7 +1560,11 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); if (!status && (ERROR_IO_PENDING != WSAGetLastError())) { if (WSAGetLastError() == ERROR_BROKEN_PIPE) { - janet_schedule(s->fiber, janet_wrap_nil()); + if (state->bytes_read) { + janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); + } else { + janet_schedule(s->fiber, janet_wrap_nil()); + } } else { janet_cancel(s->fiber, janet_ev_lasterr()); } diff --git a/src/core/net.c b/src/core/net.c index 8f72484a..f7e1c3ac 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -529,7 +529,7 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { double to = janet_optnumber(argv, argc, 3, INFINITY); if (janet_keyeq(argv[1], "all")) { if (to != INFINITY) janet_addtimeout(to); - janet_ev_recvchunk(stream, buffer, -1, MSG_NOSIGNAL); + janet_ev_recvchunk(stream, buffer, INT32_MAX, MSG_NOSIGNAL); } else { int32_t n = janet_getnat(argv, 1); if (to != INFINITY) janet_addtimeout(to); diff --git a/test/suite0009.janet b/test/suite0009.janet index 3e5f43df..0f69c2b6 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -21,25 +21,63 @@ (import ./helper :prefix "" :exit true) (start-suite 9) +# Subprocess + +(def janet (dyn :executable)) + (repeat 10 - # Subprocess - (let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})] + (let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})] (os/proc-wait p) (def x (:read (p :out) 1024)) (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn pre close.")) - (let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})] + (let [p (os/spawn [janet "-e" `(print "hello")`] :p {:out :pipe})] (def x (:read (p :out) 1024)) (os/proc-wait p) (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn post close.")) - (let [p (os/spawn [(dyn :executable) "-e" `(file/read stdin :line)`] :px {:in :pipe})] + (let [p (os/spawn [janet "-e" `(file/read stdin :line)`] :px {:in :pipe})] (:write (p :in) "hello!") (assert-no-error "pipe stdin to process" (os/proc-wait p)))) +# Parallel subprocesses + +(defn calc-1 + "Run subprocess, read from stdout, then wait on subprocess." + [code] + (let [p (os/spawn [janet "-e" (string `(printf "%j" ` code `)`)] :px {:out :pipe})] + (os/proc-wait p) + (def output (:read (p :out) :all)) + (parse output))) + +(assert + (deep= + (ev/gather + (calc-1 "(+ 1 2 3 4)") + (calc-1 "(+ 5 6 7 8)") + (calc-1 "(+ 9 10 11 12)")) + @[10 26 42]) "parallel subprocesses 1") + +(defn calc-2 + "Run subprocess, wait on subprocess, then read from stdout. Read only up to 10 bytes instead of :all" + [code] + (let [p (os/spawn [janet "-e" (string `(printf "%j" ` code `)`)] :px {:out :pipe})] + (def output (:read (p :out) 10)) + (os/proc-wait p) + (parse output))) + +(assert + (deep= + (ev/gather + (calc-2 "(+ 1 2 3 4)") + (calc-2 "(+ 5 6 7 8)") + (calc-2 "(+ 9 10 11 12)")) + @[10 26 42]) "parallel subprocesses 2") + + # Net testing -(repeat 30 +(repeat 10 (defn handler "Simple handler for connections." @@ -86,7 +124,7 @@ (var result nil) (var fiber nil) (set fiber - (ev/spawn + (ev/spawn (set result (protect (ev/sleep 10))) (assert (= result '(false "boop")) "ev/cancel 1"))) (ev/sleep 0)