From b3e88a8d80e8e46ce100b76a3d1480fee0704f33 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 14 Nov 2020 11:48:23 -0600 Subject: [PATCH] Move read/write functions into ev.c from net.c This code can also be used for non-network streams. --- src/boot/boot.janet | 6 + src/core/ev.c | 467 ++++++++++++++++++++++++++++++++++++++++++-- src/core/net.c | 413 ++------------------------------------- src/core/util.h | 1 + src/include/janet.h | 25 ++- 5 files changed, 504 insertions(+), 408 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 03d0a0e5..b7aaf82e 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2733,6 +2733,12 @@ (if (dyn sym) form)) +(guarddef ev/go + (defmacro ev/spawn + "Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))." + [& body] + ~(,ev/call (fn [] ,;body)))) + (guarddef net/listen (defn net/server "Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream." diff --git a/src/core/ev.c b/src/core/ev.c index 603db388..69ffed3a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -32,14 +32,11 @@ #ifdef JANET_EV /* Includes */ - #ifdef JANET_WINDOWS - +#include #include #include - #else - #include #include #include @@ -47,20 +44,16 @@ #include #include #include - -#ifdef JANET_BSD #include -#endif - +#include +#include +#include #ifdef JANET_EV_EPOLL #include #include #endif - #endif -/* General queue */ - /* Ring buffer for storing a list of fibers */ typedef struct { int32_t capacity; @@ -1050,6 +1043,458 @@ void janet_ev_deinit(void) { #endif +/* C API helpers for reading and writing from streams. + * There is some networking code in here as well as generic + * reading and writing primitives. */ + + +/* When there is an IO error, we need to be able to convert it to a Janet + * string to raise a Janet error. */ +#ifdef JANET_WINDOWS +#define JANET_EV_CHUNKSIZE 4096 +Janet janet_ev_lasterr(void) { + int code = GetLastError(); + char msgbuf[256]; + msgbuf[0] = '\0'; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + code, + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + msgbuf, + sizeof(msgbuf), + NULL); + if (!*msgbuf) sprintf(msgbuf, "%d", code); + char *c = msgbuf; + while (*c) { + if (*c == '\n' || *c == '\r') { + *c = '\0'; + break; + } + c++; + } + return janet_cstringv(msgbuf); +} +#else +Janet janet_ev_lasterr(void) { + return janet_cstringv(strerror(errno)); +} +#endif + +/* State machine for read/recv/recvfrom */ + +typedef enum { + JANET_ASYNC_READMODE_READ, + JANET_ASYNC_READMODE_RECV, + JANET_ASYNC_READMODE_RECVFROM +} JanetReadMode; + +typedef struct { + JanetListenerState head; + int32_t bytes_left; + JanetBuffer *buf; + int is_chunk; + JanetReadMode mode; +#ifdef JANET_WINDOWS + OVERLAPPED overlapped; +#ifdef JANET_NET + WSABUF wbuf; + DWORD flags; + struct sockaddr from; + int fromlen; +#endif + uint8_t chunk_buf[JANET_EV_CHUNKSIZE]; +#else + int flags; +#endif +} StateRead; + +JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { + StateRead *state = (StateRead *) s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_MARK: + janet_mark(janet_wrap_buffer(state->buf)); + break; + case JANET_ASYNC_EVENT_CLOSE: + janet_cancel(s->fiber, janet_cstringv("stream closed")); + return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_COMPLETE: { + /* Called when read finished */ + if (s->bytes == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + } + + janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); + state->bytes_left -= s->bytes; + + if (state->bytes_left <= 0 || !state->is_chunk) { + Janet resume_val; +#ifdef JANET_NET + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + void *abst = janet_abstract(&janet_address_type, state->fromlen); + memcpy(abst, &state->from, state->fromlen); + resume_val = janet_wrap_abstract(abst); + } else +#endif + { + resume_val = janet_wrap_buffer(state->buf); + } + janet_schedule(s->fiber, resume_val); + return JANET_ASYNC_STATUS_DONE; + } + } + + /* fallthrough */ + case JANET_ASYNC_EVENT_USER: { + int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left; + s->tag = &state->overlapped; + memset(&(state->overlapped), 0, sizeof(OVERLAPPED)); + int status; +#ifdef JANET_NET + if (state->mode != JANET_ASYNC_READMODE_READ) { + state->wbuf.len = (ULONG) chunk_size; + state->wbuf.buf = state->chunk_buf; + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); + } else + } + status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL); + } + if (status && (WSA_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } + } else +#endif + { + status = ReadFile(s->pollable->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); + if (status && (ERROR_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } + } +} +break; +#else + case JANET_ASYNC_EVENT_READ: + /* Read in bytes */ + { + JanetBuffer *buffer = state->buf; + int32_t bytes_left = state->bytes_left; + janet_buffer_extra(buffer, bytes_left); + ssize_t nread; +#ifdef JANET_NET + char saddr[256]; + socklen_t socklen = sizeof(saddr); +#endif + do { +#ifdef JANET_NET + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags, + (struct sockaddr *)&saddr, &socklen); + } else if (state->mode == JANET_ASYNC_READMODE_RECV) { + nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags); + } else +#endif + { + nread = read(s->pollable->handle, buffer->data + buffer->count, bytes_left); + } + } while (nread == -1 && errno == EINTR); + + /* Check for errors - special case errors that can just be waited on to fix */ + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } + + /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ + if (nread == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + } + + /* Increment buffer counts */ + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + state->bytes_left = bytes_left; + + /* Resume if done */ + if (!state->is_chunk || bytes_left == 0) { + Janet resume_val; +#ifdef JANET_NET + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + void *abst = janet_abstract(&janet_address_type, socklen); + memcpy(abst, &saddr, socklen); + resume_val = janet_wrap_abstract(abst); + } else +#endif + { + resume_val = janet_wrap_buffer(buffer); + } + janet_schedule(s->fiber, resume_val); + return JANET_ASYNC_STATUS_DONE; + } + } + break; +#endif +} +return JANET_ASYNC_STATUS_NOT_DONE; +} + +static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { + StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read, + JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL); + state->is_chunk = is_chunked; + state->buf = buf; + state->bytes_left = nbytes; + state->mode = mode; +#ifdef JANET_WINDOWS + ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); + state->flags = (DWORD) flags; +#else + state->flags = flags; +#endif +} + +void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { + janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0); +} +void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { + janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0); +} +#ifdef JANET_NET +void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { + janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags); +} +void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { + janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags); +} +void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { + janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags); +} +#endif + +/* + * State machine for write/send/send-to + */ + +typedef enum { + JANET_ASYNC_WRITEMODE_WRITE, + JANET_ASYNC_WRITEMODE_SEND, + JANET_ASYNC_WRITEMODE_SENDTO +} JanetWriteMode; + +typedef struct { + JanetListenerState head; + union { + JanetBuffer *buf; + const uint8_t *str; + } src; + int is_buffer; + JanetWriteMode mode; + void *dest_abst; +#ifdef JANET_WINDOWS + OVERLAPPED overlapped; +#ifdef JANET_NET + WSABUF wbuf; + DWORD flags; +#endif +#else + int flags; + int32_t start; +#endif +} StateWrite; + +JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { + StateWrite *state = (StateWrite *) s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_MARK: + janet_mark(state->is_buffer + ? janet_wrap_buffer(state->src.buf) + : janet_wrap_string(state->src.str)); + if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { + janet_mark(janet_wrap_abstract(state->dest_abst)); + } + break; + case JANET_ASYNC_EVENT_CLOSE: + janet_cancel(s->fiber, janet_cstringv("stream closed")); + return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_COMPLETE: { + /* Called when write finished */ + if (s->bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) { + janet_cancel(s->fiber, janet_cstringv("disconnect")); + return JANET_ASYNC_STATUS_DONE; + } + + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + } + break; + case JANET_ASYNC_EVENT_USER: { + /* Begin write */ + int32_t start, len; + const uint8_t *bytes; + if (state->is_buffer) { + /* If buffer, convert to string. */ + /* TODO - be more efficient about this */ + JanetBuffer *buffer = state->src.buf; + JanetString str = janet_string(buffer->data, buffer->count); + bytes = str; + len = buffer->count; + state->is_buffer = 0; + state->src.str = str; + } else { + bytes = state->src.str; + len = janet_string_length(bytes); + } + s->tag = &state->overlapped; + memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); + + int status; +#ifdef JANET_NET + if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) { + SOCKET sock = (SOCKET) s->pollable->handle; + state->wbuf.buf = (char *) bytes; + state->wbuf.len = len; + if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { + const struct sockaddr *to = state->dest_abst; + int tolen = (int) janet_abstract_size((void *) to); + status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); + } else { + status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL); + } + if (status && (WSA_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); + return JANET_ASYNC_STATUS_DONE; + } + } else +#endif + { + status = WriteFile(s->pollable->handle, bytes, len, NULL, &state->overlapped); + if (status && (ERROR_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); + return JANET_ASYNC_STATUS_DONE; + } + } + } + break; +#else + case JANET_ASYNC_EVENT_WRITE: { + int32_t start, len; + const uint8_t *bytes; + start = state->start; + if (state->is_buffer) { + JanetBuffer *buffer = state->src.buf; + bytes = buffer->data; + len = buffer->count; + } else { + bytes = state->src.str; + len = janet_string_length(bytes); + } + ssize_t nwrote = 0; + if (start < len) { + int32_t nbytes = len - start; + void *dest_abst = state->dest_abst; + do { +#ifdef JANET_NET + if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { + nwrote = sendto(s->pollable->handle, bytes + start, nbytes, state->flags, + (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); + } else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) { + nwrote = send(s->pollable->handle, bytes + start, nbytes, state->flags); + } else +#endif + { + nwrote = write(s->pollable->handle, bytes + start, nbytes); + } + } while (nwrote == -1 && errno == EINTR); + + /* Handle write errors */ + if (nwrote == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } + + /* Unless using datagrams, empty message is a disconnect */ + if (nwrote == 0 && !dest_abst) { + janet_cancel(s->fiber, janet_cstringv("disconnect")); + return JANET_ASYNC_STATUS_DONE; + } + + if (nwrote > 0) { + start += nwrote; + } else { + start = len; + } + } + state->start = start; + if (start >= len) { + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + } + break; + } + break; +#endif + } + return JANET_ASYNC_STATUS_NOT_DONE; +} + +static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { + StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write, + JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL); + state->is_buffer = is_buffer; + state->src.buf = buf; + state->dest_abst = dest_abst; + state->mode = mode; +#ifdef JANET_WINDOWS + state->flags = (DWORD) flags; + net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); +#else + state->start = 0; + state->flags = flags; +#endif +} + + +void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf) { + janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0); +} + +void janet_ev_write_string(JanetPollable *stream, JanetString str) { + janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0); +} + +#ifdef JANET_NET +void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags) { + janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags); +} + +void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags) { + janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags); +} + +void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags) { + janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags); +} + +void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags) { + janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); +} +#endif + /* C functions */ static Janet cfun_ev_go(int32_t argc, Janet *argv) { diff --git a/src/core/net.c b/src/core/net.c index d62fa7ea..7213c158 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -72,23 +72,15 @@ static const JanetAbstractType StreamAT = { typedef JanetPollable JanetStream; -static const JanetAbstractType AddressAT = { +const JanetAbstractType janet_address_type = { "core/socket-address", JANET_ATEND_NAME }; #ifdef JANET_WINDOWS -#define JANET_NET_CHUNKSIZE 4096 #define JSOCKCLOSE(x) closesocket((SOCKET) x) -#define JSOCKDEFAULT INVALID_SOCKET -#define JLASTERR WSAGetLastError() #define JSOCKVALID(x) ((x) != INVALID_SOCKET) -#define JEINTR WSAEINTR -#define JEWOULDBLOCK WSAEWOULDBLOCK -#define JEAGAIN WSAEWOULDBLOCK -#define JPOLL WSAPoll #define JSock SOCKET -#define JReadInt long #define JSOCKFLAGS 0 static JanetStream *make_stream(SOCKET fd, uint32_t flags) { u_long iMode = 0; @@ -98,39 +90,11 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) { stream->flags = flags; return stream; } -static Janet net_lasterr(void) { - int code = WSAGetLastError(); - char msgbuf[256]; - msgbuf[0] = '\0'; - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - code, - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - msgbuf, - sizeof(msgbuf), - NULL); - if (!*msgbuf) sprintf(msgbuf, "%d", code); - char *c = msgbuf; - while (*c) { - if (*c == '\n' || *c == '\r') { - *c = '\0'; - break; - } - c++; - } - return janet_cstringv(msgbuf); -} #else #define JSOCKCLOSE(x) close(x) #define JSOCKDEFAULT 0 -#define JLASTERR errno #define JSOCKVALID(x) ((x) >= 0) -#define JEINTR EINTR -#define JEWOULDBLOCK EWOULDBLOCK -#define JEAGAIN EAGAIN -#define JPOLL poll #define JSock int -#define JReadInt ssize_t #ifdef SOCK_CLOEXEC #define JSOCKFLAGS SOCK_CLOEXEC #else @@ -148,9 +112,6 @@ static JanetStream *make_stream(int fd, uint32_t flags) { stream->flags = flags; return stream; } -static Janet net_lasterr(void) { - return janet_cstringv(strerror(errno)); -} #endif /* We pass this flag to all send calls to prevent sigpipe */ @@ -186,351 +147,6 @@ static int janet_stream_mark(void *p, size_t s) { return 0; } -/* - * State machine for read - */ - -typedef struct { - JanetListenerState head; - int32_t bytes_left; - JanetBuffer *buf; - int is_chunk; - int is_recv_from; -#ifdef JANET_WINDOWS - WSAOVERLAPPED overlapped; - WSABUF wbuf; - DWORD flags; - int32_t chunk_size; - struct sockaddr from; - int fromlen; - uint8_t chunk_buf[JANET_NET_CHUNKSIZE]; -#endif -} NetStateRead; - -JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) { - NetStateRead *state = (NetStateRead *) s; - switch (event) { - default: - break; - case JANET_ASYNC_EVENT_MARK: - janet_mark(janet_wrap_buffer(state->buf)); - break; - case JANET_ASYNC_EVENT_CLOSE: - janet_cancel(s->fiber, janet_cstringv("stream closed")); - return JANET_ASYNC_STATUS_DONE; -#ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_COMPLETE: { - /* Called when read finished */ - if (s->bytes == 0 && !state->is_recv_from) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; - } - - janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); - state->bytes_left -= s->bytes; - - if (state->bytes_left <= 0 || !state->is_chunk) { - Janet resume_val; - if (state->is_recv_from) { - void *abst = janet_abstract(&AddressAT, state->fromlen); - memcpy(abst, &state->from, state->fromlen); - resume_val = janet_wrap_abstract(abst); - } else { - resume_val = janet_wrap_buffer(state->buf); - } - janet_schedule(s->fiber, resume_val); - return JANET_ASYNC_STATUS_DONE; - } - } - - /* fallthrough */ - case JANET_ASYNC_EVENT_USER: { - state->flags = 0; - int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left; - state->wbuf.len = (ULONG) chunk_size; - state->wbuf.buf = state->chunk_buf; - state->chunk_size = chunk_size; - s->tag = &state->overlapped; - memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); - int status; - if (state->is_recv_from) { - status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); - } else { - status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL); - } - if (status && WSA_IO_PENDING != WSAGetLastError()) { - janet_cancel(s->fiber, net_lasterr()); - return JANET_ASYNC_STATUS_DONE; - } - } - break; -#else - case JANET_ASYNC_EVENT_READ: - /* Read in bytes */ - { - JanetBuffer *buffer = state->buf; - int32_t bytes_left = state->bytes_left; - janet_buffer_extra(buffer, bytes_left); - JReadInt nread; - char saddr[256]; - socklen_t socklen = sizeof(saddr); - do { - if (state->is_recv_from) { - nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0, - (struct sockaddr *)&saddr, &socklen); - } else { - nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); - } - } while (nread == -1 && JLASTERR == JEINTR); - - /* Check for errors - special case errors that can just be waited on to fix */ - if (nread == -1) { - if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; - janet_cancel(s->fiber, net_lasterr()); - return JANET_ASYNC_STATUS_DONE; - } - - /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ - if (nread == 0 && !state->is_recv_from) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; - } - - /* Increment buffer counts */ - if (nread > 0) { - buffer->count += nread; - bytes_left -= nread; - } else { - bytes_left = 0; - } - state->bytes_left = bytes_left; - - /* Resume if done */ - if (!state->is_chunk || bytes_left == 0) { - Janet resume_val; - if (state->is_recv_from) { - void *abst = janet_abstract(&AddressAT, socklen); - memcpy(abst, &saddr, socklen); - resume_val = janet_wrap_abstract(abst); - } else { - resume_val = janet_wrap_buffer(buffer); - } - janet_schedule(s->fiber, resume_val); - return JANET_ASYNC_STATUS_DONE; - } - } - break; -#endif - } - return JANET_ASYNC_STATUS_NOT_DONE; -} - -JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { - NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); - state->is_chunk = 0; - state->buf = buf; - state->bytes_left = nbytes; - state->is_recv_from = 0; - net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); - janet_await(); -} - -JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { - NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); - state->is_chunk = 1; - state->buf = buf; - state->bytes_left = nbytes; - state->is_recv_from = 0; - net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); - janet_await(); -} - -JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { - NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); - state->is_chunk = 0; - state->buf = buf; - state->bytes_left = nbytes; - state->is_recv_from = 1; - net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); - janet_await(); -} - -/* - * State machine for write/send-to - */ - -typedef struct { - JanetListenerState head; - union { - JanetBuffer *buf; - const uint8_t *str; - } src; - int32_t start; - int is_buffer; - void *dest_abst; -#ifdef JANET_WINDOWS - WSAOVERLAPPED overlapped; - WSABUF wbuf; - DWORD flags; -#endif -} NetStateWrite; - -JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) { - NetStateWrite *state = (NetStateWrite *) s; - switch (event) { - default: - break; - case JANET_ASYNC_EVENT_MARK: - janet_mark(state->is_buffer - ? janet_wrap_buffer(state->src.buf) - : janet_wrap_string(state->src.str)); - if (state->dest_abst != NULL) { - janet_mark(janet_wrap_abstract(state->dest_abst)); - } - break; - case JANET_ASYNC_EVENT_CLOSE: - janet_cancel(s->fiber, janet_cstringv("stream closed")); - return JANET_ASYNC_STATUS_DONE; -#ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_COMPLETE: { - /* Called when write finished */ - if (s->bytes == 0 && !state->dest_abst) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); - return JANET_ASYNC_STATUS_DONE; - } - - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; - } - break; - case JANET_ASYNC_EVENT_USER: { - /* Begin write */ - int32_t start, len; - const uint8_t *bytes; - start = state->start; - if (state->is_buffer) { - /* If buffer, convert to string. */ - /* TODO - be more efficient about this */ - JanetBuffer *buffer = state->src.buf; - JanetString str = janet_string(buffer->data, buffer->count); - bytes = str; - len = buffer->count; - state->is_buffer = 0; - state->src.str = str; - } else { - bytes = state->src.str; - len = janet_string_length(bytes); - } - state->wbuf.buf = (char *) bytes; - state->wbuf.len = len; - state->flags = 0; - s->tag = &state->overlapped; - memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); - - int status; - SOCKET sock = (SOCKET) s->pollable->handle; - if (state->dest_abst) { - const struct sockaddr *to = state->dest_abst; - int tolen = (int) janet_abstract_size((void *) to); - status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); - } else { - status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL); - } - - if (status && WSA_IO_PENDING != WSAGetLastError()) { - janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); - return JANET_ASYNC_STATUS_DONE; - } - } - break; -#else - case JANET_ASYNC_EVENT_WRITE: { - int32_t start, len; - const uint8_t *bytes; - start = state->start; - if (state->is_buffer) { - JanetBuffer *buffer = state->src.buf; - bytes = buffer->data; - len = buffer->count; - } else { - bytes = state->src.str; - len = janet_string_length(bytes); - } - JReadInt nwrote = 0; - if (start < len) { - int32_t nbytes = len - start; - void *dest_abst = state->dest_abst; - do { - if (dest_abst) { - nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0, - (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); - } else { - nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); - } - } while (nwrote == -1 && JLASTERR == JEINTR); - - /* Handle write errors */ - if (nwrote == -1) { - if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; - janet_cancel(s->fiber, net_lasterr()); - return JANET_ASYNC_STATUS_DONE; - } - - /* Unless using datagrams, empty message is a disconnect */ - if (nwrote == 0 && !dest_abst) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); - return JANET_ASYNC_STATUS_DONE; - } - - if (nwrote > 0) { - start += nwrote; - } else { - start = len; - } - } - state->start = start; - if (start >= len) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; - } - break; - } - break; -#endif - } - return JANET_ASYNC_STATUS_NOT_DONE; -} - -JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) { - NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); - state->is_buffer = 1; - state->start = 0; - state->src.buf = buf; - state->dest_abst = dest_abst; - net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); - janet_await(); -} - - -JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) { - NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); - state->is_buffer = 0; - state->start = 0; - state->src.str = str; - state->dest_abst = dest_abst; - net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); - janet_await(); -} - -/* - * State machine for simple server - */ - /* State machine for accepting connections. */ #ifdef JANET_WINDOWS @@ -750,7 +366,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) { #ifndef JANET_WINDOWS /* no unix domain socket support on windows yet */ if (is_unix) { - void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un)); + void *abst = janet_abstract(&janet_address_type, sizeof(struct sockaddr_un)); memcpy(abst, ai, sizeof(struct sockaddr_un)); Janet ret = janet_wrap_abstract(abst); return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret; @@ -761,7 +377,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) { JanetArray *arr = janet_array(10); struct addrinfo *iter = ai; while (NULL != iter) { - void *abst = janet_abstract(&AddressAT, iter->ai_addrlen); + void *abst = janet_abstract(&janet_address_type, iter->ai_addrlen); memcpy(abst, iter->ai_addr, iter->ai_addrlen); janet_array_push(arr, janet_wrap_abstract(abst)); iter = iter->ai_next; @@ -773,7 +389,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) { if (NULL == ai) { janet_panic("no data for given address"); } - void *abst = janet_abstract(&AddressAT, ai->ai_addrlen); + void *abst = janet_abstract(&janet_address_type, ai->ai_addrlen); memcpy(abst, ai->ai_addr, ai->ai_addrlen); freeaddrinfo(ai); return janet_wrap_abstract(abst); @@ -963,7 +579,8 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); - janet_sched_read(stream, buffer, n); + janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL); + janet_await(); } static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { @@ -974,7 +591,8 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); - janet_sched_chunk(stream, buffer, n); + janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL); + janet_await(); } static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { @@ -985,7 +603,8 @@ static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { JanetBuffer *buffer = janet_getbuffer(argv, 2); double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); - janet_sched_recv_from(stream, buffer, n); + janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL); + janet_await(); } static Janet cfun_stream_close(int32_t argc, Janet *argv) { @@ -1008,28 +627,30 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) { double to = janet_optnumber(argv, argc, 2, INFINITY); if (janet_checktype(argv[1], JANET_BUFFER)) { if (to != INFINITY) janet_addtimeout(to); - janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL); + janet_ev_send_buffer(stream, janet_getbuffer(argv, 1), MSG_NOSIGNAL); } else { JanetByteView bytes = janet_getbytes(argv, 1); if (to != INFINITY) janet_addtimeout(to); - janet_sched_write_stringlike(stream, bytes.bytes, NULL); + janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL); } + janet_await(); } static Janet cfun_stream_send_to(int32_t argc, Janet *argv) { janet_arity(argc, 3, 4); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); check_stream_flag(stream, JANET_STREAM_UDPSERVER); - void *dest = janet_getabstract(argv, 1, &AddressAT); + void *dest = janet_getabstract(argv, 1, &janet_address_type); double to = janet_optnumber(argv, argc, 3, INFINITY); if (janet_checktype(argv[2], JANET_BUFFER)) { if (to != INFINITY) janet_addtimeout(to); - janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest); + janet_ev_sendto_buffer(stream, janet_getbuffer(argv, 2), dest, MSG_NOSIGNAL); } else { JanetByteView bytes = janet_getbytes(argv, 2); if (to != INFINITY) janet_addtimeout(to); - janet_sched_write_stringlike(stream, bytes.bytes, dest); + janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL); } + janet_await(); } static Janet cfun_stream_flush(int32_t argc, Janet *argv) { diff --git a/src/core/util.h b/src/core/util.h index 48ad89c8..fc4b0aa7 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -140,6 +140,7 @@ void janet_lib_thread(JanetTable *env); #endif #ifdef JANET_NET void janet_lib_net(JanetTable *env); +extern const JanetAbstractType janet_address_type; #endif #ifdef JANET_EV void janet_lib_ev(JanetTable *env); diff --git a/src/include/janet.h b/src/include/janet.h index 0339f0c0..54ec8a27 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -499,6 +499,7 @@ typedef void *JanetAbstract; /* Event Loop Types */ #ifdef JANET_EV + #define JANET_POLL_FLAG_CLOSED 0x1 #define JANET_POLL_FLAG_SOCKET 0x2 #define JANET_POLL_FLAG_IOCP 0x4 @@ -1269,7 +1270,29 @@ JANET_NO_RETURN JANET_API void janet_await(void); /* For use inside listeners - adds a timeout to the current fiber, such that * it will be resumed after sec seconds if no other event schedules the current fiber. */ -void janet_addtimeout(double sec); +JANET_API void janet_addtimeout(double sec); + +/* Get last error from a an IO operation */ +JANET_API Janet janet_ev_lasterr(void); + +/* Read async from a pollable */ +JANET_API void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); +JANET_API void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); +#ifdef JANET_NET +JANET_API void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_API void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_API void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); +#endif + +/* Write async to a pollable */ +JANET_API void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf); +JANET_API void janet_ev_write_string(JanetPollable *stream, JanetString str); +#ifdef JANET_NET +JANET_API void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags); +JANET_API void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags); +JANET_API void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags); +JANET_API void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags); +#endif #endif