diff --git a/src/core/ev.c b/src/core/ev.c index 0b2831c2..fbfb6d7a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2021 Calvin Rose +* Copyright (c) 2021 Calvin Rose and contributors. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -910,7 +910,7 @@ typedef struct { static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2]; static void janet_ev_setup_selfpipe(void) { - if (janet_make_pipe(janet_vm_selfpipe)) { + if (janet_make_pipe(janet_vm_selfpipe, 0)) { JANET_EXIT("failed to initialize self pipe in event loop"); } } @@ -1545,16 +1545,11 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { memset(&(state->overlapped), 0, sizeof(OVERLAPPED)); int status; #ifdef JANET_NET - if (state->mode != JANET_ASYNC_READMODE_READ) { + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { state->wbuf.len = (ULONG) chunk_size; state->wbuf.buf = state->chunk_buf; - if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1, - NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); - } else { - status = WSARecv((SOCKET) s->stream->handle, &state->wbuf, 1, - NULL, &state->flags, &state->overlapped, NULL); - } + status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1, + NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); if (status && (WSA_IO_PENDING != WSAGetLastError())) { janet_cancel(s->fiber, janet_ev_lasterr()); return JANET_ASYNC_STATUS_DONE; @@ -1772,17 +1767,13 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) int status; #ifdef JANET_NET - if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) { + if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { SOCKET sock = (SOCKET) s->stream->handle; state->wbuf.buf = (char *) bytes; state->wbuf.len = len; - if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { - const struct sockaddr *to = state->dest_abst; - int tolen = (int) janet_abstract_size((void *) to); - status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); - } else { - status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL); - } + const struct sockaddr *to = state->dest_abst; + int tolen = (int) janet_abstract_size((void *) to); + status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); if (status && (WSA_IO_PENDING != WSAGetLastError())) { janet_cancel(s->fiber, janet_ev_lasterr()); return JANET_ASYNC_STATUS_DONE; @@ -1915,7 +1906,7 @@ void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, in static volatile long PipeSerialNumber; #endif -int janet_make_pipe(JanetHandle handles[2]) { +int janet_make_pipe(JanetHandle handles[2], int keep_write_side) { #ifdef JANET_WINDOWS /* * On windows, the built in CreatePipe function doesn't support overlapped IO @@ -1934,8 +1925,8 @@ int janet_make_pipe(JanetHandle handles[2]) { rhandle = CreateNamedPipeA( PipeNameBuffer, PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_NOWAIT, - 255, /* Max number of pipes for duplication. */ + PIPE_TYPE_BYTE | (keep_write_side ? PIPE_NOWAIT : PIPE_WAIT), /* why does this work? */ + 1, /* Max number of pipes for duplication. */ 4096, /* Out buffer size */ 4096, /* In buffer size */ 120 * 1000, /* Timeout in ms */ @@ -1957,6 +1948,7 @@ int janet_make_pipe(JanetHandle handles[2]) { handles[1] = whandle; return 0; #else + (void) keep_write_side; if (pipe(handles)) return -1; if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error; if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error; diff --git a/src/core/net.c b/src/core/net.c index b84e2c4e..8f72484a 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -1,5 +1,5 @@ /* -* Copyright (c) 2020 Calvin Rose +* Copyright (c) 2021 Calvin Rose and contributors. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -390,12 +390,17 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } /* Connect to socket */ +#ifdef JANET_WINDOWS + int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL); + freeaddrinfo(ai); +#else int status = connect(sock, addr, addrlen); if (is_unix) { free(ai); } else { freeaddrinfo(ai); } +#endif if (status == -1) { JSOCKCLOSE(sock); diff --git a/src/core/os.c b/src/core/os.c index b0d3fc82..ad345e66 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -1,6 +1,5 @@ /* - * -* Copyright (c) 2021 Calvin Rose +* Copyright (c) 2021 Calvin Rose and contributors. * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -534,7 +533,7 @@ static JanetHandle make_pipes(JanetHandle *handle, int reverse, int *errflag) { #ifdef JANET_EV /* non-blocking pipes */ - if (janet_make_pipe(handles)) goto error; + if (janet_make_pipe(handles, reverse)) goto error; if (reverse) swap_handles(handles); #ifdef JANET_WINDOWS if (!SetHandleInformation(handles[0], HANDLE_FLAG_INHERIT, 0)) goto error; @@ -1850,7 +1849,7 @@ static Janet os_pipe(int32_t argc, Janet *argv) { (void) argv; janet_fixarity(argc, 0); JanetHandle fds[2]; - if (janet_make_pipe(fds)) janet_panicv(janet_ev_lasterr()); + if (janet_make_pipe(fds, 0)) janet_panicv(janet_ev_lasterr()); JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL); JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL); Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)}; diff --git a/src/core/util.h b/src/core/util.h index 22cc0e85..c4af39a6 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type; #ifdef JANET_EV void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); -int janet_make_pipe(JanetHandle handles[2]); +int janet_make_pipe(JanetHandle handles[2], int keep_write_side); #endif #endif diff --git a/test/suite0009.janet b/test/suite0009.janet index 999af128..3e5f43df 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -21,32 +21,50 @@ (import ./helper :prefix "" :exit true) (start-suite 9) +(repeat 10 + # Subprocess + (let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})] + (os/proc-wait p) + (def x (:read (p :out) 1024)) + (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn pre close.")) + + (let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})] + (def x (:read (p :out) 1024)) + (os/proc-wait p) + (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn post close.")) + + (let [p (os/spawn [(dyn :executable) "-e" `(file/read stdin :line)`] :px {:in :pipe})] + (:write (p :in) "hello!") + (assert-no-error "pipe stdin to process" (os/proc-wait p)))) + # Net testing -(defn handler - "Simple handler for connections." - [stream] - (defer (:close stream) - (def id (gensym)) - (def b @"") - (:read stream 1024 b) - (:write stream b) - (buffer/clear b))) +(repeat 30 -(def s (net/server "127.0.0.1" "8000" handler)) -(assert s "made server 1") + (defn handler + "Simple handler for connections." + [stream] + (defer (:close stream) + (def id (gensym)) + (def b @"") + (net/read stream 1024 b) + (net/write stream b) + (buffer/clear b))) -(defn test-echo [msg] - (with [conn (net/connect "127.0.0.1" "8000")] - (:write conn msg) - (def res (:read conn 1024)) - (assert (= (string res) msg) (string "echo " msg)))) + (def s (net/server "127.0.0.1" "8000" handler)) + (assert s "made server 1") -(test-echo "hello") -(test-echo "world") -(test-echo (string/repeat "abcd" 200)) + (defn test-echo [msg] + (with [conn (net/connect "127.0.0.1" "8000")] + (net/write conn msg) + (def res (net/read conn 1024)) + (assert (= (string res) msg) (string "echo " msg)))) -(:close s) + (test-echo "hello") + (test-echo "world") + (test-echo (string/repeat "abcd" 200)) + + (:close s)) # Create pipe @@ -78,14 +96,4 @@ (assert (os/execute [(dyn :executable) "-e" `(+ 1 2 3)`] :xp) "os/execute self") -# Subprocess -(let [p (os/spawn [(dyn :executable) "-e" `(file/read stdin :line)`] :px {:in :pipe})] - (:write (p :in) "hello!") - (assert-no-error "pipe stdin to process" (os/proc-wait p))) - -(let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})] - (os/proc-wait p) - (def x (:read (p :out) 1024)) - (assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn")) - (end-suite)