From a25b030e3668c38206e2599bccc35bb7fe72fb63 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 15 Nov 2020 19:40:47 -0600 Subject: [PATCH] Fix EPOLL implementation. --- src/core/ev.c | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 1a8880a8..d9b45219 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -237,7 +237,6 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener state->_index = 0; stream->_mask |= mask; janet_vm_active_listeners++; - /* Prepend to linked list */ state->_next = stream->state; stream->state = state; /* Emit INIT event for convenience */ @@ -389,6 +388,9 @@ void janet_fiber_did_resume(JanetFiber *fiber) { if (fiber->waiting) janet_unlisten(fiber->waiting); } +/* forward declaration for marking extra state */ +static void janet_ev_mark_extra(void); + /* Mark all pending tasks */ void janet_ev_mark(void) { JanetTask *tasks = janet_vm_spawn.data; @@ -410,6 +412,7 @@ void janet_ev_mark(void) { for (size_t i = 0; i < janet_vm_tq_count; i++) { janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber)); } + janet_ev_mark_extra(); } /* Run a top level task */ @@ -767,6 +770,8 @@ void janet_loop(void) { /* Epoll global data */ JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; +static void janet_ev_mark_extra(void) {} + static JanetTimestamp ts_now(void) { return (JanetTimestamp) GetTickCount64(); } @@ -783,6 +788,9 @@ void janet_ev_deinit(void) { } JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { + if (stream->state == NULL) { + janet_gcroot(janet_wrap_abstract(stream)); + } /* Add the handle to the io completion port if not already added */ JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); if (!(stream->flags & JANET_STREAM_IOCP)) { @@ -796,7 +804,11 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in static void janet_unlisten(JanetListenerState *state) { + JanetStream *stream = state->stream; janet_unlisten_impl(state); + if (NULL == stream->state) { + janet_gcunroot(janet_wrap_abstract(stream)); + } } void janet_loop1_impl(int has_timeout, JanetTimestamp to) { @@ -842,12 +854,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { } } -#elif defined(JANET_EV_POLL) +#elif defined(JANET_EV_EPOLL) /* * Start linux/epoll implementation */ +static void janet_ev_mark_extra(void) {} + static JanetTimestamp ts_now(void) { struct timespec now; janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time"); @@ -886,6 +900,9 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in janet_unlisten_impl(state); janet_panicv(janet_ev_lasterr()); } + if (is_first) { + janet_gcroot(janet_wrap_abstract(stream)); + } return state; } @@ -908,10 +925,14 @@ static void janet_unlisten(JanetListenerState *state) { } /* Destroy state machine and free memory */ janet_unlisten_impl(state); + if (NULL == stream->state) { + janet_gcunroot(janet_wrap_abstract(stream)); + } } #define JANET_EPOLL_MAX_EVENTS 64 void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { + struct itimerspec its; if (janet_vm_timer_enabled || has_timeout) { memset(&its, 0, sizeof(its)); if (has_timeout) { @@ -943,6 +964,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { JanetListenerState *next_state = state->_next; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE; if (mask & EPOLLOUT) status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); if (mask & EPOLLIN) @@ -1008,6 +1031,17 @@ JANET_THREAD_LOCAL JanetListenerState **janet_vm_listener_map = NULL; JANET_THREAD_LOCAL size_t janet_vm_fdcap = 0; JANET_THREAD_LOCAL size_t janet_vm_fdcount = 0; +static void janet_ev_mark_extra(void) { + for (size_t i = 0; i < janet_vm_fdcount; i++) { + JanetListenerState *state = janet_vm_listener_map[i]; + if (NULL != state->fiber) { + janet_mark(janet_wrap_fiber(state->fiber)); + } + janet_stream_mark(state->stream, sizeof(JanetStream)); + (state->machine)(state, JANET_ASYNC_EVENT_MARK); + } +} + static int make_poll_events(int mask) { int events = 0; if (mask & JANET_ASYNC_LISTEN_READ)