From 0ff8f58be87b00132e43f9a86483e5522a0d1c66 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Fri, 6 Oct 2023 00:37:19 -0500 Subject: [PATCH] Simpler async model that is better suited to epoll --- src/core/ev.c | 491 ++++++++++++++++++++------------------------ src/core/fiber.c | 4 +- src/core/gc.c | 12 +- src/core/net.c | 61 +++--- src/core/state.h | 4 +- src/include/janet.h | 77 +++---- test/helper.janet | 2 +- 7 files changed, 300 insertions(+), 351 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index d62f1db2..10dd6168 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -56,6 +56,9 @@ #ifdef JANET_EV_KQUEUE #include #endif +#ifdef JANET_EV_POLL +#include +#endif #endif typedef struct { @@ -179,9 +182,6 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) { return 0; } -/* Forward declaration */ -static void janet_unlisten(JanetListenerState *state); - /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -194,8 +194,7 @@ static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) { return ts; } -/* Look at the next timeout value without - * removing it. */ +/* Look at the next timeout value without removing it. */ static int peek_timeout(JanetTimeout *out) { if (janet_vm.tq_count == 0) return 0; *out = janet_vm.tq[0]; @@ -254,102 +253,48 @@ static void add_timeout(JanetTimeout to) { } } -static int janet_listener_gc(void *p, size_t s); -static int janet_listener_mark(void *p, size_t s); - -static const JanetAbstractType janet_listener_AT = { - "core/ev-listener", - janet_listener_gc, - janet_listener_mark, - JANET_ATEND_GCMARK -}; - -/* Create a new event listener */ -static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - if (stream->flags & JANET_STREAM_CLOSED) { - janet_panic("cannot listen on closed stream"); +void janet_async_end(JanetFiber *fiber) { + if (fiber->ev_callback) { + janet_gcunroot(janet_wrap_abstract(fiber->ev_stream)); + fiber->ev_callback = NULL; + if (fiber->ev_state) { + janet_free(fiber->ev_state); + fiber->ev_state = NULL; + } + janet_ev_dec_refcount(); } - if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read; - if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write; - janet_assert(size >= sizeof(JanetListenerState), "bad size"); - JanetListenerState *state = janet_abstract(&janet_listener_AT, size); - state->machine = behavior; - state->fiber = janet_vm.root_fiber; - state->flags = 0; - janet_vm.root_fiber->waiting = state; - if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state; - if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state; - state->stream = stream; - state->event = user; - state->machine(state, JANET_ASYNC_EVENT_INIT); +} + +void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) { + janet_async_end(fiber); /* Clear existing callback */ + if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber; + if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber; + fiber->ev_callback = callback; + fiber->ev_stream = stream; janet_ev_inc_refcount(); - state->index = janet_vm.listeners->count; - janet_array_push(janet_vm.listeners, janet_wrap_abstract(state)); - return state; -bad_listen_write: - janet_panic("cannot listen for duplicate write event on stream"); -bad_listen_read: - janet_panic("cannot listen for duplicate read event on stream"); + janet_gcroot(janet_wrap_abstract(stream)); + if (data_size) { + void *data = janet_malloc(data_size); + fiber->ev_state = data; + return data; + } else { + return NULL; + } } void janet_fiber_did_resume(JanetFiber *fiber) { - if (fiber->waiting) { - janet_unlisten(fiber->waiting); - fiber->waiting = NULL; - } -} - -static void janet_unlisten_impl(JanetListenerState *state) { - /* Move last listener to position of this listener - O(1) removal and keep things densely packed. */ - if (state->stream) { - Janet popped = janet_array_pop(janet_vm.listeners); - janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check"); - JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped); - janet_vm.listeners->data[state->index] = popped; - popped_state->index = state->index; - state->index = UINT32_MAX; /* just in case */ - janet_ev_dec_refcount(); - if (state->stream->read_state == state) { - state->stream->read_state = NULL; - } - if (state->stream->write_state == state) { - state->stream->write_state = NULL; - } - state->stream = NULL; - } -} - -static int janet_listener_gc(void *p, size_t size) { - (void) size; - JanetListenerState *state = (JanetListenerState *)p; - if (state->stream) { - janet_ev_dec_refcount(); - } - if (state->machine) { - state->machine(state, JANET_ASYNC_EVENT_DEINIT); - } - return 0; -} - -static int janet_listener_mark(void *p, size_t size) { - (void) size; - JanetListenerState *state = (JanetListenerState *)p; - if (state->stream) { - janet_mark(janet_wrap_abstract(state->stream)); - } - if (state->fiber) { - janet_mark(janet_wrap_fiber(state->fiber)); - } - state->machine(state, JANET_ASYNC_EVENT_MARK); - return 0; + janet_async_end(fiber); } static void janet_stream_checktoclose(JanetStream *stream) { - if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) { + if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_fiber && !stream->write_fiber) { janet_stream_close(stream); } } +/* Forward declaration */ +static void janet_register_stream(JanetStream *stream); + static const JanetMethod ev_default_stream_methods[] = { {"close", janet_cfun_stream_close}, {"read", janet_cfun_stream_read}, @@ -363,10 +308,12 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream)); stream->handle = handle; stream->flags = flags; - stream->read_state = NULL; - stream->write_state = NULL; + stream->read_fiber = NULL; + stream->write_fiber = NULL; if (methods == NULL) methods = ev_default_stream_methods; stream->methods = methods; + stream->index = 0; + janet_register_stream(stream); return stream; } @@ -388,18 +335,30 @@ static void janet_stream_close_impl(JanetStream *stream) { if (stream->handle != -1) { close(stream->handle); stream->handle = -1; +#ifdef JANET_EV_POLL + uint32_t i = stream->index; + size_t j = janet_vm.stream_count - 1; + JanetStream *last = janet_vm.streams[j]; + struct pollfd lastfd = janet_vm.fds[j + 1]; + janet_vm.fds[i + 1] = lastfd; + janet_vm.streams[i] = last; + last->index = stream->index; + janet_vm.stream_count--; +#endif } #endif } void janet_stream_close(JanetStream *stream) { - if (stream->read_state) { - stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE); - janet_unlisten(stream->read_state); + JanetFiber *rf = stream->read_fiber; + JanetFiber *wf = stream->write_fiber; + if (rf && rf->ev_callback) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_CLOSE); + stream->read_fiber = NULL; } - if (stream->write_state) { - stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE); - janet_unlisten(stream->write_state); + if (wf && wf->ev_callback) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_CLOSE); + stream->write_fiber = NULL; } janet_stream_close_impl(stream); } @@ -416,11 +375,13 @@ static int janet_stream_gc(void *p, size_t s) { static int janet_stream_mark(void *p, size_t s) { (void) s; JanetStream *stream = (JanetStream *) p; - if (NULL != stream->read_state) { - janet_mark(janet_wrap_abstract(stream->read_state)); + JanetFiber *rf = stream->read_fiber; + JanetFiber *wf = stream->write_fiber; + if (rf) { + janet_mark(janet_wrap_fiber(rf)); } - if (NULL != stream->write_state) { - janet_mark(janet_wrap_abstract(stream->write_state)); + if (wf) { + janet_mark(janet_wrap_fiber(wf)); } return 0; } @@ -473,8 +434,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) { } JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream)); /* Can't share listening state and such across threads */ - p->read_state = NULL; - p->write_state = NULL; + p->read_fiber = NULL; + p->write_fiber = NULL; p->flags = (uint32_t) janet_unmarshal_int(ctx); p->methods = janet_unmarshal_ptr(ctx); #ifdef JANET_WINDOWS @@ -593,8 +554,6 @@ void janet_ev_init_common(void) { janet_table_init_raw(&janet_vm.active_tasks, 0); janet_table_init_raw(&janet_vm.signal_handlers, 0); janet_rng_seed(&janet_vm.ev_rng, 0); - janet_vm.listeners = janet_array(0); - janet_gcroot(janet_wrap_array(janet_vm.listeners)); #ifndef JANET_WINDOWS pthread_attr_init(&janet_vm.new_thread_attr); pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED); @@ -608,7 +567,6 @@ void janet_ev_deinit_common(void) { janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.active_tasks); janet_table_deinit(&janet_vm.signal_handlers); - janet_vm.listeners = NULL; #ifndef JANET_WINDOWS pthread_attr_destroy(&janet_vm.new_thread_attr); #endif @@ -1554,62 +1512,25 @@ static JanetTimestamp ts_now(void) { return res; } -static void janet_epoll_sync_callback(JanetEVGenericMessage msg) { - JanetListenerState *state = msg.argp; - if (state->stream) { - JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; - if (state == state->stream->read_state) - status1 = state->machine(state, JANET_ASYNC_EVENT_READ); - if (state == state->stream->write_state) - status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE); - if (status1 == JANET_ASYNC_STATUS_DONE || - status2 == JANET_ASYNC_STATUS_DONE) { - janet_unlisten(state); - } else { - /* Repost event */ - janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); - } - } -} - /* Wait for the next event */ -JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); - if (!(stream->flags & JANET_STREAM_REGISTERED)) { - struct epoll_event ev; - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = stream; - int status; - do { - status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev); - } while (status == -1 && errno == EINTR); - if (status == -1) { - if (errno == EPERM) { - /* Couldn't add to event loop, so assume that it completes - * synchronously. In that case, fire the completion - * event manually, since this should be a read or write - * event to a file. So we just post a custom event to do the read/write - * asap. */ - /* Use flag to indicate state is not registered in epoll */ - state->flags = 1; - JanetEVGenericMessage msg = {0}; - msg.argp = state; - janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); - } else { - /* Unexpected error */ - janet_unlisten_impl(state); - janet_panicv(janet_ev_lasterr()); - } +static void janet_register_stream(JanetStream *stream) { + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = stream; + int status; + do { + status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev); + } while (status == -1 && errno == EINTR); + if (status == -1) { + if (errno == EPERM) { + /* Couldn't add to event loop, so assume that it completes + * synchronously. */ + stream->flags |= JANET_STREAM_UNREGISTERED; + } else { + /* Unexpected error */ + janet_panicv(janet_ev_lasterr()); } - stream->flags |= JANET_STREAM_REGISTERED; } - return state; -} - -/* Tell system we are done listening for a certain event */ -static void janet_unlisten(JanetListenerState *state) { - janet_unlisten_impl(state); } #define JANET_EPOLL_MAX_EVENTS 64 @@ -1646,28 +1567,30 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { } else { JanetStream *stream = p; int mask = events[i].events; - JanetListenerState *states[2] = {stream->read_state, stream->write_state}; - for (int j = 0; j < 2; j++) { - JanetListenerState *state = states[j]; - if (!state) continue; - state->event = events + i; - JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE; - if (mask & EPOLLOUT) - status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); - if (mask & EPOLLIN) - status2 = state->machine(state, JANET_ASYNC_EVENT_READ); - if (mask & EPOLLERR) - status3 = state->machine(state, JANET_ASYNC_EVENT_ERR); - if ((mask & EPOLLHUP) && !(mask & (EPOLLOUT | EPOLLIN))) - status4 = state->machine(state, JANET_ASYNC_EVENT_HUP); - if (status1 == JANET_ASYNC_STATUS_DONE || - status2 == JANET_ASYNC_STATUS_DONE || - status3 == JANET_ASYNC_STATUS_DONE || - status4 == JANET_ASYNC_STATUS_DONE) { - janet_unlisten(state); + int has_err = mask & EPOLLERR; + int has_hup = mask & EPOLLHUP; + JanetFiber *rf = stream->read_fiber; + JanetFiber *wf = stream->write_fiber; + if (rf) { + if (rf->ev_callback && (mask & EPOLLIN)) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_READ); + } + if (rf->ev_callback && has_err) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_ERR); + } + if (rf->ev_callback && has_hup) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_HUP); + } + } + if (wf) { + if (wf->ev_callback && (mask & EPOLLOUT)) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_WRITE); + } + if (wf->ev_callback && has_err) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_ERR); + } + if (wf->ev_callback && has_hup) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_HUP); } } janet_stream_checktoclose(stream); @@ -1853,9 +1776,7 @@ void janet_ev_deinit(void) { janet_vm.kq = 0; } -#else - -#include +#elif defined(JANET_EV_POLL) static JanetTimestamp ts_now(void) { struct timespec now; @@ -1866,34 +1787,37 @@ static JanetTimestamp ts_now(void) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - size_t oldsize = janet_vm.listeners->capacity; - JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); - size_t newsize = janet_vm.listeners->capacity; - if (newsize > oldsize) { - janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd)); - if (NULL == janet_vm.fds) { +void janet_register_stream(JanetStream *stream) { + struct pollfd ev = {0}; + stream->index = (uint32_t) janet_vm.stream_count; + size_t new_count = janet_vm.stream_count + 1; + if (new_count > janet_vm.stream_capacity) { + size_t new_cap = new_count * 2; + janet_vm.fds = janet_realloc(janet_vm.fds, (1 + new_cap) * sizeof(struct pollfd)); + janet_vm.streams = janet_realloc(janet_vm.streams, new_cap * sizeof(JanetStream *)); + if (!janet_vm.fds || !janet_vm.streams) { JANET_OUT_OF_MEMORY; } + janet_vm.stream_capacity = new_cap; } - struct pollfd ev; ev.fd = stream->handle; - ev.events = 0; - if (stream->read_state) ev.events |= POLLIN; - if (stream->write_state) ev.events |= POLLOUT; - ev.revents = 0; - janet_vm.fds[state->index + 1] = ev; - return state; -} - -static void janet_unlisten(JanetListenerState *state) { - if (state->stream) { - janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count]; - } - janet_unlisten_impl(state); + ev.events = POLLIN | POLLOUT; + janet_vm.fds[janet_vm.stream_count + 1] = ev; + janet_vm.streams[janet_vm.stream_count] = stream; + janet_vm.stream_count = new_count; } void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { + + /* set event flags */ + for (size_t i = 0; i < janet_vm.stream_count; i++) { + JanetStream *stream = janet_vm.streams[i]; + janet_vm.fds[i + 1].events = 0; + janet_vm.fds[i + 1].revents = 0; + if (stream->read_fiber) janet_vm.fds[i + 1].events |= POLLIN; + if (stream->write_fiber) janet_vm.fds[i + 1].events |= POLLOUT; + } + /* Poll for events */ int ready; do { @@ -1902,7 +1826,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { JanetTimestamp now = ts_now(); to = now > timeout ? 0 : (int)(timeout - now); } - ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to); + ready = poll(janet_vm.fds, janet_vm.stream_count + 1, to); } while (ready == -1 && errno == EINTR); if (ready == -1) { JANET_EXIT("failed to poll events"); @@ -1915,31 +1839,32 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { } /* Step state machines */ - for (int32_t i = 0; i < janet_vm.listeners->count; i++) { + for (size_t i = 0; i < janet_vm.stream_count; i++) { struct pollfd *pfd = janet_vm.fds + i + 1; - /* Skip fds where nothing interesting happened */ - JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]); - /* Normal event */ + JanetStream *stream = janet_vm.streams[i]; int mask = pfd->revents; - JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; - JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE; - state->event = pfd; - JanetStream *stream = state->stream; - if (mask & POLLOUT) - status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); - if (mask & POLLIN) - status2 = state->machine(state, JANET_ASYNC_EVENT_READ); - if (mask & POLLERR) - status3 = state->machine(state, JANET_ASYNC_EVENT_ERR); - if ((mask & POLLHUP) && !(mask & (POLLIN | POLLOUT))) - status4 = state->machine(state, JANET_ASYNC_EVENT_HUP); - if (status1 == JANET_ASYNC_STATUS_DONE || - status2 == JANET_ASYNC_STATUS_DONE || - status3 == JANET_ASYNC_STATUS_DONE || - status4 == JANET_ASYNC_STATUS_DONE) { - janet_unlisten(state); + if (!mask) continue; + int has_err = mask & POLLERR; + int has_hup = mask & POLLHUP; + JanetFiber *rf = stream->read_fiber; + JanetFiber *wf = stream->write_fiber; + if (rf) { + if (rf->ev_callback && (mask & POLLIN)) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_READ); + } else if (rf->ev_callback && has_hup) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_HUP); + } else if (rf->ev_callback && has_err) { + rf->ev_callback(rf, JANET_ASYNC_EVENT_ERR); + } + } + if (wf) { + if (wf->ev_callback && (mask & POLLOUT)) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_WRITE); + } else if (wf->ev_callback && has_hup) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_HUP); + } else if (wf->ev_callback && has_err) { + wf->ev_callback(wf, JANET_ASYNC_EVENT_ERR); + } } janet_stream_checktoclose(stream); } @@ -1956,6 +1881,9 @@ void janet_ev_init(void) { janet_vm.fds[0].fd = janet_vm.selfpipe[0]; janet_vm.fds[0].events = POLLIN; janet_vm.fds[0].revents = 0; + janet_vm.streams = NULL; + janet_vm.stream_count = 0; + janet_vm.stream_capacity = 0; return; } @@ -1963,7 +1891,9 @@ void janet_ev_deinit(void) { janet_ev_deinit_common(); janet_ev_cleanup_selfpipe(); janet_free(janet_vm.fds); + janet_free(janet_vm.streams); janet_vm.fds = NULL; + janet_vm.streams = NULL; } #endif @@ -2204,7 +2134,6 @@ typedef enum { } JanetReadMode; typedef struct { - JanetListenerState head; int32_t bytes_left; int32_t bytes_read; JanetBuffer *buf; @@ -2224,8 +2153,9 @@ typedef struct { #endif } StateRead; -JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { - StateRead *state = (StateRead *) s; +void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { + JanetStream *stream = fiber->ev_stream; + StateRead *state = (StateRead *) fiber->ev_state; switch (event) { default: break; @@ -2233,8 +2163,9 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { janet_mark(janet_wrap_buffer(state->buf)); break; case JANET_ASYNC_EVENT_CLOSE: - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + break; #ifdef JANET_WINDOWS case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ @@ -2307,12 +2238,15 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { #else case JANET_ASYNC_EVENT_ERR: { if (state->bytes_read) { - janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); + janet_schedule(fiber, janet_wrap_buffer(state->buf)); } else { - janet_schedule(s->fiber, janet_wrap_nil()); + janet_schedule(fiber, janet_wrap_nil()); } - return JANET_ASYNC_STATUS_DONE; + janet_async_end(fiber); + break; } + + read_more: case JANET_ASYNC_EVENT_HUP: case JANET_ASYNC_EVENT_USER: case JANET_ASYNC_EVENT_READ: { @@ -2328,36 +2262,38 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { do { #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - nread = recvfrom(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags, + nread = recvfrom(stream->handle, buffer->data + buffer->count, read_limit, state->flags, (struct sockaddr *)&saddr, &socklen); } else if (state->mode == JANET_ASYNC_READMODE_RECV) { - nread = recv(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags); + nread = recv(stream->handle, buffer->data + buffer->count, read_limit, state->flags); } else #endif { - nread = read(s->stream->handle, buffer->data + buffer->count, read_limit); + nread = read(stream->handle, buffer->data + buffer->count, read_limit); } } while (nread == -1 && errno == EINTR); /* Check for errors - special case errors that can just be waited on to fix */ if (nread == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - return JANET_ASYNC_STATUS_NOT_DONE; + break; } /* In stream protocols, a pipe error is end of stream */ if (errno == EPIPE && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { nread = 0; } else { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_ev_lasterr()); + janet_async_end(fiber); + break; } } /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ state->bytes_read += nread; if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + break; } /* Increment buffer counts */ @@ -2378,19 +2314,22 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { { resume_val = janet_wrap_buffer(buffer); } - janet_schedule(s->fiber, resume_val); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, resume_val); + janet_async_end(fiber); + break; } + + /* Read some more if possible */ + goto read_more; } break; #endif } - return JANET_ASYNC_STATUS_NOT_DONE; } static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { - StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL); + JanetFiber *f = janet_vm.root_fiber; + StateRead *state = (StateRead *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, sizeof(StateRead)); state->is_chunk = is_chunked; state->buf = buf; state->bytes_left = nbytes; @@ -2401,7 +2340,7 @@ static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t #else state->flags = flags; #endif - ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); + ev_callback_read(f, JANET_ASYNC_EVENT_USER); } void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { @@ -2433,7 +2372,6 @@ typedef enum { } JanetWriteMode; typedef struct { - JanetListenerState head; union { JanetBuffer *buf; const uint8_t *str; @@ -2453,8 +2391,9 @@ typedef struct { #endif } StateWrite; -JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { - StateWrite *state = (StateWrite *) s; +void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { + JanetStream *stream = fiber->ev_stream; + StateWrite *state = (StateWrite *) fiber->ev_state; switch (event) { default: break; @@ -2468,8 +2407,9 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) break; } case JANET_ASYNC_EVENT_CLOSE: - janet_cancel(s->fiber, janet_cstringv("stream closed")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("stream closed")); + janet_async_end(fiber); + break; #ifdef JANET_WINDOWS case JANET_ASYNC_EVENT_COMPLETE: { /* Called when write finished */ @@ -2540,11 +2480,13 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) break; #else case JANET_ASYNC_EVENT_ERR: - janet_cancel(s->fiber, janet_cstringv("stream err")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("stream err")); + janet_async_end(fiber); + break; case JANET_ASYNC_EVENT_HUP: - janet_cancel(s->fiber, janet_cstringv("stream hup")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("stream hup")); + janet_async_end(fiber); + break; case JANET_ASYNC_EVENT_USER: case JANET_ASYNC_EVENT_WRITE: { int32_t start, len; @@ -2565,28 +2507,30 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) do { #ifdef JANET_NET if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { - nwrote = sendto(s->stream->handle, bytes + start, nbytes, state->flags, + nwrote = sendto(stream->handle, bytes + start, nbytes, state->flags, (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); } else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) { - nwrote = send(s->stream->handle, bytes + start, nbytes, state->flags); + nwrote = send(stream->handle, bytes + start, nbytes, state->flags); } else #endif { - nwrote = write(s->stream->handle, bytes + start, nbytes); + nwrote = write(stream->handle, bytes + start, nbytes); } } while (nwrote == -1 && errno == EINTR); /* Handle write errors */ if (nwrote == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_ev_lasterr()); + janet_async_end(fiber); + break; } /* Unless using datagrams, empty message is a disconnect */ if (nwrote == 0 && !dest_abst) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("disconnect")); + janet_async_end(fiber); + break; } if (nwrote > 0) { @@ -2597,20 +2541,21 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) } state->start = start; if (start >= len) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + break; } break; } break; #endif } - return JANET_ASYNC_STATUS_NOT_DONE; } static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { - StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL); + JanetFiber *f = janet_vm.root_fiber; + StateWrite *state = (StateWrite *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, + ev_callback_write, sizeof(StateWrite)); state->is_buffer = is_buffer; state->src.buf = buf; state->dest_abst = dest_abst; @@ -2621,7 +2566,7 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab state->flags = flags; #endif state->start = 0; - ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); + ev_callback_write(f, JANET_ASYNC_EVENT_USER); } void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) { diff --git a/src/core/fiber.c b/src/core/fiber.c index b65fc5ab..d5f51599 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -40,7 +40,9 @@ static void fiber_reset(JanetFiber *fiber) { fiber->last_value = janet_wrap_nil(); #ifdef JANET_EV fiber->sched_id = 0; - fiber->waiting = NULL; + fiber->ev_callback = NULL; + fiber->ev_state = NULL; + fiber->ev_stream = NULL; fiber->supervisor_channel = NULL; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); diff --git a/src/core/gc.c b/src/core/gc.c index 9c9be5bb..f9807282 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -296,8 +296,11 @@ recur: if (fiber->supervisor_channel) { janet_mark_abstract(fiber->supervisor_channel); } - if (fiber->waiting) { - janet_mark_abstract(fiber->waiting); + if (fiber->ev_stream) { + janet_mark_abstract(fiber->ev_stream); + } + if (fiber->ev_callback) { + fiber->ev_callback(fiber, JANET_ASYNC_EVENT_MARK); } #endif @@ -324,6 +327,11 @@ static void janet_deinit_block(JanetGCObject *mem) { janet_free(((JanetTable *) mem)->data); break; case JANET_MEMORY_FIBER: +#ifdef JANET_EV + if (((JanetFiber *)mem)->ev_state) { + janet_free(((JanetFiber *)mem)->ev_state); + } +#endif janet_free(((JanetFiber *)mem)->data); break; case JANET_MEMORY_BUFFER: diff --git a/src/core/net.c b/src/core/net.c index f6484684..7621a2ed 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -114,18 +114,19 @@ static void janet_net_socknoblock(JSock s) { /* State machine for async connect */ typedef struct { - JanetListenerState head; int did_connect; } NetStateConnect; -JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) { - NetStateConnect *state = (NetStateConnect *)s; +void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) { + JanetStream *stream = fiber->ev_stream; + NetStateConnect *state = (NetStateConnect *)fiber->ev_state; switch (event) { default: - return JANET_ASYNC_STATUS_NOT_DONE; + break; case JANET_ASYNC_EVENT_CLOSE: - janet_cancel(s->fiber, janet_cstringv("stream closed")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("stream closed")); + janet_async_end(fiber); + return; case JANET_ASYNC_EVENT_HUP: case JANET_ASYNC_EVENT_ERR: case JANET_ASYNC_EVENT_COMPLETE: @@ -133,7 +134,6 @@ JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent even case JANET_ASYNC_EVENT_USER: break; } - JanetStream *stream = s->stream; #ifdef JANET_WINDOWS int res = 0; int size = sizeof(res); @@ -146,24 +146,24 @@ JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent even if (r == 0) { if (res == 0) { state->did_connect = 1; - janet_schedule(s->fiber, janet_wrap_abstract(s->stream)); + janet_schedule(fiber, janet_wrap_abstract(stream)); } else { - janet_cancel(s->fiber, janet_cstringv(strerror(res))); + janet_cancel(fiber, janet_cstringv(strerror(res))); stream->flags |= JANET_STREAM_TOCLOSE; } } else { - janet_cancel(s->fiber, janet_ev_lasterr()); + janet_cancel(fiber, janet_ev_lasterr()); stream->flags |= JANET_STREAM_TOCLOSE; } - return JANET_ASYNC_STATUS_DONE; + janet_async_end(fiber); } static void net_sched_connect(JanetStream *stream) { - JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL); - NetStateConnect *state = (NetStateConnect *)s; + JanetFiber *f = janet_vm.root_fiber; + NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect)); state->did_connect = 0; #ifdef JANET_WINDOWS - net_machine_connect(s, JANET_ASYNC_EVENT_USER); + net_callback_connect(s, JANET_ASYNC_EVENT_USER); #endif } @@ -264,12 +264,12 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { #else typedef struct { - JanetListenerState head; JanetFunction *function; } NetStateAccept; -JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { - NetStateAccept *state = (NetStateAccept *)s; +void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) { + JanetStream *stream = fiber->ev_stream; + NetStateAccept *state = (NetStateAccept *)fiber->ev_state; switch (event) { default: break; @@ -278,40 +278,41 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event break; } case JANET_ASYNC_EVENT_CLOSE: - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + return; case JANET_ASYNC_EVENT_USER: case JANET_ASYNC_EVENT_READ: { #if defined(JANET_LINUX) - JSock connfd = accept4(s->stream->handle, NULL, NULL, SOCK_CLOEXEC); + JSock connfd = accept4(stream->handle, NULL, NULL, SOCK_CLOEXEC); #else /* On BSDs, CLOEXEC should be inherited from server socket */ - JSock connfd = accept(s->stream->handle, NULL, NULL); + JSock connfd = accept(stream->handle, NULL, NULL); #endif if (JSOCKVALID(connfd)) { janet_net_socknoblock(connfd); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); Janet streamv = janet_wrap_abstract(stream); if (state->function) { - JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); - fiber->supervisor_channel = s->fiber->supervisor_channel; - janet_schedule(fiber, janet_wrap_nil()); + JanetFiber *sub_fiber = janet_fiber(state->function, 64, 1, &streamv); + sub_fiber->supervisor_channel = fiber->supervisor_channel; + janet_schedule(sub_fiber, janet_wrap_nil()); } else { - janet_schedule(s->fiber, streamv); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, streamv); + janet_async_end(fiber); + return; } } break; } } - return JANET_ASYNC_STATUS_NOT_DONE; } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { - JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); - NetStateAccept *state = (NetStateAccept *) s; + JanetFiber *f = janet_vm.root_fiber; + NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept)); state->function = fun; - net_machine_accept(s, JANET_ASYNC_EVENT_USER); + net_callback_accept(f, JANET_ASYNC_EVENT_USER); janet_await(); } diff --git a/src/core/state.h b/src/core/state.h index 12ea13a6..a272b2d2 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -159,7 +159,6 @@ struct JanetVM { volatile JanetAtomicInt listener_count; /* used in signal handler, must be volatile */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */ - JanetArray *listeners; /* For GC */ JanetTable signal_handlers; #ifdef JANET_WINDOWS void **iocp; @@ -176,6 +175,9 @@ struct JanetVM { int timer; int timer_enabled; #else + JanetStream **streams; + size_t stream_count; + size_t stream_capacity; pthread_attr_t new_thread_attr; JanetHandle selfpipe[2]; struct pollfd *fds; diff --git a/src/include/janet.h b/src/include/janet.h index 746c71dd..96765279 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -234,6 +234,11 @@ extern "C" { #define JANET_EV_KQUEUE #endif +/* Use poll as last resort */ +#if !defined(JANET_WINDOWS) && !defined(JANET_EV_EPOLL) && !defined(JANET_EV_KQUEUE) +#define JANET_EV_POLL +#endif + /* How to export symbols */ #ifndef JANET_EXPORT #ifdef JANET_WINDOWS @@ -406,12 +411,11 @@ typedef enum { JANET_SIGNAL_USER6, JANET_SIGNAL_USER7, JANET_SIGNAL_USER8, - JANET_SIGNAL_USER9 + JANET_SIGNAL_USER9, + JANET_SIGNAL_INTERRUPT = JANET_SIGNAL_USER8, + JANET_SIGNAL_EVENT = JANET_SIGNAL_USER9, } JanetSignal; -#define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9 -#define JANET_SIGNAL_INTERRUPT JANET_SIGNAL_USER8 - /* Fiber statuses - mostly corresponds to signals. */ typedef enum { JANET_STATUS_DEAD, @@ -575,7 +579,7 @@ typedef void *JanetAbstract; #define JANET_STREAM_CLOSED 0x1 #define JANET_STREAM_SOCKET 0x2 -#define JANET_STREAM_REGISTERED 0x4 +#define JANET_STREAM_UNREGISTERED 0x4 #define JANET_STREAM_READABLE 0x200 #define JANET_STREAM_WRITABLE 0x400 #define JANET_STREAM_ACCEPTABLE 0x800 @@ -583,54 +587,42 @@ typedef void *JanetAbstract; #define JANET_STREAM_TOCLOSE 0x10000 typedef enum { - JANET_ASYNC_EVENT_INIT, - JANET_ASYNC_EVENT_MARK, - JANET_ASYNC_EVENT_DEINIT, - JANET_ASYNC_EVENT_CLOSE, - JANET_ASYNC_EVENT_ERR, - JANET_ASYNC_EVENT_HUP, - JANET_ASYNC_EVENT_READ, - JANET_ASYNC_EVENT_WRITE, - JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */ - JANET_ASYNC_EVENT_USER + JANET_ASYNC_EVENT_INIT = 0, + JANET_ASYNC_EVENT_MARK = 1, + JANET_ASYNC_EVENT_DEINIT = 2, + JANET_ASYNC_EVENT_CLOSE = 3, + JANET_ASYNC_EVENT_ERR = 4, + JANET_ASYNC_EVENT_HUP = 5, + JANET_ASYNC_EVENT_READ = 6, + JANET_ASYNC_EVENT_WRITE = 7, + JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */ + JANET_ASYNC_EVENT_USER = 9 } JanetAsyncEvent; -#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) -#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE) - typedef enum { - JANET_ASYNC_STATUS_NOT_DONE, - JANET_ASYNC_STATUS_DONE -} JanetAsyncStatus; + JANET_ASYNC_LISTEN_READ = 1, + JANET_ASYNC_LISTEN_WRITE, + JANET_ASYNC_LISTEN_BOTH +} JanetAsyncMode; /* Typedefs */ -typedef struct JanetListenerState JanetListenerState; typedef struct JanetStream JanetStream; -typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event); +typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event); /* Wrapper around file descriptors and HANDLEs that can be polled. */ struct JanetStream { JanetHandle handle; uint32_t flags; - JanetListenerState *read_state; - JanetListenerState *write_state; + uint32_t index; + JanetFiber *read_fiber; + JanetFiber *write_fiber; const void *methods; /* Methods for this stream */ }; -/* Interface for state machine based event loop */ -struct JanetListenerState { - JanetListener machine; - JanetFiber *fiber; - JanetStream *stream; - void *event; /* Used to pass data from asynchronous IO event. Contents depend on both - implementation of the event loop and the particular event. */ - uint32_t index; /* Used for GC and poll implentation */ - uint32_t flags; -#ifdef JANET_WINDOWS - void *tag; /* Used to associate listeners with an overlapped structure */ - int bytes; /* Used to track how many bytes were transfered. */ -#endif -}; +JANET_API void janet_async_end(JanetFiber *fiber); +JANET_API void *janet_async_start(JanetFiber *fiber, JanetStream *stream, + JanetAsyncMode mode, JanetEVCallback callback, size_t data_size); + #endif /* Janet uses atomic integers in several places for synchronization between threads and @@ -930,7 +922,9 @@ struct JanetFiber { * in a multi-tasking system. It would be possible to move these fields to a new * type, say "JanetTask", that as separate from fibers to save a bit of space. */ uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ - JanetListenerState *waiting; + JanetEVCallback ev_callback; /* Call this before starting scheduled fibers */ + JanetStream *ev_stream; /* which stream we are waiting on */ + void *ev_state; /* Extra data for ev callback state */ void *supervisor_channel; /* Channel to push self to when complete */ #endif }; @@ -1407,9 +1401,6 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); JANET_API void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig); -/* Start a state machine listening for events from a stream */ -JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user); - /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); JANET_NO_RETURN JANET_API void janet_sleep_await(double sec); diff --git a/test/helper.janet b/test/helper.janet index 93b19704..dc6ada96 100644 --- a/test/helper.janet +++ b/test/helper.janet @@ -19,7 +19,7 @@ (frame :source) (frame :source-line))) (if x (when is-verbose (eprintf "\e[32m✔\e[0m %s: %s: %v" line-info (describe e) x)) - (eprintf "\e[31m✘\e[0m %s: %s: %v" line-info (describe e) x)) + (do (eprintf "\e[31m✘\e[0m %s: %s: %v" line-info (describe e) x) (eflush))) x) (defmacro assert-error