From 4d21b582c781ecdcb0b7c16551f6ec0ac5f5e67c Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Mon, 2 Nov 2020 09:05:19 -0600 Subject: [PATCH] Improve reading and writing from streams. Handle errors earlier, and allow 0 length packets from UDP but not from TCP. This should more closely follow the exact specs of send and recv calls. --- examples/tcpserver.janet | 2 +- src/core/net.c | 47 +++++++++++++++++++++++++--------------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet index 2a284bcb..5fbbea69 100644 --- a/examples/tcpserver.janet +++ b/examples/tcpserver.janet @@ -6,7 +6,7 @@ (def b @"") (print "Connection " id "!") (while (:read stream 1024 b) - (repeat 10 (print "work for " id " ...") (ev/sleep 1)) + (repeat 10 (print "work for " id " ...") (ev/sleep 0.1)) (:write stream b) (buffer/clear b)) (printf "Done %v!" id))) diff --git a/src/core/net.c b/src/core/net.c index 3d0f70e0..c64f6793 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -207,8 +207,18 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); } } while (nread == -1 && JLASTERR == JEINTR); - if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { - break; + + /* Check for errors - special case errors that can just be waited on to fix */ + if (nread == -1) { + if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; + janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); + return JANET_ASYNC_STATUS_DONE; + } + + /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ + if (nread == 0 && !state->is_recv_from) { + janet_cancel(s->fiber, janet_cstringv("disconnect")); + return JANET_ASYNC_STATUS_DONE; } /* Increment buffer counts */ @@ -229,14 +239,7 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) memcpy(abst, &saddr, socklen); resume_val = janet_wrap_abstract(abst); } else { - if (nread > 0) { - resume_val = janet_wrap_buffer(buffer); - } else { - sig = JANET_SIGNAL_ERROR; - resume_val = (nread == -1) - ? janet_cstringv(strerror(JLASTERR)) - : janet_cstringv("could not read"); - } + resume_val = janet_wrap_buffer(buffer); } janet_schedule_signal(s->fiber, resume_val, sig); return JANET_ASYNC_STATUS_DONE; @@ -339,8 +342,8 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) JReadInt nwrote = 0; if (start < len) { int32_t nbytes = len - start; + void *dest_abst = state->dest_abst; do { - void *dest_abst = state->dest_abst; if (dest_abst) { nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0, (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); @@ -348,6 +351,20 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); } } while (nwrote == -1 && JLASTERR == JEINTR); + + /* Handle write errors */ + if (nwrote == -1) { + if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; + janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); + return JANET_ASYNC_STATUS_DONE; + } + + /* Unless using datagrams, empty message is a disocnnect */ + if (nwrote == 0 && !dest_abst) { + janet_cancel(s->fiber, janet_cstringv("disconnect")); + return JANET_ASYNC_STATUS_DONE; + } + if (nwrote > 0) { start += nwrote; } else { @@ -356,13 +373,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) } state->start = start; if (start >= len) { - if (nwrote > 0) { - janet_schedule(s->fiber, janet_wrap_nil()); - } else if (nwrote == 0) { - janet_cancel(s->fiber, janet_cstringv("could not write")); - } else { - janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); - } + janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; } break;