From 3f434f2a443c6ad747cdbb044364ddceb65ef519 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 31 May 2020 15:46:01 -0500 Subject: [PATCH] Add backpressure capability to net. --- examples/tcpserver.janet | 23 ++++++++----- src/core/net.c | 74 +++++++++++++++++++++++++++++++++++----- 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet index f1d21e54..eacd9526 100644 --- a/examples/tcpserver.janet +++ b/examples/tcpserver.janet @@ -1,14 +1,19 @@ (defn handler "Simple handler for connections." [stream] - (defer (:close stream) - (def id (gensym)) - (def b @"") - (print "Connection " id "!") - (while (:read stream 1024 b) - (:write stream b) - (buffer/clear b)) - (printf "Done %v!" id))) + (def id (gensym)) + (def b @"") + (print "Connection " id "!") + (while (:read stream 1024 b) + (:write stream b) + (buffer/clear b)) + (printf "Done %v!" id)) (print "Starting echo server on 127.0.0.1:8000") -(net/server "127.0.0.1" "8000" handler) + +(def server (net/server "127.0.0.1" "8000")) + +# Run server. +(while true + (with [conn (:accept server)] + (handler conn))) diff --git a/src/core/net.c b/src/core/net.c index 3a924945..1019cdcd 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -52,6 +52,7 @@ #define JANET_STREAM_READABLE 0x200 #define JANET_STREAM_WRITABLE 0x400 +#define JANET_STREAM_ACCEPTABLE 0x800 static int janet_stream_close(void *p, size_t s); static int janet_stream_mark(void *p, size_t s); @@ -335,6 +336,38 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven return JANET_ASYNC_STATUS_NOT_DONE; } +/* State machine for accepting connections. */ + +typedef struct { + JanetListenerState head; +} NetStateAccept; + +JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_CLOSE: + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + case JANET_ASYNC_EVENT_READ: { + JSock connfd = accept(s->pollable->handle, NULL, NULL); + if (JSOCKVALID(connfd)) { + JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + Janet streamv = janet_wrap_abstract(stream); + janet_schedule(s->fiber, streamv); + return JANET_ASYNC_STATUS_DONE; + } + break; + } + } + return JANET_ASYNC_STATUS_NOT_DONE; +} + +JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { + janet_listen(stream, net_machine_accept, JANET_ASYNC_EVENT_READ, sizeof(NetStateAccept)); + janet_await(); +} + /* Adress info */ /* Needs argc >= offset + 2 */ @@ -392,10 +425,10 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } static Janet cfun_net_server(int32_t argc, Janet *argv) { - janet_fixarity(argc, 3); + janet_arity(argc, 2, 3); /* Get host, port, and handler*/ - JanetFunction *fun = janet_getfunction(argv, 2); + JanetFunction *fun = janet_optfunction(argv, argc, 2, NULL); struct addrinfo *ai = janet_get_addrinfo(argv, 0); @@ -441,11 +474,26 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { } /* Put sfd on our loop */ - JanetStream *stream = make_stream(sfd, 0); - NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); - ss->function = fun; - return janet_wrap_abstract(stream); + if (NULL == fun) { + JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE); + return janet_wrap_abstract(stream); + } else { + /* Server with handler */ + JanetStream *stream = make_stream(sfd, 0); + NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, + JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); + ss->function = fun; + return janet_wrap_abstract(stream); + } +} + +static Janet cfun_stream_accept(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (!(stream->flags & JANET_STREAM_ACCEPTABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { + janet_panic("got non acceptable stream"); + } + janet_sched_accept(stream); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { @@ -511,6 +559,7 @@ static const JanetMethod stream_methods[] = { {"read", cfun_stream_read}, {"write", cfun_stream_write}, {"flush", cfun_stream_flush}, + {"accept", cfun_stream_accept}, {NULL, NULL} }; @@ -523,10 +572,17 @@ static int janet_stream_getter(void *p, Janet key, Janet *out) { static const JanetReg net_cfuns[] = { { "net/server", cfun_net_server, - JDOC("(net/server host port handler)\n\n" + JDOC("(net/server host port &opt handler)\n\n" "Start a TCP server. handler is a function that will be called with a stream " "on each connection to the server. Returns a new stream that is neither readable nor " - "writeable.") + "writeable. If handler is not provided, net/accept must be used to get the next connection " + "to the server.") + }, + { + "net/accept", cfun_stream_accept, + JDOC("(net/accept stream)\n\n" + "Get the next connection on a server stream. This would usually be called in a loop in a dedicated fiber. " + "Returns a new duplex stream which represents a connection to the client.") }, { "net/read", cfun_stream_read,