From c0d2140d1441936633a97f312f9d8ae9f683249f Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 1 Feb 2020 20:39:54 -0600 Subject: [PATCH 01/17] Begin net/ module in core. Humble beginnings. --- Makefile | 1 + meson.build | 2 ++ meson_options.txt | 1 + src/boot/boot.janet | 1 + src/conf/janetconf.h | 1 + src/core/corelib.c | 3 +++ src/core/net.c | 50 ++++++++++++++++++++++++++++++++++++++++++++ src/core/util.h | 3 +++ src/include/janet.h | 5 +++++ 9 files changed, 67 insertions(+) create mode 100644 src/core/net.c diff --git a/Makefile b/Makefile index dff7c925..6cb818d7 100644 --- a/Makefile +++ b/Makefile @@ -95,6 +95,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \ src/core/io.c \ src/core/marsh.c \ src/core/math.c \ + src/core/net.c \ src/core/os.c \ src/core/parse.c \ src/core/peg.c \ diff --git a/meson.build b/meson.build index fe9a02f8..7de2916e 100644 --- a/meson.build +++ b/meson.build @@ -59,6 +59,7 @@ conf.set('JANET_NO_DOCSTRINGS', not get_option('docstrings')) conf.set('JANET_NO_SOURCEMAPS', not get_option('sourcemaps')) conf.set('JANET_NO_ASSEMBLER', not get_option('assembler')) conf.set('JANET_NO_PEG', not get_option('peg')) +conf.set('JANET_NO_NET', not get_option('net')) conf.set('JANET_REDUCED_OS', get_option('reduced_os')) conf.set('JANET_NO_TYPED_ARRAY', not get_option('typed_array')) conf.set('JANET_NO_INT_TYPES', not get_option('int_types')) @@ -112,6 +113,7 @@ core_src = [ 'src/core/io.c', 'src/core/marsh.c', 'src/core/math.c', + 'src/core/net.c', 'src/core/os.c', 'src/core/parse.c', 'src/core/peg.c', diff --git a/meson_options.txt b/meson_options.txt index 83bd25a6..1d0bd77c 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -11,6 +11,7 @@ option('peg', type : 'boolean', value : true) option('typed_array', type : 'boolean', value : true) option('int_types', type : 'boolean', value : true) option('prf', type : 'boolean', value : true) +option('net', type : 'boolean', value : true) option('recursion_guard', type : 'integer', min : 10, max : 8000, value : 1024) option('max_proto_depth', type : 'integer', min : 10, max : 8000, value : 200) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index f1360943..eda66669 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2423,6 +2423,7 @@ "src/core/io.c" "src/core/marsh.c" "src/core/math.c" + "src/core/net.c" "src/core/os.c" "src/core/parse.c" "src/core/peg.c" diff --git a/src/conf/janetconf.h b/src/conf/janetconf.h index bb49a8ca..c792f0c8 100644 --- a/src/conf/janetconf.h +++ b/src/conf/janetconf.h @@ -49,6 +49,7 @@ /* Other settings */ /* #define JANET_NO_ASSEMBLER */ /* #define JANET_NO_PEG */ +/* #define JANET_NO_NET */ /* #define JANET_NO_TYPED_ARRAY */ /* #define JANET_NO_INT_TYPES */ /* #define JANET_NO_PRF */ diff --git a/src/core/corelib.c b/src/core/corelib.c index 5042ddbf..bebf49da 100644 --- a/src/core/corelib.c +++ b/src/core/corelib.c @@ -975,6 +975,9 @@ static void janet_load_libs(JanetTable *env) { #ifdef JANET_THREADS janet_lib_thread(env); #endif +#ifdef JANET_NET + janet_lib_net(env); +#endif } #ifdef JANET_BOOTSTRAP diff --git a/src/core/net.c b/src/core/net.c new file mode 100644 index 00000000..529790a9 --- /dev/null +++ b/src/core/net.c @@ -0,0 +1,50 @@ +/* +* Copyright (c) 2020 Calvin Rose +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to +* deal in the Software without restriction, including without limitation the +* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +* sell copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in +* all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +* IN THE SOFTWARE. +*/ + +#ifndef JANET_AMALG +#include "features.h" +#include +#include "util.h" +#endif + +/* + * C Funs + */ + +static Janet cfun_net_hello(int32_t argc, Janet *argv) { + (void) argv; + janet_fixarity(argc, 0); + janet_printf("Hello!\n"); + return janet_wrap_nil(); +} + +static const JanetReg net_cfuns[] = { + {"net/hello", cfun_net_hello, + JDOC("(net/hello)\n\n" + "Prints \"Hello!\".")}, + {NULL, NULL, NULL} +}; + +void janet_lib_net(JanetTable *env) { + janet_core_cfuns(env, NULL, net_cfuns); +} + diff --git a/src/core/util.h b/src/core/util.h index 5a3f5568..c2417014 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -126,5 +126,8 @@ void janet_lib_inttypes(JanetTable *env); #ifdef JANET_THREADS void janet_lib_thread(JanetTable *env); #endif +#ifdef JANET_NET +void janet_lib_net(JanetTable *env); +#endif #endif diff --git a/src/include/janet.h b/src/include/janet.h index cc8d7069..005602fc 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -152,6 +152,11 @@ extern "C" { #define JANET_TYPED_ARRAY #endif +/* Enable or disable networking */ +#ifndef JANET_NO_NET +#define JANET_NET +#endif + /* Enable or disable large int types (for now 64 bit, maybe 128 / 256 bit integer types) */ #ifndef JANET_NO_INT_TYPES #define JANET_INT_TYPES From eda61455d3c446ce0922725231db3ea1fe84926c Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Mon, 3 Feb 2020 09:29:51 -0600 Subject: [PATCH 02/17] Work on tcp server code. --- src/core/gc.c | 3 + src/core/net.c | 280 ++++++++++++++++++++++++++++++++++++++++++++++-- src/core/util.h | 1 + 3 files changed, 278 insertions(+), 6 deletions(-) diff --git a/src/core/gc.c b/src/core/gc.c index ba3e3400..727fe9f4 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -384,6 +384,9 @@ void janet_collect(void) { if (janet_vm_gc_suspend) return; depth = JANET_RECURSION_GUARD; orig_rootcount = janet_vm_root_count; +#ifdef JANET_NET + janet_net_markloop(); +#endif for (i = 0; i < orig_rootcount; i++) janet_mark(janet_vm_roots[i]); while (orig_rootcount < janet_vm_root_count) { diff --git a/src/core/net.c b/src/core/net.c index 529790a9..375ab3b0 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -26,25 +26,293 @@ #include "util.h" #endif +#include +#include +#include +#include +#include +#include +#include +#include + +/* + * Event loops + */ + +/* This large struct describes a waiting file descriptor, as well + * as what to do when we get an event for it. */ +typedef struct { + + /* File descriptor to listen for events on. */ + int fd; + + /* Fiber to resume when event finishes. Can be NULL. */ + JanetFiber *fiber; + + /* We need to tell which fd_set to put in for select. */ + enum { + JLFD_READ, + JLFD_WRITE + } select_mode; + + /* What kind of event we are listening for. + * As more IO functionality get's added, we can + * expand this. */ + enum { + JLE_READ_INTO_BUFFER, + JLE_READ_ACCEPT, + JLE_WRITE_FROM_BUFFER, + JLE_WRITE_FROM_STRINGLIKE + } event_type; + + union { + + /* JLE_READ_INTO_BUFFER */ + struct { + int32_t n; + JanetBuffer *buf; + } read_into_buffer; + + /* JLE_READ_ACCEPT */ + struct { + JanetFunction *handler; + } read_accept; + + /* JLE_WRITE_FROM_BUFFER */ + struct { + JanetBuffer *buf; + } write_from_buffer; + + /* JLE_WRITE_FROM_STRINGLIKE */ + struct { + const uint8_t *str; + } write_from_stringlike; + } data; + +} JanetLoopFD; + +#define JANET_LOOPFD_MAX 1024 + +/* Global loop data */ +JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; +JANET_THREAD_LOCAL int janet_vm_loop_count; + +/* We could also add/remove gc roots. This is easier for now. */ +void janet_net_markloop(void) { + for (int i = 0; i < janet_vm_loop_count; i++) { + JanetLoopFD lfd = janet_vm_loopfds[i]; + switch (lfd.event_type) { + default: + break; + case JLE_READ_INTO_BUFFER: + janet_mark(janet_wrap_buffer(lfd.data.read_into_buffer.buf)); + break; + case JLE_READ_ACCEPT: + janet_mark(janet_wrap_function(lfd.data.read_accept.handler)); + break; + case JLE_WRITE_FROM_BUFFER: + janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); + break; + case JLE_WRITE_FROM_STRINGLIKE: + janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); + } + } +} + +/* Add a loop fd to the global event loop */ +static int janet_loop_schedule(JanetLoopFD lfd) { + if (janet_vm_loop_count == JANET_LOOPFD_MAX) { + return -1; + } + int index = janet_vm_loop_count; + janet_vm_loopfds[janet_vm_loop_count++] = lfd; + if (NULL != lfd.fiber) { + janet_gcroot(janet_wrap_fiber(lfd.fiber)); + } + return index; +} + +/* Remove an event listener by the handle it returned when scheduled. */ +static void janet_loop_unschedule(int index) { + janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; +} + +/* Return delta in number of loop fds. Abstracted out so + * we can separate out the polling logic */ +static size_t janet_loop_event(size_t index) { + JanetLoopFD *jlfd = janet_vm_loopfds + index; + int ret = 1; + int should_resume = 0; + Janet resumeval = janet_wrap_nil(); + switch (jlfd->event_type) { + case JLE_READ_INTO_BUFFER: + { + JanetBuffer *buffer = jlfd->data.read_into_buffer.buf; + int32_t how_much = jlfd->data.read_into_buffer.n; + janet_buffer_extra(buffer, how_much); + int status = read(jlfd->fd, buffer->data + buffer->count, how_much); + if (status > 0) { + buffer->count += how_much; + } + should_resume = 1; + resumeval = janet_wrap_buffer(buffer); + /* Bag pop */ + janet_loop_unschedule(index); + ret = 0; + break; + } + case JLE_READ_ACCEPT: + { + char addr[256]; /* Just make sure it is large enough for largest address type */ + socklen_t len; + int connfd = accept(jlfd->fd, (void *) &addr, &len); + if (connfd >= 0) { + /* Made a new connection socket */ + int flags = fcntl(connfd, F_GETFL, 0); + fcntl(connfd, F_SETFL, flags | O_NONBLOCK); + FILE *f = fdopen(connfd, "r+"); + Janet filev = janet_makefile(f, JANET_FILE_WRITE | JANET_FILE_READ); + JanetFunction *handler = jlfd->data.read_accept.handler; + Janet out; + /* Launch connection fiber */ + janet_pcall(handler, 1, &filev, &out, NULL); + } + ret = 1; + break; + } + case JLE_WRITE_FROM_BUFFER: + case JLE_WRITE_FROM_STRINGLIKE: + ret = 1; + break; + } + if (NULL != jlfd->fiber && should_resume) { + /* Resume the fiber */ + Janet out; + janet_continue(jlfd->fiber, resumeval, &out); + } + return ret; +} + +void janet_loop1(void) { + /* Set up fd_sets */ + fd_set readfds; + fd_set writefds; + FD_ZERO(&readfds); + FD_ZERO(&writefds); + int fd_max = 0; + for (int i = 0; i < janet_vm_loop_count; i++) { + JanetLoopFD *jlfd = janet_vm_loopfds + i; + if (jlfd->fd > fd_max) fd_max = jlfd->fd; + fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds; + FD_SET(jlfd->fd, set); + } + + /* Blocking call - we should add timeout functionality */ + printf("selecting %d!\n", janet_vm_loop_count); + int status = select(fd_max, &readfds, &writefds, NULL, NULL); + (void) status; + printf("selected!\n"); + + /* Now handle all events */ + for (int i = 0; i < janet_vm_loop_count;) { + JanetLoopFD *jlfd = janet_vm_loopfds + i; + fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds; + if (FD_ISSET(jlfd->fd, set)) { + size_t delta = janet_loop_event(i); + i += delta; + } else { + i++; + } + } +} + +void janet_loop(void) { + while (janet_vm_loop_count) janet_loop1(); +} + /* * C Funs */ -static Janet cfun_net_hello(int32_t argc, Janet *argv) { - (void) argv; +static Janet cfun_net_server(int32_t argc, Janet *argv) { + janet_fixarity(argc, 3); + + /* Get host, port, and handler*/ + const char *host = janet_getcstring(argv, 0); + const char *port = janet_getcstring(argv, 1); + JanetFunction *fun = janet_getfunction(argv, 2); + + /* getaddrinfo */ + struct addrinfo *ai; + struct addrinfo hints = {0}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE; + int status = getaddrinfo(host, port, &hints, &ai); + if (status) { + janet_panicf("could not get address info: %s", gai_strerror(status)); + } + + /* bind */ + /* Check all addrinfos in a loop for the first that we can bind to. */ + int sfd = 0; + struct addrinfo *rp = NULL; + for (rp = ai; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) continue; + if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break; + close(sfd); + } + if (NULL == rp) { + freeaddrinfo(ai); + janet_panic("could not bind to any sockets"); + } + + /* listen */ + status = listen(sfd, 1024); + if (status) { + freeaddrinfo(ai); + close(sfd); + janet_panic("could not listen on file descriptor"); + } + + /* We need to ignore sigpipe when reading and writing to our connection socket. + * Since a connection could be disconnected at any time, any read or write may fail. + * We don't want to blow up the whole application. */ + signal(SIGPIPE, SIG_IGN); + + /* cleanup */ + freeaddrinfo(ai); + + /* Put sfd on our loop */ + JanetLoopFD lfd = {0}; + lfd.fd = sfd; + lfd.select_mode = JLFD_READ; + lfd.event_type = JLE_READ_ACCEPT; + lfd.data.read_accept.handler = fun; + janet_loop_schedule(lfd); + + return janet_wrap_nil(); +} + +static Janet cfun_net_loop(int32_t argc, Janet *argv) { janet_fixarity(argc, 0); - janet_printf("Hello!\n"); + (void) argv; + printf("starting loop...\n"); + janet_loop(); return janet_wrap_nil(); } static const JanetReg net_cfuns[] = { - {"net/hello", cfun_net_hello, - JDOC("(net/hello)\n\n" - "Prints \"Hello!\".")}, + {"net/server", cfun_net_server, + JDOC("(net/server host port)\n\nStart a simple TCP echo server.")}, + {"net/loop", cfun_net_loop, NULL}, {NULL, NULL, NULL} }; void janet_lib_net(JanetTable *env) { + janet_vm_loop_count = 0; janet_core_cfuns(env, NULL, net_cfuns); } diff --git a/src/core/util.h b/src/core/util.h index c2417014..eeafcc4b 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -128,6 +128,7 @@ void janet_lib_thread(JanetTable *env); #endif #ifdef JANET_NET void janet_lib_net(JanetTable *env); +void janet_net_markloop(void); #endif #endif From f4d7fd97f6df3c320b28479559cf13008700d99e Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 9 Feb 2020 19:04:34 -0600 Subject: [PATCH 03/17] Working TCP echo server and client. Required a few changes to APIs, namely janet_root_fiber() to get topmost fiber that is active in the current scheduler. This is distinct from janet_current_fiber(), which gets the bottom most fiber in the fiber stack - it might have a parent, and so cannot be reliably resumed. This is the kind of situation that makes symmetric coroutines more attractive. --- examples/tcpclient.janet | 7 + examples/tcpserver.janet | 13 ++ src/boot/boot.janet | 4 +- src/core/fiber.c | 4 + src/core/net.c | 372 ++++++++++++++++++++++++++++++--------- src/core/run.c | 2 +- src/core/state.h | 1 + src/core/thread.c | 6 +- src/core/vm.c | 12 +- src/include/janet.h | 6 + src/mainclient/shell.c | 6 +- 11 files changed, 341 insertions(+), 92 deletions(-) create mode 100644 examples/tcpclient.janet create mode 100644 examples/tcpserver.janet diff --git a/examples/tcpclient.janet b/examples/tcpclient.janet new file mode 100644 index 00000000..28f28af8 --- /dev/null +++ b/examples/tcpclient.janet @@ -0,0 +1,7 @@ +(def conn (net/connect "127.0.0.1" "8000")) +(printf "Connected to %q!" conn) +(net/write conn "Echo...") +(print "Wrote to connection...") +(def res (net/read conn 1024)) +(pp res) +(net/close conn) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet new file mode 100644 index 00000000..a7ecd2d2 --- /dev/null +++ b/examples/tcpserver.janet @@ -0,0 +1,13 @@ +(defn handler + "Simple handler for connections." + [stream] + (def id (gensym)) + (def b @"") + (print "Connection " id "!") + (while (net/read stream 1024 b) + (net/write stream b) + (buffer/clear b)) + (printf "Done %v!" id) + (net/close stream)) + +(net/server "127.0.0.1" "8000" handler) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 66845757..5276fd36 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -1825,6 +1825,7 @@ (default on-parse-error bad-parse) (default evaluator (fn evaluate [x &] (x))) (default where "") + (default guard :yed) # Are we done yet? (var going true) @@ -1851,7 +1852,7 @@ (string err " on line " line ", column " column) err)) (on-compile-error msg errf where)))) - (or guard :a))) + guard)) (fiber/setenv f env) (while (let [fs (fiber/status f)] (and (not= :dead fs) (not= :error fs))) @@ -2290,6 +2291,7 @@ (def h (in handlers n)) (if h (h i) (do (print "unknown flag -" n) ((in handlers "h"))))) + # Use special evaulator for fly checking (-k option) (def- safe-forms {'defn true 'defn- true 'defmacro true 'defmacro- true}) (def- importers {'import true 'import* true 'use true 'dofile true 'require true}) (defn- evaluator diff --git a/src/core/fiber.c b/src/core/fiber.c index 124b9f39..546ac93b 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -337,6 +337,10 @@ JanetFiber *janet_current_fiber(void) { return janet_vm_fiber; } +JanetFiber *janet_root_fiber(void) { + return janet_vm_root_fiber; +} + /* CFuns */ static Janet cfun_fiber_getenv(int32_t argc, Janet *argv) { diff --git a/src/core/net.c b/src/core/net.c index 375ab3b0..ad485203 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -39,39 +39,72 @@ * Event loops */ +#define JANET_STREAM_CLOSED 1 + +typedef struct { + int fd; + int flags; +} JanetStream; + +static int janet_stream_close(void *p, size_t s); + +static const JanetAbstractType StreamAT = { + "core/stream", + janet_stream_close, + JANET_ATEND_GC +}; + +static int janet_stream_close(void *p, size_t s) { + (void) s; + JanetStream *stream = p; + if (!(stream->flags & JANET_STREAM_CLOSED)) { + stream->flags |= JANET_STREAM_CLOSED; + close(stream->fd); + } + return 0; +} + +static JanetStream *make_stream(int fd) { + JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + int flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + stream->fd = fd; + stream->flags = 0; + return stream; +} + /* This large struct describes a waiting file descriptor, as well - * as what to do when we get an event for it. */ + * as what to do when we get an event for it. It is a variant type, where + * each variant implements a simple state machine. */ typedef struct { /* File descriptor to listen for events on. */ int fd; - /* Fiber to resume when event finishes. Can be NULL. */ + /* Fiber to resume when event finishes. Can be NULL, in which case, + * no fiber is resumed when event completes. */ JanetFiber *fiber; - /* We need to tell which fd_set to put in for select. */ - enum { - JLFD_READ, - JLFD_WRITE - } select_mode; - /* What kind of event we are listening for. * As more IO functionality get's added, we can * expand this. */ enum { - JLE_READ_INTO_BUFFER, + JLE_READ_CHUNK, + JLE_READ_SOME, JLE_READ_ACCEPT, + JLE_CONNECT, JLE_WRITE_FROM_BUFFER, JLE_WRITE_FROM_STRINGLIKE } event_type; + /* Each variant can have a different payload. */ union { - /* JLE_READ_INTO_BUFFER */ + /* JLE_READ_CHUNK/JLE_READ_SOME */ struct { - int32_t n; + int32_t bytes_left; JanetBuffer *buf; - } read_into_buffer; + } read_chunk; /* JLE_READ_ACCEPT */ struct { @@ -81,12 +114,15 @@ typedef struct { /* JLE_WRITE_FROM_BUFFER */ struct { JanetBuffer *buf; + int32_t start; } write_from_buffer; /* JLE_WRITE_FROM_STRINGLIKE */ struct { const uint8_t *str; + int32_t start; } write_from_stringlike; + } data; } JanetLoopFD; @@ -104,17 +140,20 @@ void janet_net_markloop(void) { switch (lfd.event_type) { default: break; - case JLE_READ_INTO_BUFFER: - janet_mark(janet_wrap_buffer(lfd.data.read_into_buffer.buf)); + case JLE_READ_CHUNK: + case JLE_READ_SOME: + janet_mark(janet_wrap_buffer(lfd.data.read_chunk.buf)); break; case JLE_READ_ACCEPT: janet_mark(janet_wrap_function(lfd.data.read_accept.handler)); break; + case JLE_CONNECT: + break; case JLE_WRITE_FROM_BUFFER: janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); break; case JLE_WRITE_FROM_STRINGLIKE: - janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); + janet_mark(janet_wrap_string(lfd.data.write_from_stringlike.str)); } } } @@ -132,11 +171,6 @@ static int janet_loop_schedule(JanetLoopFD lfd) { return index; } -/* Remove an event listener by the handle it returned when scheduled. */ -static void janet_loop_unschedule(int index) { - janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; -} - /* Return delta in number of loop fds. Abstracted out so * we can separate out the polling logic */ static size_t janet_loop_event(size_t index) { @@ -145,55 +179,122 @@ static size_t janet_loop_event(size_t index) { int should_resume = 0; Janet resumeval = janet_wrap_nil(); switch (jlfd->event_type) { - case JLE_READ_INTO_BUFFER: - { - JanetBuffer *buffer = jlfd->data.read_into_buffer.buf; - int32_t how_much = jlfd->data.read_into_buffer.n; - janet_buffer_extra(buffer, how_much); - int status = read(jlfd->fd, buffer->data + buffer->count, how_much); - if (status > 0) { - buffer->count += how_much; - } - should_resume = 1; - resumeval = janet_wrap_buffer(buffer); - /* Bag pop */ - janet_loop_unschedule(index); - ret = 0; - break; - } - case JLE_READ_ACCEPT: - { - char addr[256]; /* Just make sure it is large enough for largest address type */ - socklen_t len; - int connfd = accept(jlfd->fd, (void *) &addr, &len); - if (connfd >= 0) { - /* Made a new connection socket */ - int flags = fcntl(connfd, F_GETFL, 0); - fcntl(connfd, F_SETFL, flags | O_NONBLOCK); - FILE *f = fdopen(connfd, "r+"); - Janet filev = janet_makefile(f, JANET_FILE_WRITE | JANET_FILE_READ); - JanetFunction *handler = jlfd->data.read_accept.handler; - Janet out; - /* Launch connection fiber */ - janet_pcall(handler, 1, &filev, &out, NULL); - } + case JLE_READ_CHUNK: + case JLE_READ_SOME: { + JanetBuffer *buffer = jlfd->data.read_chunk.buf; + int32_t bytes_left = jlfd->data.read_chunk.bytes_left; + janet_buffer_extra(buffer, bytes_left); + ssize_t nread; + errno = 0; + do { + nread = read(jlfd->fd, buffer->data + buffer->count, bytes_left); + } while (errno == EINTR); + if (errno == EAGAIN || errno == EWOULDBLOCK) { ret = 1; break; } - case JLE_WRITE_FROM_BUFFER: - case JLE_WRITE_FROM_STRINGLIKE: - ret = 1; + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { + should_resume = 1; + if (nread > 0) { + resumeval = janet_wrap_buffer(buffer); + } + ret = 0; + } else { + jlfd->data.read_chunk.bytes_left = bytes_left; + ret = 1; + } break; + } + case JLE_READ_ACCEPT: { + char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ + socklen_t len = 0; + int connfd = accept(jlfd->fd, (void *) &addr, &len); + if (connfd >= 0) { + /* Made a new connection socket */ + JanetStream *stream = make_stream(connfd); + Janet streamv = janet_wrap_abstract(stream); + JanetFunction *handler = jlfd->data.read_accept.handler; + Janet out; + JanetFiber *fiberp = NULL; + /* Launch connection fiber */ + JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + janet_stacktrace(fiberp, out); + } + } + ret = JANET_LOOPFD_MAX; + break; + } + case JLE_WRITE_FROM_BUFFER: + case JLE_WRITE_FROM_STRINGLIKE: { + int32_t start, len; + const uint8_t *bytes; + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; + bytes = buffer->data; + len = buffer->count; + start = jlfd->data.write_from_buffer.start; + } else { + bytes = jlfd->data.write_from_stringlike.str; + len = janet_string_length(bytes); + start = jlfd->data.write_from_stringlike.start; + } + if (start < len) { + int32_t nbytes = len - start; + ssize_t nwrote; + do { + nwrote = write(jlfd->fd, bytes + start, nbytes); + } while (nwrote == EINTR); + if (nwrote > 0) { + start += nwrote; + } else { + start = len; + } + } + if (start >= len) { + should_resume = 1; + ret = 0; + } else { + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + jlfd->data.write_from_buffer.start = start; + } else { + jlfd->data.write_from_stringlike.start = start; + } + ret = 1; + } + break; + } + case JLE_CONNECT: { + + break; + } } + + /* Resume a fiber for some events */ if (NULL != jlfd->fiber && should_resume) { /* Resume the fiber */ Janet out; - janet_continue(jlfd->fiber, resumeval, &out); + JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + janet_stacktrace(jlfd->fiber, out); + } } + + /* Remove this handler from the handler pool. */ + if (should_resume) { + janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; + } + return ret; } -void janet_loop1(void) { +static void janet_loop1(void) { /* Set up fd_sets */ fd_set readfds; fd_set writefds; @@ -203,20 +304,17 @@ void janet_loop1(void) { for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD *jlfd = janet_vm_loopfds + i; if (jlfd->fd > fd_max) fd_max = jlfd->fd; - fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds; + fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; FD_SET(jlfd->fd, set); } /* Blocking call - we should add timeout functionality */ - printf("selecting %d!\n", janet_vm_loop_count); - int status = select(fd_max, &readfds, &writefds, NULL, NULL); - (void) status; - printf("selected!\n"); + select(fd_max + 1, &readfds, &writefds, NULL, NULL); /* Now handle all events */ for (int i = 0; i < janet_vm_loop_count;) { JanetLoopFD *jlfd = janet_vm_loopfds + i; - fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds; + fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; if (FD_ISSET(jlfd->fd, set)) { size_t delta = janet_loop_event(i); i += delta; @@ -227,23 +325,57 @@ void janet_loop1(void) { } void janet_loop(void) { - while (janet_vm_loop_count) janet_loop1(); + while (janet_vm_loop_count) { + janet_loop1(); + } } /* - * C Funs + * Scheduling Helpers */ -static Janet cfun_net_server(int32_t argc, Janet *argv) { - janet_fixarity(argc, 3); +#define JANET_SCHED_FSOME 1 - /* Get host, port, and handler*/ - const char *host = janet_getcstring(argv, 0); - const char *port = janet_getcstring(argv, 1); - JanetFunction *fun = janet_getfunction(argv, 2); +JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t nbytes, int flags) { + JanetLoopFD lfd = {0}; + lfd.fd = fd; + lfd.fiber = janet_root_fiber(); + lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; + lfd.data.read_chunk.buf = buf; + lfd.data.read_chunk.bytes_left = nbytes; + janet_loop_schedule(lfd); + janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); +} +JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { + JanetLoopFD lfd = {0}; + lfd.fd = fd; + lfd.fiber = janet_root_fiber(); + lfd.event_type = JLE_WRITE_FROM_BUFFER; + lfd.data.write_from_buffer.buf = buf; + lfd.data.write_from_buffer.start = 0; + janet_loop_schedule(lfd); + janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); +} + +JANET_NO_RETURN static void janet_sched_write_stringlike(int fd, const uint8_t *str) { + JanetLoopFD lfd = {0}; + lfd.fd = fd; + lfd.fiber = janet_root_fiber(); + lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; + lfd.data.write_from_stringlike.str = str; + lfd.data.write_from_stringlike.start = 0; + janet_loop_schedule(lfd); + janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); +} + +/* Needs argc >= offset + 2 */ +static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { + /* Get host and port */ + const char *host = janet_getcstring(argv, offset); + const char *port = janet_getcstring(argv, offset + 1); /* getaddrinfo */ - struct addrinfo *ai; + struct addrinfo *ai = NULL; struct addrinfo hints = {0}; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -253,8 +385,46 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { if (status) { janet_panicf("could not get address info: %s", gai_strerror(status)); } + return ai; +} + +/* + * C Funs + */ + +static Janet cfun_net_connect(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + + struct addrinfo *ai = janet_get_addrinfo(argv, 0); + + /* Create socket */ + int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock < 0) { + freeaddrinfo(ai); + janet_panic("could not create socket"); + } + + /* Connect to socket */ + int status = connect(sock, ai->ai_addr, ai->ai_addrlen); + freeaddrinfo(ai); + if (status < 0) { + close(sock); + janet_panic("could not connect to socket"); + } + + /* Wrap socket in abstract type JanetStream */ + JanetStream *stream = make_stream(sock); + return janet_wrap_abstract(stream); +} + +static Janet cfun_net_server(int32_t argc, Janet *argv) { + janet_fixarity(argc, 3); + + /* Get host, port, and handler*/ + JanetFunction *fun = janet_getfunction(argv, 2); + + struct addrinfo *ai = janet_get_addrinfo(argv, 0); - /* bind */ /* Check all addrinfos in a loop for the first that we can bind to. */ int sfd = 0; struct addrinfo *rp = NULL; @@ -270,9 +440,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { } /* listen */ - status = listen(sfd, 1024); + int status = listen(sfd, 1024); + freeaddrinfo(ai); if (status) { - freeaddrinfo(ai); close(sfd); janet_panic("could not listen on file descriptor"); } @@ -282,13 +452,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { * We don't want to blow up the whole application. */ signal(SIGPIPE, SIG_IGN); - /* cleanup */ - freeaddrinfo(ai); - /* Put sfd on our loop */ JanetLoopFD lfd = {0}; lfd.fd = sfd; - lfd.select_mode = JLFD_READ; lfd.event_type = JLE_READ_ACCEPT; lfd.data.read_accept.handler = fun; janet_loop_schedule(lfd); @@ -296,18 +462,50 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { return janet_wrap_nil(); } -static Janet cfun_net_loop(int32_t argc, Janet *argv) { - janet_fixarity(argc, 0); - (void) argv; - printf("starting loop...\n"); - janet_loop(); +static Janet cfun_stream_read(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + janet_sched_read(stream->fd, buffer, n, JANET_SCHED_FSOME); +} + +static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + janet_sched_read(stream->fd, buffer, n, 0); +} + +static Janet cfun_stream_close(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + janet_stream_close(stream, 0); return janet_wrap_nil(); } +static Janet cfun_stream_write(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (janet_checktype(argv[1], JANET_BUFFER)) { + janet_sched_write_buffer(stream->fd, janet_getbuffer(argv, 1)); + } else { + JanetByteView bytes = janet_getbytes(argv, 1); + janet_sched_write_stringlike(stream->fd, bytes.bytes); + } +} + static const JanetReg net_cfuns[] = { - {"net/server", cfun_net_server, - JDOC("(net/server host port)\n\nStart a simple TCP echo server.")}, - {"net/loop", cfun_net_loop, NULL}, + { + "net/server", cfun_net_server, + JDOC("(net/server host port)\n\nStart a TCP server.") + }, + {"net/read", cfun_stream_read, NULL}, + {"net/chunk", cfun_stream_chunk, NULL}, + {"net/write", cfun_stream_write, NULL}, + {"net/close", cfun_stream_close, NULL}, + {"net/connect", cfun_net_connect, NULL}, {NULL, NULL, NULL} }; diff --git a/src/core/run.c b/src/core/run.c index ada14555..6f6019c0 100644 --- a/src/core/run.c +++ b/src/core/run.c @@ -50,7 +50,7 @@ int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char JanetFiber *fiber = janet_fiber(f, 64, 0, NULL); fiber->env = env; JanetSignal status = janet_continue(fiber, janet_wrap_nil(), &ret); - if (status != JANET_SIGNAL_OK) { + if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) { janet_stacktrace(fiber, ret); errflags |= 0x01; done = 1; diff --git a/src/core/state.h b/src/core/state.h index 8674b07a..3285d8c8 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -43,6 +43,7 @@ extern JANET_THREAD_LOCAL int janet_vm_stackn; /* The current running fiber on the current thread. * Set and unset by janet_run. */ extern JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber; +extern JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber; /* The current pointer to the inner most jmp_buf. The current * return point for panics. */ diff --git a/src/core/thread.c b/src/core/thread.c index 370e3359..daedef9b 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -448,11 +448,15 @@ static int thread_worker(JanetMailbox *mailbox) { Janet argv[1] = { parentv }; fiber = janet_fiber(func, 64, 1, argv); JanetSignal sig = janet_continue(fiber, janet_wrap_nil(), &out); - if (sig != JANET_SIGNAL_OK) { + if (sig != JANET_SIGNAL_OK && sig < JANET_SIGNAL_USER0) { janet_eprintf("in thread %v: ", janet_wrap_abstract(janet_make_thread(mailbox, encode))); janet_stacktrace(fiber, out); } +#ifdef JANET_NET + janet_loop(); +#endif + /* Normal exit */ janet_deinit(); return 0; diff --git a/src/core/vm.c b/src/core/vm.c index 6b918f2d..7912d95a 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -35,6 +35,7 @@ JANET_THREAD_LOCAL JanetTable *janet_vm_core_env; JANET_THREAD_LOCAL JanetTable *janet_vm_registry; JANET_THREAD_LOCAL int janet_vm_stackn = 0; JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL; +JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber = NULL; JANET_THREAD_LOCAL Janet *janet_vm_return_reg = NULL; JANET_THREAD_LOCAL jmp_buf *janet_vm_jmp_buf = NULL; @@ -1231,7 +1232,9 @@ Janet janet_call(JanetFunction *fun, int32_t argc, const Janet *argv) { janet_vm_stackn = oldn; janet_gcunlock(handle); - if (signal != JANET_SIGNAL_OK) janet_panicv(*janet_vm_return_reg); + if (signal != JANET_SIGNAL_OK) { + janet_panicv(*janet_vm_return_reg); + } return *janet_vm_return_reg; } @@ -1277,6 +1280,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) { Janet *old_vm_return_reg = janet_vm_return_reg; /* Setup fiber */ + if (oldn == 0) janet_vm_root_fiber = fiber; janet_vm_fiber = fiber; janet_gcroot(janet_wrap_fiber(fiber)); janet_fiber_set_status(fiber, JANET_STATUS_ALIVE); @@ -1302,6 +1306,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) { janet_gcunroot(janet_wrap_fiber(fiber)); /* Restore global state */ + if (oldn == 0) janet_vm_root_fiber = NULL; janet_vm_gc_suspend = handle; janet_vm_fiber = old_vm_fiber; janet_vm_stackn = oldn; @@ -1369,6 +1374,9 @@ int janet_init(void) { janet_vm_core_env = NULL; /* Seed RNG */ janet_rng_seed(janet_default_rng(), 0); + /* Fibers */ + janet_vm_fiber = NULL; + janet_vm_root_fiber = NULL; /* Threads */ #ifdef JANET_THREADS janet_threads_init(); @@ -1386,6 +1394,8 @@ void janet_deinit(void) { janet_vm_root_capacity = 0; janet_vm_registry = NULL; janet_vm_core_env = NULL; + janet_vm_fiber = NULL; + janet_vm_root_fiber = NULL; #ifdef JANET_THREADS janet_threads_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 005602fc..20e9570c 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1099,6 +1099,11 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; /***** START SECTION MAIN *****/ +/* Event Loop */ +#ifdef JANET_NET +JANET_API void janet_loop(void); +#endif + /* Parsing */ JANET_API void janet_parser_init(JanetParser *parser); JANET_API void janet_parser_deinit(JanetParser *parser); @@ -1273,6 +1278,7 @@ JANET_API JanetFiber *janet_fiber(JanetFunction *callee, int32_t capacity, int32 JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv); JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber); JANET_API JanetFiber *janet_current_fiber(void); +JANET_API JanetFiber *janet_root_fiber(void); /* Treat similar types through uniform interfaces for iteration */ JANET_API int janet_indexed_view(Janet seq, const Janet **data, int32_t *len); diff --git a/src/mainclient/shell.c b/src/mainclient/shell.c index 1ef3fd5c..4b7a3796 100644 --- a/src/mainclient/shell.c +++ b/src/mainclient/shell.c @@ -869,10 +869,14 @@ int main(int argc, char **argv) { JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs); fiber->env = env; status = janet_continue(fiber, janet_wrap_nil(), &out); - if (status != JANET_SIGNAL_OK) { + if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) { janet_stacktrace(fiber, out); } +#ifdef JANET_NET + janet_loop(); +#endif + /* Deinitialize vm */ janet_deinit(); janet_line_deinit(); From 135aff9e1763484471dfcfa4b97c73ec944a5318 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 9 Feb 2020 20:02:35 -0600 Subject: [PATCH 04/17] Add janet_loop() call to static binaries. --- auxbin/jpm | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/auxbin/jpm b/auxbin/jpm index a3efee68..7bb16596 100755 --- a/auxbin/jpm +++ b/auxbin/jpm @@ -512,11 +512,14 @@ int main(int argc, const char **argv) { fiber->env = temptab; Janet out; JanetSignal result = janet_continue(fiber, janet_wrap_nil(), &out); - if (result) { + if (result != JANET_SIGNAL_OK && result < JANET_SIGNAL_USER0) { janet_stacktrace(fiber, out); janet_deinit(); return result; } + #ifdef JANET_NET + janet_loop(); + #endif janet_deinit(); return 0; } From 79bb9e54d5ba9677f90ccd110f827ce04d229a47 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 11 Feb 2020 08:57:44 -0600 Subject: [PATCH 05/17] Remove direct references to file descriptors. If a descriptor is freed by the Janet code, other uses of that descriptor, say in the event loop, need to know that it has been closed. --- src/core/cfuns.c | 8 +- src/core/net.c | 241 ++++++++++++++++++++++++++--------------------- 2 files changed, 138 insertions(+), 111 deletions(-) diff --git a/src/core/cfuns.c b/src/core/cfuns.c index 96b040be..b2e9b0b5 100644 --- a/src/core/cfuns.c +++ b/src/core/cfuns.c @@ -70,10 +70,10 @@ static JanetSlot genericSSI(JanetFopts opts, int op, JanetSlot s, int32_t imm) { /* Emit an insruction that implements a form by itself. */ static JanetSlot opfunction( - JanetFopts opts, - JanetSlot *args, - int op, - Janet defaultArg2) { + JanetFopts opts, + JanetSlot *args, + int op, + Janet defaultArg2) { JanetCompiler *c = opts.compiler; int32_t len; len = janet_v_count(args); diff --git a/src/core/net.c b/src/core/net.c index ad485203..0060dba2 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -36,10 +36,12 @@ #include /* - * Event loops + * Streams */ #define JANET_STREAM_CLOSED 1 +#define JANET_STREAM_READABLE 2 +#define JANET_STREAM_WRITABLE 4 typedef struct { int fd; @@ -48,6 +50,8 @@ typedef struct { static int janet_stream_close(void *p, size_t s); +static int janet_stream_getter(void *p, size_t, Janet key); + static const JanetAbstractType StreamAT = { "core/stream", janet_stream_close, @@ -64,22 +68,25 @@ static int janet_stream_close(void *p, size_t s) { return 0; } -static JanetStream *make_stream(int fd) { +static JanetStream *make_stream(int fd, int flags) { JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - int flags = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); stream->fd = fd; - stream->flags = 0; + stream->flags = flags; return stream; } +/* + * Event loop + */ + /* This large struct describes a waiting file descriptor, as well * as what to do when we get an event for it. It is a variant type, where * each variant implements a simple state machine. */ typedef struct { /* File descriptor to listen for events on. */ - int fd; + JanetStream *stream; /* Fiber to resume when event finishes. Can be NULL, in which case, * no fiber is resumed when event completes. */ @@ -137,6 +144,7 @@ JANET_THREAD_LOCAL int janet_vm_loop_count; void janet_net_markloop(void) { for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD lfd = janet_vm_loopfds[i]; + janet_mark(janet_wrap_abstract(lfd.stream)); switch (lfd.event_type) { default: break; @@ -175,104 +183,121 @@ static int janet_loop_schedule(JanetLoopFD lfd) { * we can separate out the polling logic */ static size_t janet_loop_event(size_t index) { JanetLoopFD *jlfd = janet_vm_loopfds + index; + JanetStream *stream = jlfd->stream; + int fd = stream->fd; int ret = 1; int should_resume = 0; Janet resumeval = janet_wrap_nil(); - switch (jlfd->event_type) { - case JLE_READ_CHUNK: - case JLE_READ_SOME: { - JanetBuffer *buffer = jlfd->data.read_chunk.buf; - int32_t bytes_left = jlfd->data.read_chunk.bytes_left; - janet_buffer_extra(buffer, bytes_left); - ssize_t nread; - errno = 0; - do { - nread = read(jlfd->fd, buffer->data + buffer->count, bytes_left); - } while (errno == EINTR); - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ret = 1; + if (stream->flags & JANET_STREAM_CLOSED) { + should_resume = 1; + ret = 0; + } else { + switch (jlfd->event_type) { + case JLE_READ_CHUNK: + case JLE_READ_SOME: { + JanetBuffer *buffer = jlfd->data.read_chunk.buf; + int32_t bytes_left = jlfd->data.read_chunk.bytes_left; + janet_buffer_extra(buffer, bytes_left); + ssize_t nread; + errno = 0; + if (!(stream->flags & JANET_STREAM_READABLE)) { + should_resume = 1; + ret = 0; + break; + } + do { + nread = read(fd, buffer->data + buffer->count, bytes_left); + } while (errno == EINTR); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + ret = 1; + break; + } + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { + should_resume = 1; + if (nread > 0) { + resumeval = janet_wrap_buffer(buffer); + } + ret = 0; + } else { + jlfd->data.read_chunk.bytes_left = bytes_left; + ret = 1; + } break; } - if (nread > 0) { - buffer->count += nread; - bytes_left -= nread; - } else { - bytes_left = 0; - } - if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { - should_resume = 1; - if (nread > 0) { - resumeval = janet_wrap_buffer(buffer); + case JLE_READ_ACCEPT: { + char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ + socklen_t len = 0; + int connfd = accept(fd, (void *) &addr, &len); + if (connfd >= 0) { + /* Made a new connection socket */ + JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + Janet streamv = janet_wrap_abstract(stream); + JanetFunction *handler = jlfd->data.read_accept.handler; + Janet out; + JanetFiber *fiberp = NULL; + /* Launch connection fiber */ + JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + janet_stacktrace(fiberp, out); + } } - ret = 0; - } else { - jlfd->data.read_chunk.bytes_left = bytes_left; - ret = 1; + ret = JANET_LOOPFD_MAX; + break; } - break; - } - case JLE_READ_ACCEPT: { - char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ - socklen_t len = 0; - int connfd = accept(jlfd->fd, (void *) &addr, &len); - if (connfd >= 0) { - /* Made a new connection socket */ - JanetStream *stream = make_stream(connfd); - Janet streamv = janet_wrap_abstract(stream); - JanetFunction *handler = jlfd->data.read_accept.handler; - Janet out; - JanetFiber *fiberp = NULL; - /* Launch connection fiber */ - JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { - janet_stacktrace(fiberp, out); + case JLE_WRITE_FROM_BUFFER: + case JLE_WRITE_FROM_STRINGLIKE: { + int32_t start, len; + const uint8_t *bytes; + if (!(stream->flags & JANET_STREAM_WRITABLE)) { + should_resume = 1; + ret = 0; + break; } - } - ret = JANET_LOOPFD_MAX; - break; - } - case JLE_WRITE_FROM_BUFFER: - case JLE_WRITE_FROM_STRINGLIKE: { - int32_t start, len; - const uint8_t *bytes; - if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; - bytes = buffer->data; - len = buffer->count; - start = jlfd->data.write_from_buffer.start; - } else { - bytes = jlfd->data.write_from_stringlike.str; - len = janet_string_length(bytes); - start = jlfd->data.write_from_stringlike.start; - } - if (start < len) { - int32_t nbytes = len - start; - ssize_t nwrote; - do { - nwrote = write(jlfd->fd, bytes + start, nbytes); - } while (nwrote == EINTR); - if (nwrote > 0) { - start += nwrote; - } else { - start = len; - } - } - if (start >= len) { - should_resume = 1; - ret = 0; - } else { if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - jlfd->data.write_from_buffer.start = start; + JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; + bytes = buffer->data; + len = buffer->count; + start = jlfd->data.write_from_buffer.start; } else { - jlfd->data.write_from_stringlike.start = start; + bytes = jlfd->data.write_from_stringlike.str; + len = janet_string_length(bytes); + start = jlfd->data.write_from_stringlike.start; } - ret = 1; + if (start < len) { + int32_t nbytes = len - start; + ssize_t nwrote; + do { + nwrote = write(fd, bytes + start, nbytes); + } while (nwrote == EINTR); + if (nwrote > 0) { + start += nwrote; + } else { + start = len; + } + } + if (start >= len) { + should_resume = 1; + ret = 0; + } else { + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + jlfd->data.write_from_buffer.start = start; + } else { + jlfd->data.write_from_stringlike.start = start; + } + ret = 1; + } + break; } - break; - } - case JLE_CONNECT: { + case JLE_CONNECT: { - break; + break; + } } } @@ -303,9 +328,10 @@ static void janet_loop1(void) { int fd_max = 0; for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD *jlfd = janet_vm_loopfds + i; - if (jlfd->fd > fd_max) fd_max = jlfd->fd; + int fd = jlfd->stream->fd; + if (fd > fd_max) fd_max = fd; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - FD_SET(jlfd->fd, set); + FD_SET(fd, set); } /* Blocking call - we should add timeout functionality */ @@ -314,8 +340,9 @@ static void janet_loop1(void) { /* Now handle all events */ for (int i = 0; i < janet_vm_loop_count;) { JanetLoopFD *jlfd = janet_vm_loopfds + i; + int fd = jlfd->stream->fd; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - if (FD_ISSET(jlfd->fd, set)) { + if (FD_ISSET(fd, set)) { size_t delta = janet_loop_event(i); i += delta; } else { @@ -336,9 +363,9 @@ void janet_loop(void) { #define JANET_SCHED_FSOME 1 -JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t nbytes, int flags) { +JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; lfd.data.read_chunk.buf = buf; @@ -347,9 +374,9 @@ JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t n janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); } -JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { +JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = JLE_WRITE_FROM_BUFFER; lfd.data.write_from_buffer.buf = buf; @@ -358,9 +385,9 @@ JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); } -JANET_NO_RETURN static void janet_sched_write_stringlike(int fd, const uint8_t *str) { +JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; lfd.data.write_from_stringlike.str = str; @@ -413,7 +440,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } /* Wrap socket in abstract type JanetStream */ - JanetStream *stream = make_stream(sock); + JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); return janet_wrap_abstract(stream); } @@ -454,12 +481,12 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Put sfd on our loop */ JanetLoopFD lfd = {0}; - lfd.fd = sfd; + lfd.stream = make_stream(sfd, 0); lfd.event_type = JLE_READ_ACCEPT; lfd.data.read_accept.handler = fun; janet_loop_schedule(lfd); - return janet_wrap_nil(); + return janet_wrap_abstract(lfd.stream); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { @@ -467,7 +494,7 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream->fd, buffer, n, JANET_SCHED_FSOME); + janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME); } static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { @@ -475,7 +502,7 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream->fd, buffer, n, 0); + janet_sched_read(stream, buffer, n, 0); } static Janet cfun_stream_close(int32_t argc, Janet *argv) { @@ -489,10 +516,10 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); if (janet_checktype(argv[1], JANET_BUFFER)) { - janet_sched_write_buffer(stream->fd, janet_getbuffer(argv, 1)); + janet_sched_write_buffer(stream, janet_getbuffer(argv, 1)); } else { JanetByteView bytes = janet_getbytes(argv, 1); - janet_sched_write_stringlike(stream->fd, bytes.bytes); + janet_sched_write_stringlike(stream, bytes.bytes); } } From f4a46ba6ea39d23226e6d2e512fdce8a5f24fa4a Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Wed, 12 Feb 2020 09:32:41 -0600 Subject: [PATCH 06/17] Add methods to streams. This makes streams polymorphic with files in many cases. printf family functions still need porting. --- examples/tcpclient.janet | 13 ++++++------- examples/tcpserver.janet | 16 ++++++++-------- src/core/net.c | 20 ++++++++++++++++++-- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/examples/tcpclient.janet b/examples/tcpclient.janet index 28f28af8..b042da4a 100644 --- a/examples/tcpclient.janet +++ b/examples/tcpclient.janet @@ -1,7 +1,6 @@ -(def conn (net/connect "127.0.0.1" "8000")) -(printf "Connected to %q!" conn) -(net/write conn "Echo...") -(print "Wrote to connection...") -(def res (net/read conn 1024)) -(pp res) -(net/close conn) +(with [conn (net/connect "127.0.0.1" "8000")] + (printf "Connected to %q!" conn) + (:write conn "Echo...") + (print "Wrote to connection...") + (def res (:read conn 1024)) + (pp res)) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet index a7ecd2d2..71501b8b 100644 --- a/examples/tcpserver.janet +++ b/examples/tcpserver.janet @@ -1,13 +1,13 @@ (defn handler "Simple handler for connections." [stream] - (def id (gensym)) - (def b @"") - (print "Connection " id "!") - (while (net/read stream 1024 b) - (net/write stream b) - (buffer/clear b)) - (printf "Done %v!" id) - (net/close stream)) + (defer (:close stream) + (def id (gensym)) + (def b @"") + (print "Connection " id "!") + (while (:read stream 1024 b) + (:write stream b) + (buffer/clear b)) + (printf "Done %v!" id))) (net/server "127.0.0.1" "8000" handler) diff --git a/src/core/net.c b/src/core/net.c index 0060dba2..9802daaf 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -50,12 +50,14 @@ typedef struct { static int janet_stream_close(void *p, size_t s); -static int janet_stream_getter(void *p, size_t, Janet key); +static int janet_stream_getter(void *p, Janet key, Janet *out); static const JanetAbstractType StreamAT = { "core/stream", janet_stream_close, - JANET_ATEND_GC + NULL, + janet_stream_getter, + JANET_ATEND_GET }; static int janet_stream_close(void *p, size_t s) { @@ -523,6 +525,20 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) { } } +static const JanetMethod stream_methods[] = { + {"chunk", cfun_stream_chunk}, + {"close", cfun_stream_close}, + {"read", cfun_stream_read}, + {"write", cfun_stream_write}, + {NULL, NULL} +}; + +static int janet_stream_getter(void *p, Janet key, Janet *out) { + (void) p; + if (!janet_checktype(key, JANET_KEYWORD)) return 0; + return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out); +} + static const JanetReg net_cfuns[] = { { "net/server", cfun_net_server, From 0df220780a95722f95d3e35f76ece004b3acbb29 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 20 Feb 2020 19:54:31 -0600 Subject: [PATCH 07/17] Fix issues with #282 Bad handling of write errors, as well as janet_root_fiber(). --- src/core/net.c | 23 ++++++++++++++++++----- src/core/vm.c | 5 +++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/core/net.c b/src/core/net.c index 9802daaf..dfe14ea5 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -26,6 +26,8 @@ #include "util.h" #endif +#include +#include #include #include #include @@ -146,6 +148,9 @@ JANET_THREAD_LOCAL int janet_vm_loop_count; void janet_net_markloop(void) { for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD lfd = janet_vm_loopfds[i]; + if (lfd.fiber != NULL) { + janet_mark(janet_wrap_fiber(lfd.fiber)); + } janet_mark(janet_wrap_abstract(lfd.stream)); switch (lfd.event_type) { default: @@ -175,9 +180,6 @@ static int janet_loop_schedule(JanetLoopFD lfd) { } int index = janet_vm_loop_count; janet_vm_loopfds[janet_vm_loop_count++] = lfd; - if (NULL != lfd.fiber) { - janet_gcroot(janet_wrap_fiber(lfd.fiber)); - } return index; } @@ -274,9 +276,10 @@ static size_t janet_loop_event(size_t index) { if (start < len) { int32_t nbytes = len - start; ssize_t nwrote; + errno = 0; do { nwrote = write(fd, bytes + start, nbytes); - } while (nwrote == EINTR); + } while (errno == EINTR); if (nwrote > 0) { start += nwrote; } else { @@ -422,7 +425,7 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { */ static Janet cfun_net_connect(int32_t argc, Janet *argv) { - janet_fixarity(argc, 2); + janet_arity(argc, 2, -1); struct addrinfo *ai = janet_get_addrinfo(argv, 0); @@ -433,6 +436,16 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { janet_panic("could not create socket"); } + /* Set socket opts */ + /*for (int32_t argi = 1; argi < argc; argi++) { + const uint8_t *kw = janet_getkeyword(argv, argi); + if (janet_cstrcmp(kw, "no-delay")) { + int one = 1; + setsockopt(sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); + } + }*/ + + /* Connect to socket */ int status = connect(sock, ai->ai_addr, ai->ai_addrlen); freeaddrinfo(ai); diff --git a/src/core/vm.c b/src/core/vm.c index 7912d95a..10fc7790 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1280,7 +1280,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) { Janet *old_vm_return_reg = janet_vm_return_reg; /* Setup fiber */ - if (oldn == 0) janet_vm_root_fiber = fiber; + if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber; janet_vm_fiber = fiber; janet_gcroot(janet_wrap_fiber(fiber)); janet_fiber_set_status(fiber, JANET_STATUS_ALIVE); @@ -1306,7 +1306,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) { janet_gcunroot(janet_wrap_fiber(fiber)); /* Restore global state */ - if (oldn == 0) janet_vm_root_fiber = NULL; + if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL; janet_vm_gc_suspend = handle; janet_vm_fiber = old_vm_fiber; janet_vm_stackn = oldn; @@ -1377,6 +1377,7 @@ int janet_init(void) { /* Fibers */ janet_vm_fiber = NULL; janet_vm_root_fiber = NULL; + janet_vm_stackn = 0; /* Threads */ #ifdef JANET_THREADS janet_threads_init(); From 01a79dc9654f3c2b97b96b71b9b4c7d1af10bf8c Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 20 Feb 2020 20:10:03 -0600 Subject: [PATCH 08/17] Remove extra functionality. --- src/core/net.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/core/net.c b/src/core/net.c index dfe14ea5..76b435cd 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -26,8 +26,6 @@ #include "util.h" #endif -#include -#include #include #include #include @@ -425,7 +423,7 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { */ static Janet cfun_net_connect(int32_t argc, Janet *argv) { - janet_arity(argc, 2, -1); + janet_fixarity(argc, 2); struct addrinfo *ai = janet_get_addrinfo(argv, 0); @@ -436,16 +434,6 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { janet_panic("could not create socket"); } - /* Set socket opts */ - /*for (int32_t argi = 1; argi < argc; argi++) { - const uint8_t *kw = janet_getkeyword(argv, argi); - if (janet_cstrcmp(kw, "no-delay")) { - int one = 1; - setsockopt(sock, SOL_TCP, TCP_NODELAY, &one, sizeof(one)); - } - }*/ - - /* Connect to socket */ int status = connect(sock, ai->ai_addr, ai->ai_addrlen); freeaddrinfo(ai); From 16202216b22468fcfc0161ef2f2026343ef89b74 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 5 Mar 2020 19:18:45 -0600 Subject: [PATCH 09/17] Address #291 When resuming a fiber with a child, the root fiber was set incorrectly. --- src/core/vm.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/vm.c b/src/core/vm.c index 770c07d1..99a79a04 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1262,10 +1262,12 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) { /* Continue child fiber if it exists */ if (fiber->child) { + if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber; JanetFiber *child = fiber->child; janet_vm_stackn++; JanetSignal sig = janet_continue(child, in, &in); janet_vm_stackn--; + if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL; if (sig != JANET_SIGNAL_OK && !(child->flags & (1 << sig))) { *out = in; return sig; From 4ac382e553be24b683c20179fd12cd8e0b9d5cfa Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Fri, 17 Apr 2020 16:27:02 -0500 Subject: [PATCH 10/17] Add alias JANET_SIGNAL_EVENT. --- auxbin/jpm | 2 +- src/boot/boot.janet | 2 +- src/core/net.c | 10 +++++----- src/core/run.c | 2 +- src/include/janet.h | 2 ++ 5 files changed, 10 insertions(+), 8 deletions(-) diff --git a/auxbin/jpm b/auxbin/jpm index 8ed0a099..ba7e404e 100755 --- a/auxbin/jpm +++ b/auxbin/jpm @@ -540,7 +540,7 @@ int main(int argc, const char **argv) { fiber->env = temptab; Janet out; JanetSignal result = janet_continue(fiber, janet_wrap_nil(), &out); - if (result != JANET_SIGNAL_OK && result < JANET_SIGNAL_USER0) { + if (result != JANET_SIGNAL_OK && result != JANET_SIGNAL_EVENT) { janet_stacktrace(fiber, out); janet_deinit(); return result; diff --git a/src/boot/boot.janet b/src/boot/boot.janet index c60b1794..9401f0e8 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -1948,7 +1948,7 @@ (default on-parse-error bad-parse) (default evaluator (fn evaluate [x &] (x))) (default where "") - (default guard :yed) + (default guard :ydt) # Are we done yet? (var going true) diff --git a/src/core/net.c b/src/core/net.c index 76b435cd..34395734 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -245,7 +245,7 @@ static size_t janet_loop_event(size_t index) { JanetFiber *fiberp = NULL; /* Launch connection fiber */ JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { janet_stacktrace(fiberp, out); } } @@ -309,7 +309,7 @@ static size_t janet_loop_event(size_t index) { /* Resume the fiber */ Janet out; JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { janet_stacktrace(jlfd->fiber, out); } } @@ -374,7 +374,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b lfd.data.read_chunk.buf = buf; lfd.data.read_chunk.bytes_left = nbytes; janet_loop_schedule(lfd); - janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { @@ -385,7 +385,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB lfd.data.write_from_buffer.buf = buf; lfd.data.write_from_buffer.start = 0; janet_loop_schedule(lfd); - janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { @@ -396,7 +396,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co lfd.data.write_from_stringlike.str = str; lfd.data.write_from_stringlike.start = 0; janet_loop_schedule(lfd); - janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } /* Needs argc >= offset + 2 */ diff --git a/src/core/run.c b/src/core/run.c index 6f6019c0..039d1144 100644 --- a/src/core/run.c +++ b/src/core/run.c @@ -50,7 +50,7 @@ int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char JanetFiber *fiber = janet_fiber(f, 64, 0, NULL); fiber->env = env; JanetSignal status = janet_continue(fiber, janet_wrap_nil(), &ret); - if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) { + if (status != JANET_SIGNAL_OK && status != JANET_SIGNAL_EVENT) { janet_stacktrace(fiber, ret); errflags |= 0x01; done = 1; diff --git a/src/include/janet.h b/src/include/janet.h index 8b6fd860..fc5552d9 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -294,6 +294,8 @@ typedef enum { JANET_SIGNAL_USER9 } JanetSignal; +#define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9 + /* Fiber statuses - mostly corresponds to signals. */ typedef enum { JANET_STATUS_DEAD, From 2904c19ed923f98792bd6b0bd09d235e992cd9a6 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 18 Apr 2020 12:12:27 -0500 Subject: [PATCH 11/17] Switch to poll from select. Simpler and more flexible interface, and also lets us use epoll more easily on linux, which is the most important plantform to optimize for network performance. --- meson.build | 1 + src/core/net.c | 83 ++++++++++++++++++++++++++++------------------- test/suite9.janet | 51 +++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 33 deletions(-) create mode 100644 test/suite9.janet diff --git a/meson.build b/meson.build index 14a5486c..a89c9fd2 100644 --- a/meson.build +++ b/meson.build @@ -222,6 +222,7 @@ test_files = [ 'test/suite6.janet', 'test/suite7.janet', 'test/suite8.janet' + 'test/suite9.janet' ] foreach t : test_files test(t, janet_nativeclient, args : files([t]), workdir : meson.current_source_dir()) diff --git a/src/core/net.c b/src/core/net.c index 34395734..b9ef9ec5 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -32,7 +32,7 @@ #include #include #include -#include +#include #include /* @@ -139,6 +139,7 @@ typedef struct { #define JANET_LOOPFD_MAX 1024 /* Global loop data */ +JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL int janet_vm_loop_count; @@ -172,15 +173,25 @@ void janet_net_markloop(void) { } /* Add a loop fd to the global event loop */ -static int janet_loop_schedule(JanetLoopFD lfd) { +static int janet_loop_schedule(JanetLoopFD lfd, short events) { if (janet_vm_loop_count == JANET_LOOPFD_MAX) { return -1; } - int index = janet_vm_loop_count; - janet_vm_loopfds[janet_vm_loop_count++] = lfd; + int index = janet_vm_loop_count++; + janet_vm_loopfds[index] = lfd; + janet_vm_pollfds[index].fd = lfd.stream->fd; + janet_vm_pollfds[index].events = events; + janet_vm_pollfds[index].revents = 0; return index; } +/* Remove event from list */ +static void janet_loop_rmindex(int index) { + janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; + janet_vm_pollfds[index] = janet_vm_pollfds[janet_vm_loop_count]; +} + + /* Return delta in number of loop fds. Abstracted out so * we can separate out the polling logic */ static size_t janet_loop_event(size_t index) { @@ -315,37 +326,30 @@ static size_t janet_loop_event(size_t index) { } /* Remove this handler from the handler pool. */ - if (should_resume) { - janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; - } + if (should_resume) janet_loop_rmindex(index); return ret; } static void janet_loop1(void) { - /* Set up fd_sets */ - fd_set readfds; - fd_set writefds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - int fd_max = 0; - for (int i = 0; i < janet_vm_loop_count; i++) { - JanetLoopFD *jlfd = janet_vm_loopfds + i; - int fd = jlfd->stream->fd; - if (fd > fd_max) fd_max = fd; - fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - FD_SET(fd, set); - } - - /* Blocking call - we should add timeout functionality */ - select(fd_max + 1, &readfds, &writefds, NULL, NULL); - - /* Now handle all events */ + /* Remove closed file descriptors */ for (int i = 0; i < janet_vm_loop_count;) { - JanetLoopFD *jlfd = janet_vm_loopfds + i; - int fd = jlfd->stream->fd; - fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - if (FD_ISSET(fd, set)) { + if (janet_vm_loopfds[i].stream->flags & JANET_STREAM_CLOSED) { + janet_loop_rmindex(i); + } else { + i++; + } + } + /* Poll */ + if (janet_vm_loop_count == 0) return; + int ready; + do { + ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1); + } while (ready == -1 && errno == EAGAIN); + if (ready == -1) return; + /* Handle events */ + for (int i = 0; i < janet_vm_loop_count;) { + if (janet_vm_pollfds[i].events & janet_vm_pollfds[i].revents) { size_t delta = janet_loop_event(i); i += delta; } else { @@ -373,7 +377,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; lfd.data.read_chunk.buf = buf; lfd.data.read_chunk.bytes_left = nbytes; - janet_loop_schedule(lfd); + janet_loop_schedule(lfd, POLLIN); janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } @@ -384,7 +388,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB lfd.event_type = JLE_WRITE_FROM_BUFFER; lfd.data.write_from_buffer.buf = buf; lfd.data.write_from_buffer.start = 0; - janet_loop_schedule(lfd); + janet_loop_schedule(lfd, POLLOUT); janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } @@ -395,7 +399,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; lfd.data.write_from_stringlike.str = str; lfd.data.write_from_stringlike.start = 0; - janet_loop_schedule(lfd); + janet_loop_schedule(lfd, POLLOUT); janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } @@ -461,6 +465,19 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { for (rp = ai; rp != NULL; rp = rp->ai_next) { sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) continue; + /* Set various socket options */ + int enable = 1; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { + close(sfd); + janet_panic("setsockopt(SO_REUSEADDR) failed"); + } +#ifdef SO_REUSEPORT + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) { + close(sfd); + janet_panic("setsockopt(SO_REUSEPORT) failed"); + } +#endif + /* Bind */ if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break; close(sfd); } @@ -487,7 +504,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { lfd.stream = make_stream(sfd, 0); lfd.event_type = JLE_READ_ACCEPT; lfd.data.read_accept.handler = fun; - janet_loop_schedule(lfd); + janet_loop_schedule(lfd, POLLIN); return janet_wrap_abstract(lfd.stream); } diff --git a/test/suite9.janet b/test/suite9.janet new file mode 100644 index 00000000..c87f952c --- /dev/null +++ b/test/suite9.janet @@ -0,0 +1,51 @@ +# Copyright (c) 2020 Calvin Rose & contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +(import ./helper :prefix "" :exit true) +(start-suite 9) + +# Net testing + +(defn handler + "Simple handler for connections." + [stream] + (defer (:close stream) + (def id (gensym)) + (def b @"") + (:read stream 1024 b) + (:write stream b) + (buffer/clear b))) + +(def s (net/server "127.0.0.1" "8000" handler)) +(assert s "made server 1") + +(defn test-echo [msg] + (with [conn (net/connect "127.0.0.1" "8000")] + (:write conn msg) + (def res (:read conn 1024)) + (assert (= (string res) msg) (string "echo " msg)))) + +(test-echo "hello") +(test-echo "world") +(test-echo (string/repeat "abcd" 200)) + +(:close s) + +(end-suite) From 0745c15d7b953312bdc2f5c3d82a1ad98d342563 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 18 Apr 2020 15:31:46 -0500 Subject: [PATCH 12/17] Fix return value from shell.c --- src/mainclient/shell.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mainclient/shell.c b/src/mainclient/shell.c index 6b443c02..ecb7534e 100644 --- a/src/mainclient/shell.c +++ b/src/mainclient/shell.c @@ -1011,11 +1011,12 @@ int main(int argc, char **argv) { JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs); fiber->env = env; status = janet_continue(fiber, janet_wrap_nil(), &out); - if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) { + if (status != JANET_SIGNAL_OK && status != JANET_SIGNAL_EVENT) { janet_stacktrace(fiber, out); } #ifdef JANET_NET + status = JANET_SIGNAL_OK; janet_loop(); #endif From 4a693222b4618eec9c6fec74343bdf38473f3236 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 18 Apr 2020 19:14:38 -0400 Subject: [PATCH 13/17] Port net code to windows. Use winsock2 and WSAPoll. Not the most high performance solution but should work well. --- src/core/features.h | 4 ++ src/core/net.c | 144 +++++++++++++++++++++++++++++++++++++++++--- src/core/util.h | 5 +- src/core/vm.c | 3 + 4 files changed, 145 insertions(+), 11 deletions(-) diff --git a/src/core/features.h b/src/core/features.h index 4604d195..896846b4 100644 --- a/src/core/features.h +++ b/src/core/features.h @@ -29,6 +29,10 @@ #define _POSIX_C_SOURCE 200112L #endif +#if defined(WIN32) || defined(_WIN32) +#define WIN32_LEAN_AND_MEAN +#endif + /* Needed for realpath on linux */ #if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__)) #define _XOPEN_SOURCE 500 diff --git a/src/core/net.c b/src/core/net.c index b9ef9ec5..8e950c49 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -26,6 +26,14 @@ #include "util.h" #endif +#ifdef JANET_WINDOWS +#include +#include +#include +#pragma comment (lib, "Ws2_32.lib") +#pragma comment (lib, "Mswsock.lib") +#pragma comment (lib, "Advapi32.lib") +#else #include #include #include @@ -34,6 +42,7 @@ #include #include #include +#endif /* * Streams @@ -44,7 +53,11 @@ #define JANET_STREAM_WRITABLE 4 typedef struct { +#ifdef JANET_WINDOWS + SOCKET socket; +#else int fd; +#endif int flags; } JanetStream; @@ -65,11 +78,25 @@ static int janet_stream_close(void *p, size_t s) { JanetStream *stream = p; if (!(stream->flags & JANET_STREAM_CLOSED)) { stream->flags |= JANET_STREAM_CLOSED; +#ifdef JANET_WINDOWS + closesocket(stream->socket); +#else close(stream->fd); +#endif } return 0; } +#ifdef JANET_WINDOWS +static JanetStream *make_stream(SOCKET socket, int flags) { + u_long iMode = 0; + JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + ioctlsocket(socket, FIONBIO, &iMode); + stream->socket = socket; + stream->flags = flags; + return stream; +} +#else static JanetStream *make_stream(int fd, int flags) { JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); @@ -77,6 +104,7 @@ static JanetStream *make_stream(int fd, int flags) { stream->flags = flags; return stream; } +#endif /* * Event loop @@ -139,7 +167,11 @@ typedef struct { #define JANET_LOOPFD_MAX 1024 /* Global loop data */ +#ifdef JANET_WINDOWS +JANET_THREAD_LOCAL WSAPOLLFD janet_vm_pollfds[JANET_LOOPFD_MAX]; +#else JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX]; +#endif JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL int janet_vm_loop_count; @@ -179,7 +211,11 @@ static int janet_loop_schedule(JanetLoopFD lfd, short events) { } int index = janet_vm_loop_count++; janet_vm_loopfds[index] = lfd; +#ifdef JANET_WINDOWS + janet_vm_pollfds[index].fd = lfd.stream->socket; +#else janet_vm_pollfds[index].fd = lfd.stream->fd; +#endif janet_vm_pollfds[index].events = events; janet_vm_pollfds[index].revents = 0; return index; @@ -197,7 +233,11 @@ static void janet_loop_rmindex(int index) { static size_t janet_loop_event(size_t index) { JanetLoopFD *jlfd = janet_vm_loopfds + index; JanetStream *stream = jlfd->stream; +#ifdef JANET_WINDOWS + SOCKET socket = stream->socket; +#else int fd = stream->fd; +#endif int ret = 1; int should_resume = 0; Janet resumeval = janet_wrap_nil(); @@ -211,20 +251,30 @@ static size_t janet_loop_event(size_t index) { JanetBuffer *buffer = jlfd->data.read_chunk.buf; int32_t bytes_left = jlfd->data.read_chunk.bytes_left; janet_buffer_extra(buffer, bytes_left); - ssize_t nread; - errno = 0; if (!(stream->flags & JANET_STREAM_READABLE)) { should_resume = 1; ret = 0; break; } +#ifdef JANET_WINDOWS + long nread; + do { + nread = recv(socket, buffer->data + buffer->count, bytes_left, 0); + } while (nread == -1 && WSAGetLastError() == WSAEINTR); + if (WSAGetLastError() == WSAEWOULDBLOCK) { + ret = 1; + break; + } +#else + ssize_t nread; do { nread = read(fd, buffer->data + buffer->count, bytes_left); - } while (errno == EINTR); + } while (nread == -1 && errno == EINTR); if (errno == EAGAIN || errno == EWOULDBLOCK) { ret = 1; break; } +#endif if (nread > 0) { buffer->count += nread; bytes_left -= nread; @@ -244,10 +294,15 @@ static size_t janet_loop_event(size_t index) { break; } case JLE_READ_ACCEPT: { +#ifdef JANET_WINDOWS + SOCKET connfd = accept(socket, NULL, NULL); + if (connfd != INVALID_SOCKET) { +#else char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ socklen_t len = 0; int connfd = accept(fd, (void *) &addr, &len); if (connfd >= 0) { +#endif /* Made a new connection socket */ JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); Janet streamv = janet_wrap_abstract(stream); @@ -284,11 +339,17 @@ static size_t janet_loop_event(size_t index) { } if (start < len) { int32_t nbytes = len - start; +#ifdef JANET_WINDOWS + long nwrote; + do { + nwrote = send(socket, bytes + start, nbytes, 0); + } while (nwrote == -1 && WSAGetLastError() == WSAEINTR); +#else ssize_t nwrote; - errno = 0; do { nwrote = write(fd, bytes + start, nbytes); - } while (errno == EINTR); + } while (nwrote == -1 && errno == EINTR); +#endif if (nwrote > 0) { start += nwrote; } else { @@ -309,7 +370,6 @@ static size_t janet_loop_event(size_t index) { break; } case JLE_CONNECT: { - break; } } @@ -326,7 +386,7 @@ static size_t janet_loop_event(size_t index) { } /* Remove this handler from the handler pool. */ - if (should_resume) janet_loop_rmindex(index); + if (should_resume) janet_loop_rmindex((int) index); return ret; } @@ -343,15 +403,24 @@ static void janet_loop1(void) { /* Poll */ if (janet_vm_loop_count == 0) return; int ready; +#ifdef JANET_WINDOWS + do { + ready = WSAPoll(janet_vm_pollfds, janet_vm_loop_count, -1); + } while (ready == -1 && WSAGetLastError() == WSAEINTR); + if (ready == -1) return; +#else do { ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1); } while (ready == -1 && errno == EAGAIN); if (ready == -1) return; +#endif /* Handle events */ for (int i = 0; i < janet_vm_loop_count;) { - if (janet_vm_pollfds[i].events & janet_vm_pollfds[i].revents) { + int revents = janet_vm_pollfds[i].revents; + janet_vm_pollfds[i].revents = 0; + if ((janet_vm_pollfds[i].events | POLLHUP | POLLER) & revents) { size_t delta = janet_loop_event(i); - i += delta; + i += (int) delta; } else { i++; } @@ -431,6 +500,22 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { struct addrinfo *ai = janet_get_addrinfo(argv, 0); +#ifdef JANET_WINDOWS + /* Create socket */ + SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock == INVALID_SOCKET) { + freeaddrinfo(ai); + janet_panic("could not create socket"); + } + + /* Connect to socket */ + int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen); + freeaddrinfo(ai); + if (status == -1) { + closesocket(sock); + janet_panic("could not connect to socket"); + } +#else /* Create socket */ int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); if (sock < 0) { @@ -445,6 +530,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { close(sock); janet_panic("could not connect to socket"); } +#endif /* Wrap socket in abstract type JanetStream */ JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); @@ -459,6 +545,36 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { struct addrinfo *ai = janet_get_addrinfo(argv, 0); +#ifdef JANET_WINDOWS + /* Check all addrinfos in a loop for the first that we can bind to. */ + SOCKET sfd = INVALID_SOCKET; + struct addrinfo *rp = NULL; + for (rp = ai; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == INVALID_SOCKET) continue; + /* Set various socket options */ + int enable = 1; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) { + closesocket(sfd); + janet_panic("setsockopt(SO_REUSEADDR) failed"); + } + /* Bind */ + if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; + closesocket(sfd); + } + if (NULL == rp) { + freeaddrinfo(ai); + janet_panic("could not bind to any sockets"); + } + + /* listen */ + int status = listen(sfd, 1024); + freeaddrinfo(ai); + if (status) { + closesocket(sfd); + janet_panic("could not listen on file descriptor"); + } +#else /* Check all addrinfos in a loop for the first that we can bind to. */ int sfd = 0; struct addrinfo *rp = NULL; @@ -498,6 +614,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { * Since a connection could be disconnected at any time, any read or write may fail. * We don't want to blow up the whole application. */ signal(SIGPIPE, SIG_IGN); +#endif /* Put sfd on our loop */ JanetLoopFD lfd = {0}; @@ -572,6 +689,15 @@ static const JanetReg net_cfuns[] = { void janet_lib_net(JanetTable *env) { janet_vm_loop_count = 0; +#ifdef JANET_WINDOWS + WSADATA wsaData; + janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock"); +#endif janet_core_cfuns(env, NULL, net_cfuns); } +void janet_net_deinit(void) { +#ifdef JANET_WINDOWS + WSACleanup(); +#endif +} diff --git a/src/core/util.h b/src/core/util.h index eeafcc4b..98b71427 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -35,7 +35,7 @@ #ifndef janet_exit #include #define janet_exit(m) do { \ - printf("C runtime error at line %d in file %s: %s\n",\ + fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\ __LINE__,\ __FILE__,\ (m));\ @@ -50,7 +50,7 @@ /* What to do when out of memory */ #ifndef JANET_OUT_OF_MEMORY #include -#define JANET_OUT_OF_MEMORY do { printf("janet out of memory\n"); exit(1); } while (0) +#define JANET_OUT_OF_MEMORY do { fprintf(stderr, "janet out of memory\n"); exit(1); } while (0) #endif /* Omit docstrings in some builds */ @@ -128,6 +128,7 @@ void janet_lib_thread(JanetTable *env); #endif #ifdef JANET_NET void janet_lib_net(JanetTable *env); +void janet_net_deinit(void); void janet_net_markloop(void); #endif diff --git a/src/core/vm.c b/src/core/vm.c index 6c9b228e..599f724a 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1444,4 +1444,7 @@ void janet_deinit(void) { #ifdef JANET_THREADS janet_threads_deinit(); #endif +#ifdef JANET_NET + janet_net_deinit(); +#endif } From 0d3c6abee8a0fa4dd8748b20550f178637aec9d2 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 18 Apr 2020 19:15:59 -0400 Subject: [PATCH 14/17] POLLER -> POLLERR --- src/core/net.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/net.c b/src/core/net.c index 8e950c49..7ab2cc48 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -418,7 +418,7 @@ static void janet_loop1(void) { for (int i = 0; i < janet_vm_loop_count;) { int revents = janet_vm_pollfds[i].revents; janet_vm_pollfds[i].revents = 0; - if ((janet_vm_pollfds[i].events | POLLHUP | POLLER) & revents) { + if ((janet_vm_pollfds[i].events | POLLHUP | POLLERR) & revents) { size_t delta = janet_loop_event(i); i += (int) delta; } else { From 60f8dd0bfcd3440612ac6c92f5ceb3e11309f963 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 19 Apr 2020 08:54:24 -0500 Subject: [PATCH 15/17] Renable :source argument to dofile. Allows for some more interesting usage of loaders. --- src/boot/boot.janet | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 9401f0e8..4020799c 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2185,6 +2185,7 @@ [path &keys {:exit exit :env env + :source src :expander expander :evaluator evaluator}] (def f (if (= (type path) :core/file) @@ -2193,8 +2194,8 @@ (def path-is-file (= f path)) (default env (make-env)) (def spath (string path)) - (put env :current-file (if-not path-is-file spath)) - (put env :source (if-not path-is-file spath path)) + (put env :current-file (or src (if-not path-is-file spath))) + (put env :source (or src (if-not path-is-file spath path))) (defn chunks [buf _] (file/read f 2048 buf)) (defn bp [&opt x y] (def ret (bad-parse x y)) @@ -2217,7 +2218,7 @@ (if exit (os/exit 1) (eflush)))) :evaluator evaluator :expander expander - :source (if path-is-file "" spath)})) + :source (or src (if path-is-file "" spath))})) (if-not path-is-file (file/close f)) nenv) From 3e60e82529d9751dfc7c728de4ff55c440384beb Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 19 Apr 2020 09:35:14 -0500 Subject: [PATCH 16/17] Add circular dependency detection. This detection will not stop compilation, as errors in general do not stop compilation unless exit on error is passed inside an import, but should notify the user something is going on. --- src/boot/boot.janet | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 4020799c..81a389a1 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2102,7 +2102,7 @@ [ext loader] (defn- find-prefix [pre] - (or (find-index |(string/has-prefix? pre ($ 0)) module/paths) 0)) + (or (find-index |(and (string? ($ 0)) (string/has-prefix? pre ($ 0))) module/paths) 0)) (array/insert module/paths 0 [(string ":cur:/:all:" ext) loader check-.]) (def all-index (find-prefix ":all:")) (array/insert module/paths all-index [(string ":all:" ext) loader not-check-.]) @@ -2243,12 +2243,14 @@ (unless fullpath (error mod-kind)) (if-let [check (in module/cache fullpath)] check - (do - (def loader (if (keyword? mod-kind) (module/loaders mod-kind) mod-kind)) - (unless loader (error (string "module type " mod-kind " unknown"))) - (def env (loader fullpath args)) - (put module/cache fullpath env) - env))) + (if-let [check2 (module/loading fullpath)] + (error (string "circular dependency " fullpath " detected")) + (do + (def loader (if (keyword? mod-kind) (module/loaders mod-kind) mod-kind)) + (unless loader (error (string "module type " mod-kind " unknown"))) + (def env (loader fullpath args)) + (put module/cache fullpath env) + env)))) (defn import* "Function form of import. Same parameters, but the path From e579d1d89f1251672aa27a3cbde375979aa9d42c Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Mon, 20 Apr 2020 18:31:14 -0500 Subject: [PATCH 17/17] Add jpm rule-tree. Useful for debugging jpm. This funtionality also maybe reused for for showing a dependency tree as well. --- auxbin/jpm | 33 +++++++++++++++++++++++++++++++++ jpm.1 | 6 ++++++ src/conf/janetconf.h | 4 ++++ 3 files changed, 43 insertions(+) diff --git a/auxbin/jpm b/auxbin/jpm index ba7e404e..6d6cde31 100755 --- a/auxbin/jpm +++ b/auxbin/jpm @@ -862,6 +862,25 @@ int main(int argc, const char **argv) { (add-body "install" (spit newname bat)))) +(def- tree-l " └─") +(def- tree-t " ├─") +(def- tree-i " │ ") +(def- tree-s " ") +(defn- print-rule-tree + "Show dependencies for a given rule recursively in a nice tree." + [root depth prefix prefix-part] + (printf "%s%s" prefix root) + (def rules (getrules)) + (when-let [[root-deps] (rules root)] + (def l (-> root-deps length dec)) + (when (pos? depth) + (eachp [i d] (sorted root-deps) + (def is-last (= i l)) + (print-rule-tree + d (dec depth) + (string prefix-part (if is-last tree-l tree-t)) + (string prefix-part (if is-last tree-s tree-i))))))) + (defn declare-archive "Build a janet archive. This is a file that bundles together many janet scripts into a janet image. This file can the be moved to any machine with @@ -976,6 +995,10 @@ Subcommands are: run rule : run a rule. Can also run custom rules added via (phony "task" [deps...] ...) or (rule "ouput.file" [deps...] ...). rules : list rules available with run. + rule-tree (root rule) (depth) : Print a nice tree to see what rules depend on other rules. + Optinally provide a root rule to start printing from, and a + max depth to print. Without these options, all rules will print + their full dependency tree. update-pkgs : Update the current package listing from the remote git repository selected. quickbin entry executable : Create an executable from a janet script with a main function. make-lockfile (lockfile) : Create a lockfile based on repositories in the cache. The @@ -1044,6 +1067,15 @@ Flags are: [] (local-rule "install-deps" true)) +(defn show-rule-tree + [&opt root depth] + (import-rules "./project.janet" true) + (def max-depth (if depth (scan-number depth) math/inf)) + (if root + (print-rule-tree root max-depth "" "") + (let [ks (sort (seq [k :keys (dyn :rules)] k))] + (each k ks (print-rule-tree k max-depth "" ""))))) + (defn list-rules [&opt ctx] (import-rules "./project.janet" true) @@ -1089,6 +1121,7 @@ Flags are: "help" help "deps" deps "repl" jpm-repl + "rule-tree" show-rule-tree "show-paths" show-paths "clear-cache" clear-cache "run" local-rule diff --git a/jpm.1 b/jpm.1 index 0a0c72e3..6836e684 100644 --- a/jpm.1 +++ b/jpm.1 @@ -143,6 +143,12 @@ like make. run will run a single rule or build a single file. .BR rules List all rules that can be run via run. This is useful for exploring rules in the project. +.TP +.BR rule-tree\ [\fBroot\fR] [\fdepth\fR] +Show rule dependency tree in a pretty format. Optionally provide a rule to use as the tree +root, as well as a max depth to print. By default, prints the full tree for all rules. This +can be quite long, so it is recommended to give a root rule. + .TP .BR show-paths Show all of the paths used when installing and building artifacts. diff --git a/src/conf/janetconf.h b/src/conf/janetconf.h index 119ca662..8d615064 100644 --- a/src/conf/janetconf.h +++ b/src/conf/janetconf.h @@ -45,6 +45,10 @@ /* #define JANET_NO_DOCSTRINGS */ /* #define JANET_NO_SOURCEMAPS */ /* #define JANET_REDUCED_OS */ +/* #define JANET_OS_NO_EXECUTE */ +/* #define JANET_OS_NO_TIME */ +/* #define JANET_OS_NO_FS */ +/* #define JANET_OS_NO_ENV */ /* Other settings */ /* #define JANET_NO_ASSEMBLER */