Improve reading and writing from streams.

Handle errors earlier, and allow 0 length packets from UDP
but not from TCP. This should more closely follow the exact specs
of send and recv calls.
This commit is contained in:
Calvin Rose 2020-11-02 09:05:19 -06:00
parent 8942e348bd
commit 4d21b582c7
2 changed files with 30 additions and 19 deletions

View File

@ -6,7 +6,7 @@
(def b @"")
(print "Connection " id "!")
(while (:read stream 1024 b)
(repeat 10 (print "work for " id " ...") (ev/sleep 1))
(repeat 10 (print "work for " id " ...") (ev/sleep 0.1))
(:write stream b)
(buffer/clear b))
(printf "Done %v!" id)))

View File

@ -207,8 +207,18 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
}
} while (nread == -1 && JLASTERR == JEINTR);
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
break;
/* Check for errors - special case errors that can just be waited on to fix */
if (nread == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR)));
return JANET_ASYNC_STATUS_DONE;
}
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
if (nread == 0 && !state->is_recv_from) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
/* Increment buffer counts */
@ -229,14 +239,7 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
memcpy(abst, &saddr, socklen);
resume_val = janet_wrap_abstract(abst);
} else {
if (nread > 0) {
resume_val = janet_wrap_buffer(buffer);
} else {
sig = JANET_SIGNAL_ERROR;
resume_val = (nread == -1)
? janet_cstringv(strerror(JLASTERR))
: janet_cstringv("could not read");
}
resume_val = janet_wrap_buffer(buffer);
}
janet_schedule_signal(s->fiber, resume_val, sig);
return JANET_ASYNC_STATUS_DONE;
@ -339,8 +342,8 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
JReadInt nwrote = 0;
if (start < len) {
int32_t nbytes = len - start;
void *dest_abst = state->dest_abst;
do {
void *dest_abst = state->dest_abst;
if (dest_abst) {
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
@ -348,6 +351,20 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
}
} while (nwrote == -1 && JLASTERR == JEINTR);
/* Handle write errors */
if (nwrote == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR)));
return JANET_ASYNC_STATUS_DONE;
}
/* Unless using datagrams, empty message is a disocnnect */
if (nwrote == 0 && !dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
if (nwrote > 0) {
start += nwrote;
} else {
@ -356,13 +373,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
}
state->start = start;
if (start >= len) {
if (nwrote > 0) {
janet_schedule(s->fiber, janet_wrap_nil());
} else if (nwrote == 0) {
janet_cancel(s->fiber, janet_cstringv("could not write"));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR)));
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;