More work on windows networking code.

Remove use of WSARecv and WSASend since for whatever reason
they seem suspect. We may want to revisit this later.
This commit is contained in:
Calvin Rose 2021-01-11 18:00:31 -06:00
parent 80c5ba32b5
commit bf01bf631d
5 changed files with 61 additions and 57 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Calvin Rose
* Copyright (c) 2021 Calvin Rose and contributors.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -910,7 +910,7 @@ typedef struct {
static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2];
static void janet_ev_setup_selfpipe(void) {
if (janet_make_pipe(janet_vm_selfpipe)) {
if (janet_make_pipe(janet_vm_selfpipe, 0)) {
JANET_EXIT("failed to initialize self pipe in event loop");
}
}
@ -1545,16 +1545,11 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
memset(&(state->overlapped), 0, sizeof(OVERLAPPED));
int status;
#ifdef JANET_NET
if (state->mode != JANET_ASYNC_READMODE_READ) {
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf;
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1,
NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else {
status = WSARecv((SOCKET) s->stream->handle, &state->wbuf, 1,
NULL, &state->flags, &state->overlapped, NULL);
}
status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1,
NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
@ -1772,17 +1767,13 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
int status;
#ifdef JANET_NET
if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) {
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
SOCKET sock = (SOCKET) s->stream->handle;
state->wbuf.buf = (char *) bytes;
state->wbuf.len = len;
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
} else {
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
}
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
@ -1915,7 +1906,7 @@ void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, in
static volatile long PipeSerialNumber;
#endif
int janet_make_pipe(JanetHandle handles[2]) {
int janet_make_pipe(JanetHandle handles[2], int keep_write_side) {
#ifdef JANET_WINDOWS
/*
* On windows, the built in CreatePipe function doesn't support overlapped IO
@ -1934,8 +1925,8 @@ int janet_make_pipe(JanetHandle handles[2]) {
rhandle = CreateNamedPipeA(
PipeNameBuffer,
PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_NOWAIT,
255, /* Max number of pipes for duplication. */
PIPE_TYPE_BYTE | (keep_write_side ? PIPE_NOWAIT : PIPE_WAIT), /* why does this work? */
1, /* Max number of pipes for duplication. */
4096, /* Out buffer size */
4096, /* In buffer size */
120 * 1000, /* Timeout in ms */
@ -1957,6 +1948,7 @@ int janet_make_pipe(JanetHandle handles[2]) {
handles[1] = whandle;
return 0;
#else
(void) keep_write_side;
if (pipe(handles)) return -1;
if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error;
if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Calvin Rose
* Copyright (c) 2021 Calvin Rose and contributors.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -390,12 +390,17 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
}
/* Connect to socket */
#ifdef JANET_WINDOWS
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
freeaddrinfo(ai);
#else
int status = connect(sock, addr, addrlen);
if (is_unix) {
free(ai);
} else {
freeaddrinfo(ai);
}
#endif
if (status == -1) {
JSOCKCLOSE(sock);

View File

@ -1,6 +1,5 @@
/*
*
* Copyright (c) 2021 Calvin Rose
* Copyright (c) 2021 Calvin Rose and contributors.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -534,7 +533,7 @@ static JanetHandle make_pipes(JanetHandle *handle, int reverse, int *errflag) {
#ifdef JANET_EV
/* non-blocking pipes */
if (janet_make_pipe(handles)) goto error;
if (janet_make_pipe(handles, reverse)) goto error;
if (reverse) swap_handles(handles);
#ifdef JANET_WINDOWS
if (!SetHandleInformation(handles[0], HANDLE_FLAG_INHERIT, 0)) goto error;
@ -1850,7 +1849,7 @@ static Janet os_pipe(int32_t argc, Janet *argv) {
(void) argv;
janet_fixarity(argc, 0);
JanetHandle fds[2];
if (janet_make_pipe(fds)) janet_panicv(janet_ev_lasterr());
if (janet_make_pipe(fds, 0)) janet_panicv(janet_ev_lasterr());
JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL);
JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL);
Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)};

View File

@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type;
#ifdef JANET_EV
void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2]);
int janet_make_pipe(JanetHandle handles[2], int keep_write_side);
#endif
#endif

View File

@ -21,32 +21,50 @@
(import ./helper :prefix "" :exit true)
(start-suite 9)
(repeat 10
# Subprocess
(let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})]
(os/proc-wait p)
(def x (:read (p :out) 1024))
(assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn pre close."))
(let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})]
(def x (:read (p :out) 1024))
(os/proc-wait p)
(assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn post close."))
(let [p (os/spawn [(dyn :executable) "-e" `(file/read stdin :line)`] :px {:in :pipe})]
(:write (p :in) "hello!")
(assert-no-error "pipe stdin to process" (os/proc-wait p))))
# Net testing
(defn handler
"Simple handler for connections."
[stream]
(defer (:close stream)
(def id (gensym))
(def b @"")
(:read stream 1024 b)
(:write stream b)
(buffer/clear b)))
(repeat 30
(def s (net/server "127.0.0.1" "8000" handler))
(assert s "made server 1")
(defn handler
"Simple handler for connections."
[stream]
(defer (:close stream)
(def id (gensym))
(def b @"")
(net/read stream 1024 b)
(net/write stream b)
(buffer/clear b)))
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg)
(def res (:read conn 1024))
(assert (= (string res) msg) (string "echo " msg))))
(def s (net/server "127.0.0.1" "8000" handler))
(assert s "made server 1")
(test-echo "hello")
(test-echo "world")
(test-echo (string/repeat "abcd" 200))
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
(net/write conn msg)
(def res (net/read conn 1024))
(assert (= (string res) msg) (string "echo " msg))))
(:close s)
(test-echo "hello")
(test-echo "world")
(test-echo (string/repeat "abcd" 200))
(:close s))
# Create pipe
@ -78,14 +96,4 @@
(assert (os/execute [(dyn :executable) "-e" `(+ 1 2 3)`] :xp) "os/execute self")
# Subprocess
(let [p (os/spawn [(dyn :executable) "-e" `(file/read stdin :line)`] :px {:in :pipe})]
(:write (p :in) "hello!")
(assert-no-error "pipe stdin to process" (os/proc-wait p)))
(let [p (os/spawn [(dyn :executable) "-e" `(print "hello")`] :p {:out :pipe})]
(os/proc-wait p)
(def x (:read (p :out) 1024))
(assert (deep= "hello" (string/trim x)) "capture stdout from os/spawn"))
(end-suite)