diff --git a/Makefile b/Makefile index c6cb3d80..d57bb7bf 100644 --- a/Makefile +++ b/Makefile @@ -90,6 +90,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \ src/core/corelib.c \ src/core/debug.c \ src/core/emit.c \ + src/core/ev.c \ src/core/fiber.c \ src/core/gc.c \ src/core/inttypes.c \ diff --git a/meson.build b/meson.build index ee68cb56..2324e440 100644 --- a/meson.build +++ b/meson.build @@ -60,6 +60,7 @@ conf.set('JANET_NO_SOURCEMAPS', not get_option('sourcemaps')) conf.set('JANET_NO_ASSEMBLER', not get_option('assembler')) conf.set('JANET_NO_PEG', not get_option('peg')) conf.set('JANET_NO_NET', not get_option('net')) +conf.set('JANET_NO_EV', not get_option('ev')) conf.set('JANET_REDUCED_OS', get_option('reduced_os')) conf.set('JANET_NO_TYPED_ARRAY', not get_option('typed_array')) conf.set('JANET_NO_INT_TYPES', not get_option('int_types')) @@ -110,6 +111,7 @@ core_src = [ 'src/core/corelib.c', 'src/core/debug.c', 'src/core/emit.c', + 'src/core/ev.c', 'src/core/fiber.c', 'src/core/gc.c', 'src/core/inttypes.c', diff --git a/meson_options.txt b/meson_options.txt index bfc8925f..6a6503cb 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -12,6 +12,7 @@ option('typed_array', type : 'boolean', value : true) option('int_types', type : 'boolean', value : true) option('prf', type : 'boolean', value : true) option('net', type : 'boolean', value : true) +option('ev', type : 'boolean', value : true) option('processes', type : 'boolean', value : true) option('umask', type : 'boolean', value : true) option('realpath', type : 'boolean', value : true) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index fc077435..80cba0a7 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2832,6 +2832,7 @@ "src/core/corelib.c" "src/core/debug.c" "src/core/emit.c" + "src/core/ev.c" "src/core/fiber.c" "src/core/gc.c" "src/core/inttypes.c" diff --git a/src/conf/janetconf.h b/src/conf/janetconf.h index 72a4c577..c99c7d39 100644 --- a/src/conf/janetconf.h +++ b/src/conf/janetconf.h @@ -51,6 +51,7 @@ /* #define JANET_NO_NET */ /* #define JANET_NO_TYPED_ARRAY */ /* #define JANET_NO_INT_TYPES */ +/* #define JANET_NO_EV */ /* Other settings */ /* #define JANET_NO_PRF */ diff --git a/src/core/corelib.c b/src/core/corelib.c index 3e17e918..5781e763 100644 --- a/src/core/corelib.c +++ b/src/core/corelib.c @@ -1004,6 +1004,9 @@ static void janet_load_libs(JanetTable *env) { #ifdef JANET_THREADS janet_lib_thread(env); #endif +#ifdef JANET_EV + janet_lib_ev(env); +#endif #ifdef JANET_NET janet_lib_net(env); #endif diff --git a/src/core/ev.c b/src/core/ev.c new file mode 100644 index 00000000..6ab80db4 --- /dev/null +++ b/src/core/ev.c @@ -0,0 +1,378 @@ +/* +* Copyright (c) 2020 Calvin Rose +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to +* deal in the Software without restriction, including without limitation the +* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +* sell copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in +* all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +* IN THE SOFTWARE. +*/ + +#ifndef JANET_AMALG +#include "features.h" +#include +#include "util.h" +#include "gc.h" +#include "state.h" +#endif + +#ifdef JANET_EV + +/* Includes */ + +#include +#include +#include +#include +#include +#include +#include +#include + +/* New fibers to spawn or resume */ +typedef struct JanetTask JanetTask; +struct JanetTask { + JanetFiber *fiber; + Janet value; +}; + +/* Min priority queue of timestamps for timeouts. */ +typedef struct JanetListenerTimeout JanetListenerTimeout; +struct JanetListenerTimeout JanetListenerTimeout { + JanetListenerState *state; + struct timespec when; +}; + +/* Global data */ +JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0; +JANET_THREAD_LOCAL size_t janet_vm_spawn_capacity = 0; +JANET_THREAD_LOCAL size_t janet_vm_spawn_count = 0; +JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0; +JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0; +JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL; +JANET_THREAD_LOCAL JanetListenerTimeout *janet_vm_tq = NULL; + +/* Compare two timespecs - 1 if t1 > t2 */ +static int timespec_cmp(struct timespec t1, struct timespec t2) { + if (t1.tv_sec < t2.tv_sec) return -1; + if (t1.tv_sec > t2.tv_sec) return 1; + if (t1.tv_nsec < t2.tv_nsec) return -1; + if (t1.tv_nsec > t2.tv_nsec) return 1; + return 0; +} + +/* Add a timeout to the timeout min heap */ +static void add_timeout(JanetListenerState *state, struct timespec when) { + size_t oldcount = janet_vm_tq_count; + size_t newcount = oldcount + 1; + if (oldcount == janet_vm_tq_capacity) { + size_t newcap = 2 * newcount; + JanetListenerTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetListenerTimeout)); + if (NULL == tq) { + JANET_OUT_OF_MEMORY; + } + janet_vm_tq_capacity = newcap; + } + /* Append */ + janet_vm_tq_count = newcount; + janet_vm_tq[oldcount] = { state, when }; + /* Heapify */ + size_t index = oldcount; + while (index > 0) { + size_t parent = (index - 1) >> 1; + int cmp = timespec_cmp(janet_vm_tq[parent].when, when); + if (cmp <= 0) break; + /* Swap */ + JanetListenerState tmp = janet_vm_tq[index]; + janet_vm_tq[index] = janet_vm_tq[parent]; + janet_vm_tq[parent] = tmp; + /* Next */ + index = parent; + } +} + +/* Extract the next timeout from the priority queue */ +static JanetListenerTimeout next_timeout(void) { + +} + +/* Create a new event listener */ +static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { + if (size < sizeof(JanetListenerState)) + size = sizeof(JanetListenerState); + JanetListenerState *state = malloc(size); + if (NULL == state) { + JANET_OUT_OF_MEMORY; + } + state->machine = behavior; + state->fiber = janet_vm_root_fiber; + state->pollable = pollable; + state->_mask = mask; + pollable->_mask |= mask; + janet_vm_active_listeners++; + /* Prepend to linked list */ + state->_next = pollable->state; + pollable->state = state; + /* Emit INIT event for convenience */ + state->machine(state, JANET_ASYNC_EVENT_INIT); + return state; +} + +/* Indicate we are no longer listening for an event. This + * frees the memory of the state machine as well. */ +static void janet_unlisten_impl(JanetListenerState *state) { + state->machine(state, JANET_ASYNC_EVENT_DEINIT); + /* Remove state machine from poll list */ + JanetListenerState **iter = &(state->pollable->state); + while (*iter && *iter != state) + iter = &((*iter)->_next); + janet_assert(*iter, "failed to remove listener"); + *iter = state->_next; + janet_vm_active_listeners--; + /* Remove mask */ + state->pollable->_mask &= ~(state->_mask); + free(state); +} + +/* Call after creating a pollable */ +void janet_pollable_init(JanetPollable *pollable, JanetPollType handle) { + pollable->handle = handle; + pollable->flags = 0; + pollable->state = NULL; + pollable->_mask = 0; +} + +/* Mark a pollable for GC */ +void janet_pollable_mark(JanetPollable *pollable) { + JanetListenerState *state = pollable->state; + while (NULL != state) { + if (NULL != state->fiber) { + janet_mark(janet_wrap_fiber(state->fiber)); + } + (state->machine)(state, JANET_ASYNC_EVENT_MARK); + state = state->_next; + } +} + +/* Must be called to close all pollables - does NOT call `close` for you. + * Also does not free memory of the pollable, so can be used on close. */ +void janet_pollable_deinit(JanetPollable *pollable) { + pollable->flags |= JANET_POLL_FLAG_CLOSED; + JanetListenerState *state = pollable->state; + while (NULL != state) { + state->machine(state, JANET_ASYNC_EVENT_CLOSE); + JanetListenerState *next_state = state->_next; + janet_unlisten_impl(state); + state = next_state; + } + pollable->state = NULL; +} + +/* Register a fiber to resume with value */ +void janet_schedule(JanetFiber *fiber, Janet value) { + size_t oldcount = janet_vm_spawn_count; + size_t newcount = oldcount + 1; + if (newcount > janet_vm_spawn_capacity) { + size_t newcap = 2 * newcount; + JanetTask *tasks = realloc(janet_vm_spawn, newcap * sizeof(JanetTask)); + if (NULL == tasks) { + JANET_OUT_OF_MEMORY; + } + janet_vm_spawn = tasks; + janet_vm_spawn_capacity = newcap; + } + janet_vm_spawn_count = newcount; + janet_vm_spawn[oldcount].fiber = fiber; + janet_vm_spawn[oldcount].value = value; +} + +/* Mark all pending tasks */ +void janet_ev_mark(void) { + for (size_t i = 0; i < janet_vm_spawn_count; i++) { + janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber)); + janet_mark(janet_vm_spawn[i].value); + } +} + +/* Run scheduled tasks */ +static void run_scheduled(void) { + size_t index = 0; + while (index < janet_vm_spawn_count) { + JanetTask task = janet_vm_spawn[index]; + Janet res; + JanetSignal sig = janet_continue(task.fiber, task.value, &res); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { + janet_stacktrace(task.fiber, res); + } + index++; + } + janet_vm_spawn_count = 0; +} + +/* Main event loop */ + +void janet_loop1_impl(void); + +void janet_loop1(void) { + if (janet_vm_active_listeners) { + janet_loop1_impl(); + } + /* Run scheduled fibers */ + run_scheduled(); +} + +void janet_loop(void) { + while (janet_vm_active_listeners || janet_vm_spawn_count) janet_loop1(); +} + +/* Common init code */ +void janet_ev_init_common(void) { + janet_vm_spawn_capacity = 0; + janet_vm_spawn_count = 0; + janet_vm_spawn = NULL; + janet_vm_active_listeners = 0; +} + +/* Common deinit code */ +void janet_ev_deinit_common(void) { + free(janet_vm_spawn); +} + +/* Short hand to yield to event loop */ +void janet_await(void) { + janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); +} + +/* + * Start epoll implementation + */ + +/* Epoll global data */ +JANET_THREAD_LOCAL int janet_vm_epoll = 0; + +static int make_epoll_events(int mask) { + int events = 0; + if (mask & JANET_ASYNC_EVENT_READ) + events |= EPOLLIN; + if (mask & JANET_ASYNC_EVENT_WRITE) + events |= EPOLLOUT; + return events; +} + +/* Wait for the next event */ +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { + int is_first = !(pollable->state); + int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); + struct epoll_event ev; + ev.events = make_epoll_events(state->pollable->_mask); + ev.data.ptr = pollable; + int status; + do { + status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); + } while (status == -1 && errno == EINTR); + if (status == -1) { + janet_unlisten_impl(state); + janet_panicf("failed to schedule event: %s", strerror(errno)); + } + return state; +} + +/* Tell system we are done listening for a certain event */ +void janet_unlisten(JanetListenerState *state) { + JanetPollable *pollable = state->pollable; + int is_last = (state->_next == NULL && pollable->state == state); + int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; + struct epoll_event ev; + ev.events = make_epoll_events(pollable->_mask); + ev.data.ptr = pollable; + int status; + do { + status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); + } while (status == -1 && errno == EINTR); + if (status == -1) { + janet_panicf("failed to unschedule event: %s", strerror(errno)); + } + /* Destroy state machine and free memory */ + janet_unlisten_impl(state); +} + +/* Replace janet_loop with this */ +#define JANET_EPOLL_MAX_EVENTS 64 +void janet_loop1_impl(void) { + /* Poll for events */ + struct epoll_event events[JANET_EPOLL_MAX_EVENTS]; + int ready; + do { + ready = epoll_wait(janet_vm_epoll, events, JANET_EPOLL_MAX_EVENTS, -1); + } while (ready == -1 && errno == EINTR); + if (ready == -1) { + JANET_EXIT("failed to poll events"); + } + /* Step state machines */ + for (int i = 0; i < ready; i++) { + JanetPollable *pollable = events[i].data.ptr; + int mask = events[i].events; + JanetListenerState *state = pollable->state; + while (NULL != state) { + if (mask & EPOLLOUT) + state->machine(state, JANET_ASYNC_EVENT_WRITE); + if (mask & EPOLLIN) + state->machine(state, JANET_ASYNC_EVENT_READ); + state = state->_next; + } + } +} + +void janet_ev_init(void) { + janet_ev_init_common(); + janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); +} + +void janet_ev_deinit(void) { + janet_ev_deinit_common(); + close(janet_vm_epoll); + janet_vm_epoll = 0; +} + +/* + * End epoll implementation + */ + +/* C functions */ + +static Janet cfun_ev_spawn(int32_t argc, Janet *argv) { + janet_arity(argc, 1, 2); + JanetFiber *fiber = janet_getfiber(argv, 0); + Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); + janet_schedule(fiber, value); + return argv[0]; +} + +static const JanetReg ev_cfuns[] = { + { + "ev/go", cfun_ev_spawn, + JDOC("(ev/go fiber &opt value)\n\n" + "Put a fiber on the event loop to be resumed later. Optionally pass " + "a value to resume with, otherwise resumes with nil.") + }, + {NULL, NULL, NULL} +}; + +void janet_lib_ev(JanetTable *env) { + janet_core_cfuns(env, NULL, ev_cfuns); +} + +#endif diff --git a/src/core/gc.c b/src/core/gc.c index 36752b06..0701ed00 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -389,8 +389,8 @@ void janet_collect(void) { if (janet_vm_gc_suspend) return; depth = JANET_RECURSION_GUARD; orig_rootcount = janet_vm_root_count; -#ifdef JANET_NET - janet_net_markloop(); +#ifdef JANET_EV + janet_ev_mark(); #endif for (i = 0; i < orig_rootcount; i++) janet_mark(janet_vm_roots[i]); diff --git a/src/core/net.c b/src/core/net.c index dc86f01b..dd79bdd5 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -26,6 +26,8 @@ #include "util.h" #endif +#ifdef JANET_NET + #ifdef JANET_WINDOWS #include #include @@ -45,28 +47,26 @@ #endif /* - * Streams + * Streams - simple abstract type that wraps a pollable + extra flags */ -#define JANET_STREAM_CLOSED 1 -#define JANET_STREAM_READABLE 2 -#define JANET_STREAM_WRITABLE 4 +#define JANET_STREAM_READABLE 0x200 +#define JANET_STREAM_WRITABLE 0x400 static int janet_stream_close(void *p, size_t s); +static int janet_stream_mark(void *p, size_t s); static int janet_stream_getter(void *p, Janet key, Janet *out); static const JanetAbstractType StreamAT = { "core/stream", janet_stream_close, - NULL, + janet_stream_mark, janet_stream_getter, JANET_ATEND_GET }; +typedef JanetPollable JanetStream; + #ifdef JANET_WINDOWS -typedef struct { - SOCKET fd; - int flags; -} JanetStream; #define JSOCKCLOSE(x) closesocket(x) #define JSOCKDEFAULT INVALID_SOCKET #define JLASTERR WSAGetLastError() @@ -79,19 +79,15 @@ typedef struct { #define JSock SOCKET #define JReadInt long #define JSOCKFLAGS 0 -static JanetStream *make_stream(SOCKET fd, int flags) { +static JanetStream *make_stream(SOCKET fd, uint32_t flags) { u_long iMode = 0; JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + janet_pollable_init(stream, fd); ioctlsocket(fd, FIONBIO, &iMode); - stream->fd = fd; stream->flags = flags; return stream; } #else -typedef struct { - int fd; - int flags; -} JanetStream; #define JSOCKCLOSE(x) close(x) #define JSOCKDEFAULT 0 #define JLASTERR errno @@ -108,15 +104,15 @@ typedef struct { #else #define JSOCKFLAGS 0 #endif -static JanetStream *make_stream(int fd, int flags) { +static JanetStream *make_stream(int fd, uint32_t flags) { JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); + janet_pollable_init(stream, fd); #ifndef SOCK_CLOEXEC int extra = O_CLOEXEC; #else int extra = 0; #endif fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra); - stream->fd = fd; stream->flags = flags; return stream; } @@ -130,220 +126,138 @@ static JanetStream *make_stream(int fd, int flags) { static int janet_stream_close(void *p, size_t s) { (void) s; JanetStream *stream = p; - if (!(stream->flags & JANET_STREAM_CLOSED)) { - stream->flags |= JANET_STREAM_CLOSED; - JSOCKCLOSE(stream->fd); + if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) { + JSOCKCLOSE(stream->handle); + janet_pollable_deinit(stream); } return 0; } +static int janet_stream_mark(void *p, size_t s) { + (void) s; + janet_pollable_mark((JanetPollable *) p); + return 0; +} + /* - * Event loop + * State machine for read */ -/* This large struct describes a waiting file descriptor, as well - * as what to do when we get an event for it. It is a variant type, where - * each variant implements a simple state machine. */ typedef struct { + JanetListenerState head; + int32_t bytes_left; + JanetBuffer *buf; + int is_chunk; +} NetStateRead; - /* File descriptor to listen for events on. */ - JanetStream *stream; - - /* Fiber to resume when event finishes. Can be NULL, in which case, - * no fiber is resumed when event completes. */ - JanetFiber *fiber; - - /* What kind of event we are listening for. - * As more IO functionality get's added, we can - * expand this. */ - enum { - JLE_READ_CHUNK, - JLE_READ_SOME, - JLE_READ_ACCEPT, - JLE_CONNECT, - JLE_WRITE_FROM_BUFFER, - JLE_WRITE_FROM_STRINGLIKE - } event_type; - - /* Each variant can have a different payload. */ - union { - - /* JLE_READ_CHUNK/JLE_READ_SOME */ - struct { - int32_t bytes_left; - JanetBuffer *buf; - } read_chunk; - - /* JLE_READ_ACCEPT */ - struct { - JanetFunction *handler; - } read_accept; - - /* JLE_WRITE_FROM_BUFFER */ - struct { - JanetBuffer *buf; - int32_t start; - } write_from_buffer; - - /* JLE_WRITE_FROM_STRINGLIKE */ - struct { - const uint8_t *str; - int32_t start; - } write_from_stringlike; - - } data; - -} JanetLoopFD; - -#define JANET_LOOPFD_MAX 1024 - -/* Global loop data */ -JANET_THREAD_LOCAL JPollStruct janet_vm_pollfds[JANET_LOOPFD_MAX]; -JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; -JANET_THREAD_LOCAL int janet_vm_loop_count; - -/* We could also add/remove gc roots. This is easier for now. */ -void janet_net_markloop(void) { - for (int i = 0; i < janet_vm_loop_count; i++) { - JanetLoopFD lfd = janet_vm_loopfds[i]; - if (lfd.fiber != NULL) { - janet_mark(janet_wrap_fiber(lfd.fiber)); - } - janet_mark(janet_wrap_abstract(lfd.stream)); - switch (lfd.event_type) { - default: - break; - case JLE_READ_CHUNK: - case JLE_READ_SOME: - janet_mark(janet_wrap_buffer(lfd.data.read_chunk.buf)); - break; - case JLE_READ_ACCEPT: - janet_mark(janet_wrap_function(lfd.data.read_accept.handler)); - break; - case JLE_CONNECT: - break; - case JLE_WRITE_FROM_BUFFER: - janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf)); - break; - case JLE_WRITE_FROM_STRINGLIKE: - janet_mark(janet_wrap_string(lfd.data.write_from_stringlike.str)); - } - } -} - -/* Add a loop fd to the global event loop */ -static int janet_loop_schedule(JanetLoopFD lfd, short events) { - if (janet_vm_loop_count == JANET_LOOPFD_MAX) { - return -1; - } - int index = janet_vm_loop_count++; - janet_vm_loopfds[index] = lfd; - janet_vm_pollfds[index].fd = lfd.stream->fd; - janet_vm_pollfds[index].events = events; - janet_vm_pollfds[index].revents = 0; - return index; -} - -/* Remove event from list */ -static void janet_loop_rmindex(int index) { - janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count]; - janet_vm_pollfds[index] = janet_vm_pollfds[janet_vm_loop_count]; -} - - -/* Return delta in number of loop fds. Abstracted out so - * we can separate out the polling logic */ -static size_t janet_loop_event(size_t index) { - JanetLoopFD *jlfd = janet_vm_loopfds + index; - JanetStream *stream = jlfd->stream; - JSock fd = stream->fd; - int ret = 1; - int should_resume = 0; - Janet resumeval = janet_wrap_nil(); - if (stream->flags & JANET_STREAM_CLOSED) { - should_resume = 1; - ret = 0; - } else { - switch (jlfd->event_type) { - case JLE_READ_CHUNK: - case JLE_READ_SOME: { - JanetBuffer *buffer = jlfd->data.read_chunk.buf; - int32_t bytes_left = jlfd->data.read_chunk.bytes_left; +void net_machine_read(JanetListenerState *s, int event) { + NetStateRead *state = (NetStateRead *) s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_MARK: + janet_mark(janet_wrap_buffer(state->buf)); + break; + case JANET_ASYNC_EVENT_CLOSE: + /* Read is finished, even if chunk is incomplete */ + janet_schedule(s->fiber, janet_wrap_nil()); + break; + case JANET_ASYNC_EVENT_READ: + /* Read in bytes */ + { + JanetBuffer *buffer = state->buf; + int32_t bytes_left = state->bytes_left; janet_buffer_extra(buffer, bytes_left); - if (!(stream->flags & JANET_STREAM_READABLE)) { - should_resume = 1; - ret = 0; - break; - } JReadInt nread; do { - nread = recv(fd, buffer->data + buffer->count, bytes_left, 0); + nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); } while (nread == -1 && JLASTERR == JEINTR); if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { - ret = 1; break; } + /* Increment buffer counts */ if (nread > 0) { buffer->count += nread; bytes_left -= nread; } else { bytes_left = 0; } - if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { - should_resume = 1; - if (nread > 0) { - resumeval = janet_wrap_buffer(buffer); - } - ret = 0; - } else { - jlfd->data.read_chunk.bytes_left = bytes_left; - ret = 1; + state->bytes_left = bytes_left; + + /* Resume if done */ + if (!state->is_chunk || bytes_left == 0) { + Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil(); + janet_schedule(s->fiber, resume_val); + janet_unlisten(s); } - break; } - case JLE_READ_ACCEPT: { - JSock connfd = accept(fd, NULL, NULL); - if (JSOCKVALID(connfd)) { - /* Made a new connection socket */ - JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); - Janet streamv = janet_wrap_abstract(stream); - JanetFunction *handler = jlfd->data.read_accept.handler; - Janet out; - JanetFiber *fiberp = NULL; - /* Launch connection fiber */ - JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { - janet_stacktrace(fiberp, out); - } - } - ret = JANET_LOOPFD_MAX; - break; - } - case JLE_WRITE_FROM_BUFFER: - case JLE_WRITE_FROM_STRINGLIKE: { + break; + } +} + +JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { + NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, + JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + state->is_chunk = 0; + state->buf = buf; + state->bytes_left = nbytes; + janet_await(); +} + +JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { + NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, + JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + state->is_chunk = 1; + state->buf = buf; + state->bytes_left = nbytes; + janet_await(); +} + +/* + * State machine for write + */ + +typedef struct { + JanetListenerState head; + union { + JanetBuffer *buf; + const uint8_t *str; + } src; + int32_t start; + int is_buffer; +} NetStateWrite; + +void net_machine_write(JanetListenerState *s, int event) { + NetStateWrite *state = (NetStateWrite *) s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_MARK: + janet_mark(state->is_buffer + ? janet_wrap_buffer(state->src.buf) + : janet_wrap_string(state->src.str)); + break; + case JANET_ASYNC_EVENT_CLOSE: + janet_schedule(s->fiber, janet_wrap_nil()); + break; + case JANET_ASYNC_EVENT_WRITE: { int32_t start, len; const uint8_t *bytes; - if (!(stream->flags & JANET_STREAM_WRITABLE)) { - should_resume = 1; - ret = 0; - break; - } - if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; + start = state->start; + if (state->is_buffer) { + JanetBuffer *buffer = state->src.buf; bytes = buffer->data; len = buffer->count; - start = jlfd->data.write_from_buffer.start; } else { - bytes = jlfd->data.write_from_stringlike.str; + bytes = state->src.str; len = janet_string_length(bytes); - start = jlfd->data.write_from_stringlike.start; } if (start < len) { int32_t nbytes = len - start; JReadInt nwrote; do { - nwrote = send(fd, bytes + start, nbytes, MSG_NOSIGNAL); + nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); } while (nwrote == -1 && JLASTERR == JEINTR); if (nwrote > 0) { start += nwrote; @@ -351,115 +265,77 @@ static size_t janet_loop_event(size_t index) { start = len; } } + state->start = start; if (start >= len) { - should_resume = 1; - ret = 0; - } else { - if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - jlfd->data.write_from_buffer.start = start; - } else { - jlfd->data.write_from_stringlike.start = start; - } - ret = 1; + janet_schedule(s->fiber, janet_wrap_nil()); + janet_unlisten(s); } break; } - case JLE_CONNECT: { - break; - } - } + break; } - - /* Resume a fiber for some events */ - if (NULL != jlfd->fiber && should_resume) { - /* Resume the fiber */ - Janet out; - JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { - janet_stacktrace(jlfd->fiber, out); - } - } - - /* Remove this handler from the handler pool. */ - if (should_resume) janet_loop_rmindex((int) index); - - return ret; -} - -static void janet_loop1(void) { - /* Remove closed file descriptors */ - for (int i = 0; i < janet_vm_loop_count;) { - if (janet_vm_loopfds[i].stream->flags & JANET_STREAM_CLOSED) { - janet_loop_rmindex(i); - } else { - i++; - } - } - /* Poll */ - if (janet_vm_loop_count == 0) return; - int ready; - do { - ready = JPOLL(janet_vm_pollfds, janet_vm_loop_count, -1); - } while (ready == -1 && JLASTERR == JEINTR); - if (ready == -1) return; - /* Handle events */ - for (int i = 0; i < janet_vm_loop_count;) { - int revents = janet_vm_pollfds[i].revents; - janet_vm_pollfds[i].revents = 0; - if ((janet_vm_pollfds[i].events | POLLHUP | POLLERR) & revents) { - size_t delta = janet_loop_event(i); - i += (int) delta; - } else { - i++; - } - } -} - -void janet_loop(void) { - while (janet_vm_loop_count) { - janet_loop1(); - } -} - -/* - * Scheduling Helpers - */ - -#define JANET_SCHED_FSOME 1 - -JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { - JanetLoopFD lfd; - lfd.stream = stream; - lfd.fiber = janet_root_fiber(); - lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; - lfd.data.read_chunk.buf = buf; - lfd.data.read_chunk.bytes_left = nbytes; - janet_loop_schedule(lfd, POLLIN); - janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { - JanetLoopFD lfd; - lfd.stream = stream; - lfd.fiber = janet_root_fiber(); - lfd.event_type = JLE_WRITE_FROM_BUFFER; - lfd.data.write_from_buffer.buf = buf; - lfd.data.write_from_buffer.start = 0; - janet_loop_schedule(lfd, POLLOUT); - janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); + NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, + JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + state->is_buffer = 1; + state->start = 0; + state->src.buf = buf; + janet_await(); } + JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { - JanetLoopFD lfd; - lfd.stream = stream; - lfd.fiber = janet_root_fiber(); - lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; - lfd.data.write_from_stringlike.str = str; - lfd.data.write_from_stringlike.start = 0; - janet_loop_schedule(lfd, POLLOUT); - janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); + NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, + JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + state->is_buffer = 0; + state->start = 0; + state->src.str = str; + janet_await(); } +/* + * State machine for simple server + */ + +typedef struct { + JanetListenerState head; + JanetFunction *function; +} NetStateSimpleServer; + +void net_machine_simple_server(JanetListenerState *s, int event) { + NetStateSimpleServer *state = (NetStateSimpleServer *) s; + switch (event) { + default: + break; + case JANET_ASYNC_EVENT_INIT: + /* We know the pollable will be a stream */ + janet_gcroot(janet_wrap_abstract(s->pollable)); + break; + case JANET_ASYNC_EVENT_MARK: + janet_mark(janet_wrap_function(state->function)); + break; + case JANET_ASYNC_EVENT_CLOSE: + janet_schedule(s->fiber, janet_wrap_nil()); + janet_gcunroot(janet_wrap_abstract(s->pollable)); + break; + case JANET_ASYNC_EVENT_READ: { + JSock connfd = accept(s->pollable->handle, NULL, NULL); + if (JSOCKVALID(connfd)) { + /* Made a new connection socket */ + JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + Janet streamv = janet_wrap_abstract(stream); + JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); + janet_schedule(fiber, janet_wrap_nil()); + } + break; + } + } +} + +/* Adress info */ + /* Needs argc >= offset + 2 */ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { /* Get host and port */ @@ -564,29 +440,33 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { } /* Put sfd on our loop */ - JanetLoopFD lfd = {0}; - lfd.stream = make_stream(sfd, 0); - lfd.event_type = JLE_READ_ACCEPT; - lfd.data.read_accept.handler = fun; - janet_loop_schedule(lfd, POLLIN); - - return janet_wrap_abstract(lfd.stream); + JanetStream *stream = make_stream(sfd, 0); + NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, + JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); + ss->function = fun; + return janet_wrap_abstract(stream); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { + janet_panic("got non readable stream"); + } int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME); + janet_sched_read(stream, buffer, n); } static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { + janet_panic("got non readable stream"); + } int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream, buffer, n, 0); + janet_sched_chunk(stream, buffer, n); } static Janet cfun_stream_close(int32_t argc, Janet *argv) { @@ -599,6 +479,9 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) { static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + if (!(stream->flags & JANET_STREAM_WRITABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { + janet_panic("got non writeable stream"); + } if (janet_checktype(argv[1], JANET_BUFFER)) { janet_sched_write_buffer(stream, janet_getbuffer(argv, 1)); } else { @@ -662,12 +545,14 @@ static const JanetReg net_cfuns[] = { }; void janet_lib_net(JanetTable *env) { - janet_vm_loop_count = 0; + janet_core_cfuns(env, NULL, net_cfuns); +} + +void janet_net_init(void) { #ifdef JANET_WINDOWS WSADATA wsaData; janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock"); #endif - janet_core_cfuns(env, NULL, net_cfuns); } void janet_net_deinit(void) { @@ -675,3 +560,5 @@ void janet_net_deinit(void) { WSACleanup(); #endif } + +#endif diff --git a/src/core/state.h b/src/core/state.h index 43bf8200..79bfbb7e 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -97,4 +97,14 @@ void janet_threads_init(void); void janet_threads_deinit(void); #endif +#ifdef JANET_NET +void janet_net_init(void); +void janet_net_deinit(void); +#endif + +#ifdef JANET_EV +void janet_ev_init(void); +void janet_ev_deinit(void); +#endif + #endif /* JANET_STATE_H_defined */ diff --git a/src/core/util.h b/src/core/util.h index c487622e..ef4a65a1 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -128,8 +128,10 @@ void janet_lib_thread(JanetTable *env); #endif #ifdef JANET_NET void janet_lib_net(JanetTable *env); -void janet_net_deinit(void); -void janet_net_markloop(void); +#endif +#ifdef JANET_EV +void janet_lib_ev(JanetTable *env); +void janet_ev_mark(void); #endif #endif diff --git a/src/core/vm.c b/src/core/vm.c index bb87b62a..0e5bf40b 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1428,6 +1428,12 @@ int janet_init(void) { /* Threads */ #ifdef JANET_THREADS janet_threads_init(); +#endif +#ifdef JANET_EV + janet_ev_init(); +#endif +#ifdef JANET_NET + janet_net_init(); #endif return 0; } @@ -1449,6 +1455,9 @@ void janet_deinit(void) { #ifdef JANET_THREADS janet_threads_deinit(); #endif +#ifdef JANET_EV + janet_ev_deinit(); +#endif #ifdef JANET_NET janet_net_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index 94a90a3b..cfc613e7 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -176,8 +176,13 @@ extern "C" { #define JANET_TYPED_ARRAY #endif +/* Enable or disable event loop */ +#if !defined(JANET_NO_EV) && !defined(__EMSCRIPTEN__) +#define JANET_EV +#endif + /* Enable or disable networking */ -#if !defined(JANET_NO_NET) && !defined(__EMSCRIPTEN__) +#if defined(JANET_EV) && !defined(JANET_NO_NET) && !defined(__EMSCRIPTEN__) #define JANET_NET #endif @@ -1134,8 +1139,65 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; /***** START SECTION MAIN *****/ /* Event Loop */ -#ifdef JANET_NET +#ifdef JANET_EV +#define JANET_POLL_FLAG_CLOSED 0x1 +#define JANET_POLL_FLAG_SOCKET 0x2 +#define JANET_ASYNC_EVENT_INIT 0 +#define JANET_ASYNC_EVENT_MARK 1 +#define JANET_ASYNC_EVENT_DEINIT 2 +#define JANET_ASYNC_EVENT_CLOSE 3 +#define JANET_ASYNC_EVENT_READ 4 +#define JANET_ASYNC_EVENT_WRITE 5 +#define JANET_ASYNC_EVENT_TIMEOUT 6 + +/* Typedefs */ +#ifdef JANET_WINDOWS +typedef HANDLE JanetPollType; +#else +typedef int JanetPollType; +#endif +typedef struct JanetListenerState JanetListenerState; +typedef struct JanetPollable JanetPollable; +typedef void (*JanetListener)(JanetListenerState *state, int event); + +/* Wrapper around file descriptors and HANDLEs that can be polled. */ +struct JanetPollable { + JanetPollType handle; + uint32_t flags; + JanetListenerState *state; + /* internal */ + int _mask; +}; + +/* Interface for state machine based event loop */ +struct JanetListenerState { + JanetListener machine; + JanetFiber *fiber; + JanetPollable *pollable; + /* internal */ + int _mask; + JanetListenerState *_next; +}; + +/* Run the event loop */ +JANET_API void janet_loop1(void); JANET_API void janet_loop(void); + +/* Wrapper around pollables */ +JANET_API void janet_pollable_init(JanetPollable *pollable, JanetPollType handle); +JANET_API void janet_pollable_mark(JanetPollable *pollable); +JANET_API void janet_pollable_deinit(JanetPollable *pollable); + +/* Queue a fiber to run on the event loop */ +JANET_API void janet_schedule(JanetFiber *fiber, Janet value); + +/* Start a state machine listening for events from a pollable */ +JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size); +JANET_API void janet_unlisten(JanetListenerState *state); + +/* Shorthand for yielding to event loop in C */ +JANET_NO_RETURN JANET_API void janet_await(void); + #endif /* Parsing */ diff --git a/src/mainclient/shell.c b/src/mainclient/shell.c index 3b240346..732fdcd9 100644 --- a/src/mainclient/shell.c +++ b/src/mainclient/shell.c @@ -1026,7 +1026,7 @@ int main(int argc, char **argv) { janet_stacktrace(fiber, out); } -#ifdef JANET_NET +#ifdef JANET_EV status = JANET_SIGNAL_OK; janet_loop(); #endif