1
0
mirror of https://github.com/janet-lang/janet synced 2024-12-01 04:19:55 +00:00

Add backpressure capability to net.

This commit is contained in:
Calvin Rose 2020-05-31 15:46:01 -05:00
parent 71d8e6b4cd
commit 3f434f2a44
2 changed files with 79 additions and 18 deletions

View File

@ -1,14 +1,19 @@
(defn handler (defn handler
"Simple handler for connections." "Simple handler for connections."
[stream] [stream]
(defer (:close stream)
(def id (gensym)) (def id (gensym))
(def b @"") (def b @"")
(print "Connection " id "!") (print "Connection " id "!")
(while (:read stream 1024 b) (while (:read stream 1024 b)
(:write stream b) (:write stream b)
(buffer/clear b)) (buffer/clear b))
(printf "Done %v!" id))) (printf "Done %v!" id))
(print "Starting echo server on 127.0.0.1:8000") (print "Starting echo server on 127.0.0.1:8000")
(net/server "127.0.0.1" "8000" handler)
(def server (net/server "127.0.0.1" "8000"))
# Run server.
(while true
(with [conn (:accept server)]
(handler conn)))

View File

@ -52,6 +52,7 @@
#define JANET_STREAM_READABLE 0x200 #define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400 #define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
static int janet_stream_close(void *p, size_t s); static int janet_stream_close(void *p, size_t s);
static int janet_stream_mark(void *p, size_t s); static int janet_stream_mark(void *p, size_t s);
@ -335,6 +336,38 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
/* State machine for accepting connections. */
typedef struct {
JanetListenerState head;
} NetStateAccept;
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
break;
}
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) {
janet_listen(stream, net_machine_accept, JANET_ASYNC_EVENT_READ, sizeof(NetStateAccept));
janet_await();
}
/* Adress info */ /* Adress info */
/* Needs argc >= offset + 2 */ /* Needs argc >= offset + 2 */
@ -392,10 +425,10 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
} }
static Janet cfun_net_server(int32_t argc, Janet *argv) { static Janet cfun_net_server(int32_t argc, Janet *argv) {
janet_fixarity(argc, 3); janet_arity(argc, 2, 3);
/* Get host, port, and handler*/ /* Get host, port, and handler*/
JanetFunction *fun = janet_getfunction(argv, 2); JanetFunction *fun = janet_optfunction(argv, argc, 2, NULL);
struct addrinfo *ai = janet_get_addrinfo(argv, 0); struct addrinfo *ai = janet_get_addrinfo(argv, 0);
@ -441,11 +474,26 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
} }
/* Put sfd on our loop */ /* Put sfd on our loop */
if (NULL == fun) {
JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
return janet_wrap_abstract(stream);
} else {
/* Server with handler */
JanetStream *stream = make_stream(sfd, 0); JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer));
ss->function = fun; ss->function = fun;
return janet_wrap_abstract(stream); return janet_wrap_abstract(stream);
}
}
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
if (!(stream->flags & JANET_STREAM_ACCEPTABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
janet_panic("got non acceptable stream");
}
janet_sched_accept(stream);
} }
static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_read(int32_t argc, Janet *argv) {
@ -511,6 +559,7 @@ static const JanetMethod stream_methods[] = {
{"read", cfun_stream_read}, {"read", cfun_stream_read},
{"write", cfun_stream_write}, {"write", cfun_stream_write},
{"flush", cfun_stream_flush}, {"flush", cfun_stream_flush},
{"accept", cfun_stream_accept},
{NULL, NULL} {NULL, NULL}
}; };
@ -523,10 +572,17 @@ static int janet_stream_getter(void *p, Janet key, Janet *out) {
static const JanetReg net_cfuns[] = { static const JanetReg net_cfuns[] = {
{ {
"net/server", cfun_net_server, "net/server", cfun_net_server,
JDOC("(net/server host port handler)\n\n" JDOC("(net/server host port &opt handler)\n\n"
"Start a TCP server. handler is a function that will be called with a stream " "Start a TCP server. handler is a function that will be called with a stream "
"on each connection to the server. Returns a new stream that is neither readable nor " "on each connection to the server. Returns a new stream that is neither readable nor "
"writeable.") "writeable. If handler is not provided, net/accept must be used to get the next connection "
"to the server.")
},
{
"net/accept", cfun_stream_accept,
JDOC("(net/accept stream)\n\n"
"Get the next connection on a server stream. This would usually be called in a loop in a dedicated fiber. "
"Returns a new duplex stream which represents a connection to the client.")
}, },
{ {
"net/read", cfun_stream_read, "net/read", cfun_stream_read,