diff --git a/CHANGELOG.md b/CHANGELOG.md index 537be111..72efa55b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? +- Add beta `net/` module to core for socket based networking. - Add the `parse` function to parse strings of source code more conveniently. - Add `jpm rule-tree` subcommand. - Add `--offline` flag to jpm to force use of the cache. diff --git a/Makefile b/Makefile index f53fc330..5fc16ae9 100644 --- a/Makefile +++ b/Makefile @@ -96,6 +96,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \ src/core/io.c \ src/core/marsh.c \ src/core/math.c \ + src/core/net.c \ src/core/os.c \ src/core/parse.c \ src/core/peg.c \ diff --git a/examples/tcpclient.janet b/examples/tcpclient.janet new file mode 100644 index 00000000..b042da4a --- /dev/null +++ b/examples/tcpclient.janet @@ -0,0 +1,6 @@ +(with [conn (net/connect "127.0.0.1" "8000")] + (printf "Connected to %q!" conn) + (:write conn "Echo...") + (print "Wrote to connection...") + (def res (:read conn 1024)) + (pp res)) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet new file mode 100644 index 00000000..71501b8b --- /dev/null +++ b/examples/tcpserver.janet @@ -0,0 +1,13 @@ +(defn handler + "Simple handler for connections." + [stream] + (defer (:close stream) + (def id (gensym)) + (def b @"") + (print "Connection " id "!") + (while (:read stream 1024 b) + (:write stream b) + (buffer/clear b)) + (printf "Done %v!" id))) + +(net/server "127.0.0.1" "8000" handler) diff --git a/jpm b/jpm index a7a9a86c..492731f0 100755 --- a/jpm +++ b/jpm @@ -494,11 +494,14 @@ fiber->env = temptab; Janet out; JanetSignal result = janet_continue(fiber, janet_wrap_nil(), &out); - if (result) { - janet_stacktrace(fiber, out); - janet_deinit(); - return result; + if (result != JANET_SIGNAL_OK && result != JANET_SIGNAL_EVENT) { + janet_stacktrace(fiber, out); + janet_deinit(); + return result; } + #ifdef JANET_NET + janet_loop(); + #endif janet_deinit(); return 0; } diff --git a/meson.build b/meson.build index 3be88563..c1bde931 100644 --- a/meson.build +++ b/meson.build @@ -59,6 +59,7 @@ conf.set('JANET_NO_DOCSTRINGS', not get_option('docstrings')) conf.set('JANET_NO_SOURCEMAPS', not get_option('sourcemaps')) conf.set('JANET_NO_ASSEMBLER', not get_option('assembler')) conf.set('JANET_NO_PEG', not get_option('peg')) +conf.set('JANET_NO_NET', not get_option('net')) conf.set('JANET_REDUCED_OS', get_option('reduced_os')) conf.set('JANET_NO_TYPED_ARRAY', not get_option('typed_array')) conf.set('JANET_NO_INT_TYPES', not get_option('int_types')) @@ -112,6 +113,7 @@ core_src = [ 'src/core/io.c', 'src/core/marsh.c', 'src/core/math.c', + 'src/core/net.c', 'src/core/os.c', 'src/core/parse.c', 'src/core/peg.c', @@ -220,6 +222,7 @@ test_files = [ 'test/suite6.janet', 'test/suite7.janet', 'test/suite8.janet' + 'test/suite9.janet' ] foreach t : test_files test(t, janet_nativeclient, args : files([t]), workdir : meson.current_source_dir()) diff --git a/meson_options.txt b/meson_options.txt index 83bd25a6..1d0bd77c 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -11,6 +11,7 @@ option('peg', type : 'boolean', value : true) option('typed_array', type : 'boolean', value : true) option('int_types', type : 'boolean', value : true) option('prf', type : 'boolean', value : true) +option('net', type : 'boolean', value : true) option('recursion_guard', type : 'integer', min : 10, max : 8000, value : 1024) option('max_proto_depth', type : 'integer', min : 10, max : 8000, value : 200) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 0fb2e1b0..ae130370 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -1972,6 +1972,7 @@ (default on-parse-error bad-parse) (default evaluator (fn evaluate [x &] (x))) (default where "") + (default guard :ydt) # Are we done yet? (var going true) @@ -1998,7 +1999,7 @@ (string err " on line " line ", column " column) err)) (on-compile-error msg errf where)))) - (or guard :a))) + guard)) (fiber/setenv f env) (while (fiber/can-resume? f) (def res (resume f resumeval)) @@ -2792,6 +2793,7 @@ "src/core/io.c" "src/core/marsh.c" "src/core/math.c" + "src/core/net.c" "src/core/os.c" "src/core/parse.c" "src/core/peg.c" diff --git a/src/conf/janetconf.h b/src/conf/janetconf.h index 4f8a93fe..119ca662 100644 --- a/src/conf/janetconf.h +++ b/src/conf/janetconf.h @@ -49,6 +49,7 @@ /* Other settings */ /* #define JANET_NO_ASSEMBLER */ /* #define JANET_NO_PEG */ +/* #define JANET_NO_NET */ /* #define JANET_NO_TYPED_ARRAY */ /* #define JANET_NO_INT_TYPES */ /* #define JANET_NO_PRF */ diff --git a/src/core/corelib.c b/src/core/corelib.c index 3b09d345..0bbc7d2b 100644 --- a/src/core/corelib.c +++ b/src/core/corelib.c @@ -1004,6 +1004,9 @@ static void janet_load_libs(JanetTable *env) { #ifdef JANET_THREADS janet_lib_thread(env); #endif +#ifdef JANET_NET + janet_lib_net(env); +#endif } #ifdef JANET_BOOTSTRAP diff --git a/src/core/features.h b/src/core/features.h index 4604d195..896846b4 100644 --- a/src/core/features.h +++ b/src/core/features.h @@ -29,6 +29,10 @@ #define _POSIX_C_SOURCE 200112L #endif +#if defined(WIN32) || defined(_WIN32) +#define WIN32_LEAN_AND_MEAN +#endif + /* Needed for realpath on linux */ #if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__)) #define _XOPEN_SOURCE 500 diff --git a/src/core/fiber.c b/src/core/fiber.c index cc1be7f2..f67ae0cf 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -405,6 +405,10 @@ JanetFiber *janet_current_fiber(void) { return janet_vm_fiber; } +JanetFiber *janet_root_fiber(void) { + return janet_vm_root_fiber; +} + /* CFuns */ static Janet cfun_fiber_getenv(int32_t argc, Janet *argv) { diff --git a/src/core/gc.c b/src/core/gc.c index e90ef04b..2c0bb73d 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -389,6 +389,9 @@ void janet_collect(void) { if (janet_vm_gc_suspend) return; depth = JANET_RECURSION_GUARD; orig_rootcount = janet_vm_root_count; +#ifdef JANET_NET + janet_net_markloop(); +#endif for (i = 0; i < orig_rootcount; i++) janet_mark(janet_vm_roots[i]); while (orig_rootcount < janet_vm_root_count) { diff --git a/src/core/net.c b/src/core/net.c new file mode 100644 index 00000000..19e76a82 --- /dev/null +++ b/src/core/net.c @@ -0,0 +1,703 @@ +/* +* Copyright (c) 2020 Calvin Rose +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to +* deal in the Software without restriction, including without limitation the +* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +* sell copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in +* all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +* IN THE SOFTWARE. +*/ + +#ifndef JANET_AMALG +#include "features.h" +#include +#include "util.h" +#endif + +#ifdef JANET_WINDOWS +#include +#include +#include +#pragma comment (lib, "Ws2_32.lib") +#pragma comment (lib, "Mswsock.lib") +#pragma comment (lib, "Advapi32.lib") +#else +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +/* + * Streams + */ + +#define JANET_STREAM_CLOSED 1 +#define JANET_STREAM_READABLE 2 +#define JANET_STREAM_WRITABLE 4 + +typedef struct { +#ifdef JANET_WINDOWS + SOCKET socket; +#else + int fd; +#endif + int flags; +} JanetStream; + +static int janet_stream_close(void *p, size_t s); + +static int janet_stream_getter(void *p, Janet key, Janet *out); + +static const JanetAbstractType StreamAT = { + "core/stream", + janet_stream_close, + NULL, + janet_stream_getter, + JANET_ATEND_GET +}; + +static int janet_stream_close(void *p, size_t s) { + (void) s; + JanetStream *stream = p; + if (!(stream->flags & JANET_STREAM_CLOSED)) { + stream->flags |= JANET_STREAM_CLOSED; +#ifdef JANET_WINDOWS + closesocket(stream->socket); +#else + close(stream->fd); +#endif + } + return 0; +} + +#ifdef JANET_WINDOWS +static JanetStream *make_stream(SOCKET socket, int flags) { + u_long iMode = 0; + JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + ioctlsocket(socket, FIONBIO, &iMode); + stream->socket = socket; + stream->flags = flags; + return stream; +} +#else +static JanetStream *make_stream(int fd, int flags) { + JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); + stream->fd = fd; + stream->flags = flags; + return stream; +} +#endif + +/* + * Event loop + */ + +/* This large struct describes a waiting file descriptor, as well + * as what to do when we get an event for it. It is a variant type, where + * each variant implements a simple state machine. */ +typedef struct { + + /* File descriptor to listen for events on. */ + JanetStream *stream; + + /* Fiber to resume when event finishes. Can be NULL, in which case, + * no fiber is resumed when event completes. */ + JanetFiber *fiber; + + /* What kind of event we are listening for. + * As more IO functionality get's added, we can + * expand this. */ + enum { + JLE_READ_CHUNK, + JLE_READ_SOME, + JLE_READ_ACCEPT, + JLE_CONNECT, + JLE_WRITE_FROM_BUFFER, + JLE_WRITE_FROM_STRINGLIKE + } event_type; + + /* Each variant can have a different payload. */ + union { + + /* JLE_READ_CHUNK/JLE_READ_SOME */ + struct { + int32_t bytes_left; + JanetBuffer *buf; + } read_chunk; + + /* JLE_READ_ACCEPT */ + struct { + JanetFunction *handler; + } read_accept; + + /* JLE_WRITE_FROM_BUFFER */ + struct { + JanetBuffer *buf; + int32_t start; + } write_from_buffer; + + /* JLE_WRITE_FROM_STRINGLIKE */ + struct { + const uint8_t *str; + int32_t start; + } write_from_stringlike; + + } data; + +} JanetLoopFD; + +#define JANET_LOOPFD_MAX 1024 + +/* Global loop data */ +#ifdef JANET_WINDOWS +JANET_THREAD_LOCAL WSAPOLLFD janet_vm_pollfds[JANET_LOOPFD_MAX]; +#else +JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX]; +#endif +JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; +JANET_THREAD_LOCAL int janet_vm_loop_count; + +/* We could also add/remove gc roots. This is easier for now. */ +void janet_net_markloop(void) { + for (int i = 0; i < janet_vm_loop_count; i++) { + JanetLoopFD lfd = janet_vm_loopfds[i]; + if (lfd.fiber != NULL) { + janet_mark(janet_wrap_fiber(lfd.fiber)); + } + janet_mark(janet_wrap_abstract(lfd.stream)); + switch (lfd.event_type) { + default: + break; + case JLE_READ_CHUNK: + case JLE_READ_SOME: + janet_mark(janet_wrap_buffer(lfd.data.read_chunk.buf)); + break; + case JLE_READ_ACCEPT: + janet_mark(janet_wrap_function(lfd.data.read_accept.handler)); + break; + case JLE_CONNECT: + break; + case JLE_WRITE_FROM_BUFFER: + janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); + break; + case JLE_WRITE_FROM_STRINGLIKE: + janet_mark(janet_wrap_string(lfd.data.write_from_stringlike.str)); + } + } +} + +/* Add a loop fd to the global event loop */ +static int janet_loop_schedule(JanetLoopFD lfd, short events) { + if (janet_vm_loop_count == JANET_LOOPFD_MAX) { + return -1; + } + int index = janet_vm_loop_count++; + janet_vm_loopfds[index] = lfd; +#ifdef JANET_WINDOWS + janet_vm_pollfds[index].fd = lfd.stream->socket; +#else + janet_vm_pollfds[index].fd = lfd.stream->fd; +#endif + janet_vm_pollfds[index].events = events; + janet_vm_pollfds[index].revents = 0; + return index; +} + +/* Remove event from list */ +static void janet_loop_rmindex(int index) { + janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; + janet_vm_pollfds[index] = janet_vm_pollfds[janet_vm_loop_count]; +} + + +/* Return delta in number of loop fds. Abstracted out so + * we can separate out the polling logic */ +static size_t janet_loop_event(size_t index) { + JanetLoopFD *jlfd = janet_vm_loopfds + index; + JanetStream *stream = jlfd->stream; +#ifdef JANET_WINDOWS + SOCKET socket = stream->socket; +#else + int fd = stream->fd; +#endif + int ret = 1; + int should_resume = 0; + Janet resumeval = janet_wrap_nil(); + if (stream->flags & JANET_STREAM_CLOSED) { + should_resume = 1; + ret = 0; + } else { + switch (jlfd->event_type) { + case JLE_READ_CHUNK: + case JLE_READ_SOME: { + JanetBuffer *buffer = jlfd->data.read_chunk.buf; + int32_t bytes_left = jlfd->data.read_chunk.bytes_left; + janet_buffer_extra(buffer, bytes_left); + if (!(stream->flags & JANET_STREAM_READABLE)) { + should_resume = 1; + ret = 0; + break; + } +#ifdef JANET_WINDOWS + long nread; + do { + nread = recv(socket, buffer->data + buffer->count, bytes_left, 0); + } while (nread == -1 && WSAGetLastError() == WSAEINTR); + if (WSAGetLastError() == WSAEWOULDBLOCK) { + ret = 1; + break; + } +#else + ssize_t nread; + do { + nread = read(fd, buffer->data + buffer->count, bytes_left); + } while (nread == -1 && errno == EINTR); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + ret = 1; + break; + } +#endif + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { + should_resume = 1; + if (nread > 0) { + resumeval = janet_wrap_buffer(buffer); + } + ret = 0; + } else { + jlfd->data.read_chunk.bytes_left = bytes_left; + ret = 1; + } + break; + } + case JLE_READ_ACCEPT: { +#ifdef JANET_WINDOWS + SOCKET connfd = accept(socket, NULL, NULL); + if (connfd != INVALID_SOCKET) { +#else + char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ + socklen_t len = 0; + int connfd = accept(fd, (void *) &addr, &len); + if (connfd >= 0) { +#endif + /* Made a new connection socket */ + JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + Janet streamv = janet_wrap_abstract(stream); + JanetFunction *handler = jlfd->data.read_accept.handler; + Janet out; + JanetFiber *fiberp = NULL; + /* Launch connection fiber */ + JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { + janet_stacktrace(fiberp, out); + } + } + ret = JANET_LOOPFD_MAX; + break; + } + case JLE_WRITE_FROM_BUFFER: + case JLE_WRITE_FROM_STRINGLIKE: { + int32_t start, len; + const uint8_t *bytes; + if (!(stream->flags & JANET_STREAM_WRITABLE)) { + should_resume = 1; + ret = 0; + break; + } + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; + bytes = buffer->data; + len = buffer->count; + start = jlfd->data.write_from_buffer.start; + } else { + bytes = jlfd->data.write_from_stringlike.str; + len = janet_string_length(bytes); + start = jlfd->data.write_from_stringlike.start; + } + if (start < len) { + int32_t nbytes = len - start; +#ifdef JANET_WINDOWS + long nwrote; + do { + nwrote = send(socket, bytes + start, nbytes, 0); + } while (nwrote == -1 && WSAGetLastError() == WSAEINTR); +#else + ssize_t nwrote; + do { + nwrote = write(fd, bytes + start, nbytes); + } while (nwrote == -1 && errno == EINTR); +#endif + if (nwrote > 0) { + start += nwrote; + } else { + start = len; + } + } + if (start >= len) { + should_resume = 1; + ret = 0; + } else { + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + jlfd->data.write_from_buffer.start = start; + } else { + jlfd->data.write_from_stringlike.start = start; + } + ret = 1; + } + break; + } + case JLE_CONNECT: { + break; + } + } + } + + /* Resume a fiber for some events */ + if (NULL != jlfd->fiber && should_resume) { + /* Resume the fiber */ + Janet out; + JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { + janet_stacktrace(jlfd->fiber, out); + } + } + + /* Remove this handler from the handler pool. */ + if (should_resume) janet_loop_rmindex((int) index); + + return ret; +} + +static void janet_loop1(void) { + /* Remove closed file descriptors */ + for (int i = 0; i < janet_vm_loop_count;) { + if (janet_vm_loopfds[i].stream->flags & JANET_STREAM_CLOSED) { + janet_loop_rmindex(i); + } else { + i++; + } + } + /* Poll */ + if (janet_vm_loop_count == 0) return; + int ready; +#ifdef JANET_WINDOWS + do { + ready = WSAPoll(janet_vm_pollfds, janet_vm_loop_count, -1); + } while (ready == -1 && WSAGetLastError() == WSAEINTR); + if (ready == -1) return; +#else + do { + ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1); + } while (ready == -1 && errno == EAGAIN); + if (ready == -1) return; +#endif + /* Handle events */ + for (int i = 0; i < janet_vm_loop_count;) { + int revents = janet_vm_pollfds[i].revents; + janet_vm_pollfds[i].revents = 0; + if ((janet_vm_pollfds[i].events | POLLHUP | POLLERR) & revents) { + size_t delta = janet_loop_event(i); + i += (int) delta; + } else { + i++; + } + } +} + +void janet_loop(void) { + while (janet_vm_loop_count) { + janet_loop1(); + } +} + +/* + * Scheduling Helpers + */ + +#define JANET_SCHED_FSOME 1 + +JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { + JanetLoopFD lfd = {0}; + lfd.stream = stream; + lfd.fiber = janet_root_fiber(); + lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; + lfd.data.read_chunk.buf = buf; + lfd.data.read_chunk.bytes_left = nbytes; + janet_loop_schedule(lfd, POLLIN); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); +} + +JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { + JanetLoopFD lfd = {0}; + lfd.stream = stream; + lfd.fiber = janet_root_fiber(); + lfd.event_type = JLE_WRITE_FROM_BUFFER; + lfd.data.write_from_buffer.buf = buf; + lfd.data.write_from_buffer.start = 0; + janet_loop_schedule(lfd, POLLOUT); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); +} + +JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { + JanetLoopFD lfd = {0}; + lfd.stream = stream; + lfd.fiber = janet_root_fiber(); + lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; + lfd.data.write_from_stringlike.str = str; + lfd.data.write_from_stringlike.start = 0; + janet_loop_schedule(lfd, POLLOUT); + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); +} + +/* Needs argc >= offset + 2 */ +static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { + /* Get host and port */ + const char *host = janet_getcstring(argv, offset); + const char *port = janet_getcstring(argv, offset + 1); + /* getaddrinfo */ + struct addrinfo *ai = NULL; + struct addrinfo hints = {0}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_PASSIVE; + int status = getaddrinfo(host, port, &hints, &ai); + if (status) { + janet_panicf("could not get address info: %s", gai_strerror(status)); + } + return ai; +} + +/* + * C Funs + */ + +static Janet cfun_net_connect(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + + struct addrinfo *ai = janet_get_addrinfo(argv, 0); + +#ifdef JANET_WINDOWS + /* Create socket */ + SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock == INVALID_SOCKET) { + freeaddrinfo(ai); + janet_panic("could not create socket"); + } + + /* Connect to socket */ + int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen); + freeaddrinfo(ai); + if (status == -1) { + closesocket(sock); + janet_panic("could not connect to socket"); + } +#else + /* Create socket */ + int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock < 0) { + freeaddrinfo(ai); + janet_panic("could not create socket"); + } + + /* Connect to socket */ + int status = connect(sock, ai->ai_addr, ai->ai_addrlen); + freeaddrinfo(ai); + if (status < 0) { + close(sock); + janet_panic("could not connect to socket"); + } +#endif + + /* Wrap socket in abstract type JanetStream */ + JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + return janet_wrap_abstract(stream); +} + +static Janet cfun_net_server(int32_t argc, Janet *argv) { + janet_fixarity(argc, 3); + + /* Get host, port, and handler*/ + JanetFunction *fun = janet_getfunction(argv, 2); + + struct addrinfo *ai = janet_get_addrinfo(argv, 0); + +#ifdef JANET_WINDOWS + /* Check all addrinfos in a loop for the first that we can bind to. */ + SOCKET sfd = INVALID_SOCKET; + struct addrinfo *rp = NULL; + for (rp = ai; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == INVALID_SOCKET) continue; + /* Set various socket options */ + int enable = 1; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) { + closesocket(sfd); + janet_panic("setsockopt(SO_REUSEADDR) failed"); + } + /* Bind */ + if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; + closesocket(sfd); + } + if (NULL == rp) { + freeaddrinfo(ai); + janet_panic("could not bind to any sockets"); + } + + /* listen */ + int status = listen(sfd, 1024); + freeaddrinfo(ai); + if (status) { + closesocket(sfd); + janet_panic("could not listen on file descriptor"); + } +#else + /* Check all addrinfos in a loop for the first that we can bind to. */ + int sfd = 0; + struct addrinfo *rp = NULL; + for (rp = ai; rp != NULL; rp = rp->ai_next) { + sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sfd == -1) continue; + /* Set various socket options */ + int enable = 1; + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { + close(sfd); + janet_panic("setsockopt(SO_REUSEADDR) failed"); + } +#ifdef SO_REUSEPORT + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) { + close(sfd); + janet_panic("setsockopt(SO_REUSEPORT) failed"); + } +#endif + /* Bind */ + if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break; + close(sfd); + } + if (NULL == rp) { + freeaddrinfo(ai); + janet_panic("could not bind to any sockets"); + } + + /* listen */ + int status = listen(sfd, 1024); + freeaddrinfo(ai); + if (status) { + close(sfd); + janet_panic("could not listen on file descriptor"); + } + + /* We need to ignore sigpipe when reading and writing to our connection socket. + * Since a connection could be disconnected at any time, any read or write may fail. + * We don't want to blow up the whole application. */ + signal(SIGPIPE, SIG_IGN); +#endif + + /* Put sfd on our loop */ + JanetLoopFD lfd = {0}; + lfd.stream = make_stream(sfd, 0); + lfd.event_type = JLE_READ_ACCEPT; + lfd.data.read_accept.handler = fun; + janet_loop_schedule(lfd, POLLIN); + + return janet_wrap_abstract(lfd.stream); +} + +static Janet cfun_stream_read(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME); +} + +static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + janet_sched_read(stream, buffer, n, 0); +} + +static Janet cfun_stream_close(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + janet_stream_close(stream, 0); + return janet_wrap_nil(); +} + +static Janet cfun_stream_write(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (janet_checktype(argv[1], JANET_BUFFER)) { + janet_sched_write_buffer(stream, janet_getbuffer(argv, 1)); + } else { + JanetByteView bytes = janet_getbytes(argv, 1); + janet_sched_write_stringlike(stream, bytes.bytes); + } +} + +static const JanetMethod stream_methods[] = { + {"chunk", cfun_stream_chunk}, + {"close", cfun_stream_close}, + {"read", cfun_stream_read}, + {"write", cfun_stream_write}, + {NULL, NULL} +}; + +static int janet_stream_getter(void *p, Janet key, Janet *out) { + (void) p; + if (!janet_checktype(key, JANET_KEYWORD)) return 0; + return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out); +} + +static const JanetReg net_cfuns[] = { + { + "net/server", cfun_net_server, + JDOC("(net/server host port)\n\nStart a TCP server.") + }, + {"net/read", cfun_stream_read, NULL}, + {"net/chunk", cfun_stream_chunk, NULL}, + {"net/write", cfun_stream_write, NULL}, + {"net/close", cfun_stream_close, NULL}, + {"net/connect", cfun_net_connect, NULL}, + {NULL, NULL, NULL} +}; + +void janet_lib_net(JanetTable *env) { + janet_vm_loop_count = 0; +#ifdef JANET_WINDOWS + WSADATA wsaData; + janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock"); +#endif + janet_core_cfuns(env, NULL, net_cfuns); +} + +void janet_net_deinit(void) { +#ifdef JANET_WINDOWS + WSACleanup(); +#endif +} diff --git a/src/core/run.c b/src/core/run.c index ada14555..039d1144 100644 --- a/src/core/run.c +++ b/src/core/run.c @@ -50,7 +50,7 @@ int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char JanetFiber *fiber = janet_fiber(f, 64, 0, NULL); fiber->env = env; JanetSignal status = janet_continue(fiber, janet_wrap_nil(), &ret); - if (status != JANET_SIGNAL_OK) { + if (status != JANET_SIGNAL_OK && status != JANET_SIGNAL_EVENT) { janet_stacktrace(fiber, ret); errflags |= 0x01; done = 1; diff --git a/src/core/state.h b/src/core/state.h index 649b3785..43bf8200 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -43,6 +43,7 @@ extern JANET_THREAD_LOCAL int janet_vm_stackn; /* The current running fiber on the current thread. * Set and unset by janet_run. */ extern JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber; +extern JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber; /* The current pointer to the inner most jmp_buf. The current * return point for panics. */ diff --git a/src/core/thread.c b/src/core/thread.c index 68e556d5..a6c552bd 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -465,11 +465,15 @@ static int thread_worker(JanetMailboxPair *pair) { Janet argv[1] = { parentv }; fiber = janet_fiber(func, 64, 1, argv); JanetSignal sig = janet_continue(fiber, janet_wrap_nil(), &out); - if (sig != JANET_SIGNAL_OK) { + if (sig != JANET_SIGNAL_OK && sig < JANET_SIGNAL_USER0) { janet_eprintf("in thread %v: ", janet_wrap_abstract(janet_make_thread(pair->newbox, encode))); janet_stacktrace(fiber, out); } +#ifdef JANET_NET + janet_loop(); +#endif + /* Normal exit */ destroy_mailbox_pair(pair); janet_deinit(); diff --git a/src/core/util.h b/src/core/util.h index 5a3f5568..98b71427 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -35,7 +35,7 @@ #ifndef janet_exit #include #define janet_exit(m) do { \ - printf("C runtime error at line %d in file %s: %s\n",\ + fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\ __LINE__,\ __FILE__,\ (m));\ @@ -50,7 +50,7 @@ /* What to do when out of memory */ #ifndef JANET_OUT_OF_MEMORY #include -#define JANET_OUT_OF_MEMORY do { printf("janet out of memory\n"); exit(1); } while (0) +#define JANET_OUT_OF_MEMORY do { fprintf(stderr, "janet out of memory\n"); exit(1); } while (0) #endif /* Omit docstrings in some builds */ @@ -126,5 +126,10 @@ void janet_lib_inttypes(JanetTable *env); #ifdef JANET_THREADS void janet_lib_thread(JanetTable *env); #endif +#ifdef JANET_NET +void janet_lib_net(JanetTable *env); +void janet_net_deinit(void); +void janet_net_markloop(void); +#endif #endif diff --git a/src/core/vm.c b/src/core/vm.c index b4e2c557..bb87b62a 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -38,6 +38,7 @@ JANET_THREAD_LOCAL JanetTable *janet_vm_registry; JANET_THREAD_LOCAL JanetTable *janet_vm_abstract_registry; JANET_THREAD_LOCAL int janet_vm_stackn = 0; JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL; +JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber = NULL; JANET_THREAD_LOCAL Janet *janet_vm_return_reg = NULL; JANET_THREAD_LOCAL jmp_buf *janet_vm_jmp_buf = NULL; @@ -1244,7 +1245,9 @@ Janet janet_call(JanetFunction *fun, int32_t argc, const Janet *argv) { janet_vm_stackn = oldn; janet_gcunlock(handle); - if (signal != JANET_SIGNAL_OK) janet_panicv(*janet_vm_return_reg); + if (signal != JANET_SIGNAL_OK) { + janet_panicv(*janet_vm_return_reg); + } return *janet_vm_return_reg; } @@ -1276,10 +1279,12 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o /* Continue child fiber if it exists */ if (fiber->child) { + if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber; JanetFiber *child = fiber->child; janet_vm_stackn++; JanetSignal sig = janet_continue(child, in, &in); janet_vm_stackn--; + if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL; if (sig != JANET_SIGNAL_OK && !(child->flags & (1 << sig))) { *out = in; return sig; @@ -1308,6 +1313,7 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o Janet *old_vm_return_reg = janet_vm_return_reg; /* Setup fiber */ + if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber; janet_vm_fiber = fiber; janet_gcroot(janet_wrap_fiber(fiber)); janet_fiber_set_status(fiber, JANET_STATUS_ALIVE); @@ -1333,6 +1339,7 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o janet_gcunroot(janet_wrap_fiber(fiber)); /* Restore global state */ + if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL; janet_vm_gc_suspend = handle; janet_vm_fiber = old_vm_fiber; janet_vm_stackn = oldn; @@ -1414,6 +1421,10 @@ int janet_init(void) { janet_vm_core_env = NULL; /* Seed RNG */ janet_rng_seed(janet_default_rng(), 0); + /* Fibers */ + janet_vm_fiber = NULL; + janet_vm_root_fiber = NULL; + janet_vm_stackn = 0; /* Threads */ #ifdef JANET_THREADS janet_threads_init(); @@ -1433,7 +1444,12 @@ void janet_deinit(void) { janet_vm_abstract_registry = NULL; janet_vm_core_env = NULL; free(janet_vm_traversal_base); + janet_vm_fiber = NULL; + janet_vm_root_fiber = NULL; #ifdef JANET_THREADS janet_threads_deinit(); #endif +#ifdef JANET_NET + janet_net_deinit(); +#endif } diff --git a/src/include/janet.h b/src/include/janet.h index f832db8f..46f4c8c2 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -165,6 +165,11 @@ extern "C" { #define JANET_TYPED_ARRAY #endif +/* Enable or disable networking */ +#ifndef JANET_NO_NET +#define JANET_NET +#endif + /* Enable or disable large int types (for now 64 bit, maybe 128 / 256 bit integer types) */ #ifndef JANET_NO_INT_TYPES #define JANET_INT_TYPES @@ -295,6 +300,8 @@ typedef enum { JANET_SIGNAL_USER9 } JanetSignal; +#define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9 + /* Fiber statuses - mostly corresponds to signals. */ typedef enum { JANET_STATUS_DEAD, @@ -1115,6 +1122,11 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; /***** START SECTION MAIN *****/ +/* Event Loop */ +#ifdef JANET_NET +JANET_API void janet_loop(void); +#endif + /* Parsing */ extern JANET_API const JanetAbstractType janet_parser_type; JANET_API void janet_parser_init(JanetParser *parser); @@ -1290,6 +1302,7 @@ JANET_API JanetFiber *janet_fiber(JanetFunction *callee, int32_t capacity, int32 JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv); JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber); JANET_API JanetFiber *janet_current_fiber(void); +JANET_API JanetFiber *janet_root_fiber(void); /* Treat similar types through uniform interfaces for iteration */ JANET_API int janet_indexed_view(Janet seq, const Janet **data, int32_t *len); diff --git a/src/mainclient/shell.c b/src/mainclient/shell.c index 31befdac..ecb7534e 100644 --- a/src/mainclient/shell.c +++ b/src/mainclient/shell.c @@ -1011,10 +1011,15 @@ int main(int argc, char **argv) { JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs); fiber->env = env; status = janet_continue(fiber, janet_wrap_nil(), &out); - if (status != JANET_SIGNAL_OK) { + if (status != JANET_SIGNAL_OK && status != JANET_SIGNAL_EVENT) { janet_stacktrace(fiber, out); } +#ifdef JANET_NET + status = JANET_SIGNAL_OK; + janet_loop(); +#endif + /* Deinitialize vm */ janet_deinit(); janet_line_deinit(); diff --git a/test/suite9.janet b/test/suite9.janet new file mode 100644 index 00000000..c87f952c --- /dev/null +++ b/test/suite9.janet @@ -0,0 +1,51 @@ +# Copyright (c) 2020 Calvin Rose & contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +(import ./helper :prefix "" :exit true) +(start-suite 9) + +# Net testing + +(defn handler + "Simple handler for connections." + [stream] + (defer (:close stream) + (def id (gensym)) + (def b @"") + (:read stream 1024 b) + (:write stream b) + (buffer/clear b))) + +(def s (net/server "127.0.0.1" "8000" handler)) +(assert s "made server 1") + +(defn test-echo [msg] + (with [conn (net/connect "127.0.0.1" "8000")] + (:write conn msg) + (def res (:read conn 1024)) + (assert (= (string res) msg) (string "echo " msg)))) + +(test-echo "hello") +(test-echo "world") +(test-echo (string/repeat "abcd" 200)) + +(:close s) + +(end-suite)