diff --git a/examples/echoclient.janet b/examples/echoclient.janet new file mode 100644 index 00000000..c3103abe --- /dev/null +++ b/examples/echoclient.janet @@ -0,0 +1,5 @@ +(with [conn (net/connect "127.0.0.1" 8000)] + (print "writing abcdefg...") + (:write conn "abcdefg") + (print "reading...") + (printf "got: %v" (:read conn 1024))) diff --git a/examples/echoserve.janet b/examples/echoserve.janet index f05fe602..2d35366c 100644 --- a/examples/echoserve.janet +++ b/examples/echoserve.janet @@ -9,6 +9,7 @@ (printf " %v -> %v" id b) (:write stream b) (buffer/clear b)) - (printf "Done %v!" id))) + (printf "Done %v!" id) + (ev/sleep 0.5))) (net/server "127.0.0.1" "8000" handler) diff --git a/src/core/ev.c b/src/core/ev.c index ecc633ca..ac3df101 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -765,7 +765,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { if (!result) { if (!has_timeout) { - JANET_EXIT("failed to get iocp GetQueuedCompletionStatus"); + /* queue emptied */ } } else { /* Normal event */ @@ -774,6 +774,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { while (state != NULL) { if (state->tag == overlapped) { state->event = overlapped; + state->bytes = num_bytes_transfered; JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); if (status == JANET_ASYNC_STATUS_DONE) { janet_unlisten(state); diff --git a/src/core/net.c b/src/core/net.c index 79e34d62..5d05b235 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -32,6 +32,7 @@ #include #include #include +#include #pragma comment (lib, "Ws2_32.lib") #pragma comment (lib, "Mswsock.lib") #pragma comment (lib, "Advapi32.lib") @@ -96,6 +97,28 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) { stream->flags = flags; return stream; } +static Janet net_lasterr(void) { + int code = WSAGetLastError(); + char msgbuf[256]; + msgbuf[0] = '\0'; + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + code, + MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT), + msgbuf, + sizeof (msgbuf), + NULL); + if (!*msgbuf) sprintf(msgbuf, "%d", code); + char *c = msgbuf; + while (*c) { + if (*c == '\n' || *c == '\r') { + *c = '\0'; + break; + } + c++; + } + return janet_cstringv(msgbuf); +} #else #define JSOCKCLOSE(x) close(x) #define JSOCKDEFAULT 0 @@ -124,6 +147,9 @@ static JanetStream *make_stream(int fd, uint32_t flags) { stream->flags = flags; return stream; } +static Janet net_lasterr(void) { + return janet_cstringv(strerror(errno)); +} #endif /* We pass this flag to all send calls to prevent sigpipe */ @@ -173,7 +199,6 @@ typedef struct { WSAOVERLAPPED overlapped; WSABUF wbuf; DWORD flags; - DWORD received; int32_t chunk_size; struct sockaddr from; int fromlen; @@ -195,13 +220,13 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) #ifdef JANET_WINDOWS case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ - if (state->received == 0 && !state->is_recv_from) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); + if (s->bytes == 0 && !state->is_recv_from) { + janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; } - janet_buffer_push_bytes(state->buf, state->chunk_buf, state->received); - state->bytes_left -= state->received; + janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); + state->bytes_left -= s->bytes; if (state->bytes_left <= 0 || !state->is_chunk) { Janet resume_val; @@ -221,22 +246,20 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) case JANET_ASYNC_EVENT_USER: { state->flags = 0; - /* If there are bytes left, do another read. Currently only read 2k at a time. */ int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left; state->wbuf.len = (ULONG) chunk_size; state->wbuf.buf = state->chunk_buf; state->chunk_size = chunk_size; - state->received = 0; s->tag = &state->overlapped; memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); int status; if (state->is_recv_from) { - status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); + status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); } else { - status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->overlapped, NULL); + status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL); } - if (status) { - janet_cancel(s->fiber, janet_cstringv("failed to read from stream")); + if (status && WSA_IO_PENDING != WSAGetLastError()) { + janet_cancel(s->fiber, net_lasterr()); return JANET_ASYNC_STATUS_DONE; } } @@ -263,13 +286,13 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) /* Check for errors - special case errors that can just be waited on to fix */ if (nread == -1) { if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; - janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); + janet_cancel(s->fiber, net_lasterr()); return JANET_ASYNC_STATUS_DONE; } /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ if (nread == 0 && !state->is_recv_from) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); + janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; } @@ -351,7 +374,6 @@ typedef struct { #ifdef JANET_WINDOWS WSAOVERLAPPED overlapped; WSABUF wbuf; - DWORD sent; DWORD flags; #endif } NetStateWrite; @@ -375,7 +397,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) #ifdef JANET_WINDOWS case JANET_ASYNC_EVENT_COMPLETE: { /* Called when write finished */ - if (state->sent == 0 && !state->dest_abst) { + if (s->bytes == 0 && !state->dest_abst) { janet_cancel(s->fiber, janet_cstringv("disconnect")); return JANET_ASYNC_STATUS_DONE; } @@ -404,7 +426,6 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) } state->wbuf.buf = (char *) bytes; state->wbuf.len = len; - state->sent = 0; state->flags = 0; s->tag = &state->overlapped; memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); @@ -414,12 +435,12 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) if (state->dest_abst) { const struct sockaddr *to = state->dest_abst; int tolen = (int) janet_abstract_size((void *) to); - status = WSASendTo(sock, &state->wbuf, 1, &state->sent, state->flags, to, tolen, &state->overlapped, NULL); + status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); } else { - status = WSASend(sock, &state->wbuf, 1, &state->sent, state->flags, &state->overlapped, NULL); + status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL); } - if (status) { + if (status && WSA_IO_PENDING != WSAGetLastError()) { janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); return JANET_ASYNC_STATUS_DONE; } @@ -454,7 +475,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) /* Handle write errors */ if (nwrote == -1) { if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; - janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); + janet_cancel(s->fiber, net_lasterr()); return JANET_ASYNC_STATUS_DONE; } @@ -510,96 +531,150 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co * State machine for simple server */ +/* State machine for accepting connections. */ + +#ifdef JANET_WINDOWS + +typedef struct { + JanetListenerState head; + WSAOVERLAPPED overlapped; + JanetFunction *function; + JanetStream *lstream; + JanetStream *astream; + char buf[1024]; +} NetStateAccept; + +static int net_sched_accept_impl(NetStateAccept *state, Janet *err); + +JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { + NetStateAccept *state = (NetStateAccept *)s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_MARK: { + if (state->lstream) janet_mark(janet_wrap_abstract(state->lstream)); + if (state->astream) janet_mark(janet_wrap_abstract(state->astream)); + if (state->function) janet_mark(janet_wrap_abstract(state->function)); + break; + } + case JANET_ASYNC_EVENT_CLOSE: + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + case JANET_ASYNC_EVENT_COMPLETE: { + int seconds; + int bytes = sizeof(seconds); + if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME, + (char *)&seconds, &bytes)) { + janet_cancel(s->fiber, janet_cstringv("failed to accept connection")); + return JANET_ASYNC_STATUS_DONE; + } + if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *)&(state->lstream->handle), sizeof(SOCKET))) { + janet_cancel(s->fiber, janet_cstringv("failed to accept connection")); + return JANET_ASYNC_STATUS_DONE; + } + + Janet streamv = janet_wrap_abstract(state->astream); + if (state->function) { + /* Schedule worker */ + JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); + janet_schedule(fiber, janet_wrap_nil()); + /* Now listen again for next connection */ + Janet err; + if (net_sched_accept_impl(state, &err)) { + janet_cancel(s->fiber, err); + return JANET_ASYNC_STATUS_DONE; + } + } else { + janet_schedule(s->fiber, streamv); + return JANET_ASYNC_STATUS_DONE; + } + } + } + return JANET_ASYNC_STATUS_NOT_DONE; +} + +JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { + Janet err; + SOCKET lsock = (SOCKET) stream->handle; + JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); + NetStateAccept *state = (NetStateAccept *)s; + memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED)); + memset(&state->buf, 0, 1024); + state->function = fun; + state->lstream = stream; + s->tag = &state->overlapped; + if (net_sched_accept_impl(state, &err)) janet_panicv(err); + janet_await(); +} + +static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { + SOCKET lsock = (SOCKET) state->lstream->handle; + SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + if (asock == INVALID_SOCKET) { + *err = net_lasterr(); + return 1; + } + JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + state->astream = astream; + int socksize = sizeof(SOCKADDR_STORAGE) + 16; + if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) { + int code = WSAGetLastError(); + if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */ + *err = net_lasterr(); + return 1; + } + return 0; +} + +#else + typedef struct { JanetListenerState head; JanetFunction *function; -} NetStateSimpleServer; - -JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEvent event) { - NetStateSimpleServer *state = (NetStateSimpleServer *) s; - switch (event) { - default: - break; - case JANET_ASYNC_EVENT_INIT: - /* We know the pollable will be a stream */ - janet_gcroot(janet_wrap_abstract(s->pollable)); -#ifdef JANET_WINDOWS - /* requires some more setup code */ -#endif - break; - case JANET_ASYNC_EVENT_MARK: - janet_mark(janet_wrap_function(state->function)); - break; - case JANET_ASYNC_EVENT_CLOSE: - janet_gcunroot(janet_wrap_abstract(s->pollable)); - return JANET_ASYNC_STATUS_DONE; -#ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_COMPLETE: { - /* Called when ever we get an IOCP event */ - } - break; -#else - case JANET_ASYNC_EVENT_READ: { - JSock connfd = accept(s->pollable->handle, NULL, NULL); - if (JSOCKVALID(connfd)) { - /* Made a new connection socket */ - nosigpipe(connfd); - JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); - Janet streamv = janet_wrap_abstract(stream); - JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); - janet_schedule(fiber, janet_wrap_nil()); - } - break; - } -#endif - } - return JANET_ASYNC_STATUS_NOT_DONE; -} - -/* State machine for accepting connections. */ - -typedef struct { - JanetListenerState head; } NetStateAccept; JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { + NetStateAccept *state = (NetStateAccept *)s; switch (event) { default: break; + case JANET_ASYNC_EVENT_MARK: { + if (state->function) janet_mark(janet_wrap_function(state->function)); + break; + } case JANET_ASYNC_EVENT_CLOSE: - janet_cancel(s->fiber, janet_cstringv("stream closed")); + janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; -#ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_INIT: { - - } - break; - case JANET_ASYNC_EVENT_COMPLETE: { - - } - break; -#else case JANET_ASYNC_EVENT_READ: { JSock connfd = accept(s->pollable->handle, NULL, NULL); if (JSOCKVALID(connfd)) { nosigpipe(connfd); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); Janet streamv = janet_wrap_abstract(stream); - janet_schedule(s->fiber, streamv); - return JANET_ASYNC_STATUS_DONE; + if (state->function) { + JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); + janet_schedule(fiber, janet_wrap_nil()); + } else { + janet_schedule(s->fiber, streamv); + return JANET_ASYNC_STATUS_DONE; + } } break; } -#endif } return JANET_ASYNC_STATUS_NOT_DONE; } -JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { - janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); +JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { + JanetListenerState *state = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); + state->function = fun; janet_await(); } + +#endif + /* Adress info */ static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) { @@ -729,7 +804,11 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { { struct addrinfo *rp = NULL; for (rp = ai; rp != NULL; rp = rp->ai_next) { +#ifdef JANET_WINDOWS + sock = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED); +#else sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); +#endif if (JSOCKVALID(sock)) { addr = rp->ai_addr; addrlen = (socklen_t) rp->ai_addrlen; @@ -807,7 +886,11 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Check all addrinfos in a loop for the first that we can bind to. */ struct addrinfo *rp = NULL; for (rp = ai; rp != NULL; rp = rp->ai_next) { +#ifdef JANET_WINDOWS + sfd = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED); +#else sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); +#endif if (!JSOCKVALID(sfd)) continue; const char *err = serverify_socket(sfd); if (NULL != err) { @@ -853,11 +936,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { return janet_wrap_abstract(stream); } else { /* Server with handler */ - JanetStream *stream = make_stream(sfd, 0); - NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL); - ss->function = fun; - return janet_wrap_abstract(stream); + JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE); + janet_sched_accept(stream, fun); } } } @@ -879,7 +959,7 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) { check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); double to = janet_optnumber(argv, argc, 1, INFINITY); if (to != INFINITY) janet_addtimeout(to); - janet_sched_accept(stream); + janet_sched_accept(stream, NULL); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { @@ -922,6 +1002,12 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) { return janet_wrap_nil(); } +static Janet cfun_stream_closed(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED); +} + static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); @@ -968,6 +1054,7 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) { static const JanetMethod stream_methods[] = { {"chunk", cfun_stream_chunk}, {"close", cfun_stream_close}, + {"closed?", cfun_stream_closed}, {"read", cfun_stream_read}, {"write", cfun_stream_write}, {"flush", cfun_stream_flush}, @@ -1055,6 +1142,11 @@ static const JanetReg net_cfuns[] = { JDOC("(net/close stream)\n\n" "Close a stream so that no further communication can occur.") }, + { + "net/closed?", cfun_stream_closed, + JDOC("(net/closed? stream)\n\n" + "Check if a stream is closed.") + }, { "net/connect", cfun_net_connect, JDOC("(net/connect host porti &opt type)\n\n" diff --git a/src/include/janet.h b/src/include/janet.h index 6b11c3c2..0339f0c0 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -550,6 +550,7 @@ struct JanetListenerState { implementation of the event loop and the particular event. */ #ifdef JANET_WINDOWS void *tag; /* Used to associate listeners with an overlapped structure */ + int bytes; /* Used to track how many bytes were transfered. */ #endif /* internal */ int _index; /* not used in all implementations */ diff --git a/test/suite0009.janet b/test/suite0009.janet index c87f952c..7510a32a 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -33,9 +33,16 @@ (:write stream b) (buffer/clear b))) -(def s (net/server "127.0.0.1" "8000" handler)) +(def s (net/server "127.0.0.1" "8000")) (assert s "made server 1") +(ev/go + (coro + (while (not (net/closed? s)) + (def conn (net/accept s)) + (unless conn (break)) + (ev/call handler conn)))) + (defn test-echo [msg] (with [conn (net/connect "127.0.0.1" "8000")] (:write conn msg)