diff --git a/src/boot/boot.janet b/src/boot/boot.janet index d1d7342e..5bb21a38 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2229,7 +2229,7 @@ :on-compile-error (fn compile-error [msg errf &] (error (string "compile error: " msg))) :on-parse-error (fn parse-error [p x] - (error (string "parse error: " (parser/error p)))) + (error (string "parse error: " (:error p)))) :fiber-flags :i :on-status (fn on-status [f val] (if-not (= (fiber/status f) :dead) @@ -2699,9 +2699,9 @@ (getline (string "repl:" - ((parser/where p) 0) + ((:where p) 0) ":" - (parser/state p :delimiters) "> ") + (:state p :delimiters) "> ") buf env))) (defn make-onsignal [e level] @@ -2716,8 +2716,8 @@ (debug/stacktrace f x) (eflush) (defn debugger-chunks [buf p] - (def status (parser/state p :delimiters)) - (def c ((parser/where p) 0)) + (def status (:state p :delimiters)) + (def c ((:where p) 0)) (def prpt (string "debug[" level "]:" c ":" status "> ")) (getline prpt buf nextenv)) (print "entering debug[" level "] - (quit) to exit") diff --git a/src/core/ev.c b/src/core/ev.c index d9b45219..92b1667b 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -139,12 +139,14 @@ struct JanetTimeout { static void janet_unlisten(JanetListenerState *state); /* Global data */ -JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0; JANET_THREAD_LOCAL JanetQueue janet_vm_spawn; JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL; JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng; +JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL; +JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0; +JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0; /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -234,11 +236,24 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener janet_vm_root_fiber->waiting = state; state->stream = stream; state->_mask = mask; - state->_index = 0; stream->_mask |= mask; - janet_vm_active_listeners++; state->_next = stream->state; stream->state = state; + + /* Keep track of a listener for GC purposes */ + int resize = janet_vm_listener_cap == janet_vm_listener_count; + if (resize) { + size_t newcap = janet_vm_listener_count ? janet_vm_listener_cap * 2 : 16; + janet_vm_listeners = realloc(janet_vm_listeners, newcap * sizeof(JanetListenerState *)); + if (NULL == janet_vm_listeners) { + JANET_OUT_OF_MEMORY; + } + janet_vm_listener_cap = newcap; + } + size_t index = janet_vm_listener_count++; + janet_vm_listeners[index] = state; + state->_index = index; + /* Emit INIT event for convenience */ state->event = user; state->machine(state, JANET_ASYNC_EVENT_INIT); @@ -255,7 +270,6 @@ static void janet_unlisten_impl(JanetListenerState *state) { iter = &((*iter)->_next); janet_assert(*iter, "failed to remove listener"); *iter = state->_next; - janet_vm_active_listeners--; /* Remove mask */ state->stream->_mask &= ~(state->_mask); /* Ensure fiber does not reference this state */ @@ -263,6 +277,10 @@ static void janet_unlisten_impl(JanetListenerState *state) { if (NULL != fiber && fiber->waiting == state) { fiber->waiting = NULL; } + /* Untrack a listener for gc purposes */ + size_t index = state->_index; + janet_vm_listeners[index] = janet_vm_listeners[--janet_vm_listener_count]; + janet_vm_listeners[index]->_index = index; free(state); } @@ -388,11 +406,10 @@ void janet_fiber_did_resume(JanetFiber *fiber) { if (fiber->waiting) janet_unlisten(fiber->waiting); } -/* forward declaration for marking extra state */ -static void janet_ev_mark_extra(void); - /* Mark all pending tasks */ void janet_ev_mark(void) { + + /* Pending tasks */ JanetTask *tasks = janet_vm_spawn.data; if (janet_vm_spawn.head <= janet_vm_spawn.tail) { for (int32_t i = janet_vm_spawn.head; i < janet_vm_spawn.tail; i++) { @@ -409,10 +426,21 @@ void janet_ev_mark(void) { janet_mark(tasks[i].value); } } + + /* Pending timeouts */ for (size_t i = 0; i < janet_vm_tq_count; i++) { janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber)); } - janet_ev_mark_extra(); + + /* Pending listeners */ + for (size_t i = 0; i < janet_vm_listener_count; i++) { + JanetListenerState *state = janet_vm_listeners[i]; + if (NULL != state->fiber) { + janet_mark(janet_wrap_fiber(state->fiber)); + } + janet_stream_mark(state->stream, sizeof(JanetStream)); + (state->machine)(state, JANET_ASYNC_EVENT_MARK); + } } /* Run a top level task */ @@ -428,7 +456,9 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { /* Common init code */ void janet_ev_init_common(void) { janet_q_init(&janet_vm_spawn); - janet_vm_active_listeners = 0; + janet_vm_listener_count = 0; + janet_vm_listener_cap = 0; + janet_vm_listeners = NULL; janet_vm_tq = NULL; janet_vm_tq_count = 0; janet_vm_tq_capacity = 0; @@ -438,6 +468,8 @@ void janet_ev_init_common(void) { /* Common deinit code */ void janet_ev_deinit_common(void) { janet_q_deinit(&janet_vm_spawn); + free(janet_vm_listeners); + janet_vm_listeners = NULL; } /* Short hand to yield to event loop */ @@ -746,7 +778,7 @@ void janet_loop1(void) { run_one(task.fiber, task.value, task.sig); } /* Poll for events */ - if (janet_vm_active_listeners || janet_vm_tq_count) { + if (janet_vm_listener_count || janet_vm_tq_count) { JanetTimeout to; memset(&to, 0, sizeof(to)); int has_timeout; @@ -760,18 +792,15 @@ void janet_loop1(void) { } void janet_loop(void) { - while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { + while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { janet_loop1(); } } #ifdef JANET_WINDOWS -/* Epoll global data */ JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; -static void janet_ev_mark_extra(void) {} - static JanetTimestamp ts_now(void) { return (JanetTimestamp) GetTickCount64(); } @@ -788,9 +817,6 @@ void janet_ev_deinit(void) { } JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - if (stream->state == NULL) { - janet_gcroot(janet_wrap_abstract(stream)); - } /* Add the handle to the io completion port if not already added */ JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); if (!(stream->flags & JANET_STREAM_IOCP)) { @@ -804,11 +830,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in static void janet_unlisten(JanetListenerState *state) { - JanetStream *stream = state->stream; janet_unlisten_impl(state); - if (NULL == stream->state) { - janet_gcunroot(janet_wrap_abstract(stream)); - } } void janet_loop1_impl(int has_timeout, JanetTimestamp to) { @@ -856,11 +878,9 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { #elif defined(JANET_EV_EPOLL) -/* - * Start linux/epoll implementation - */ - -static void janet_ev_mark_extra(void) {} +JANET_THREAD_LOCAL int janet_vm_epoll = 0; +JANET_THREAD_LOCAL int janet_vm_timerfd = 0; +JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0; static JanetTimestamp ts_now(void) { struct timespec now; @@ -870,11 +890,6 @@ static JanetTimestamp ts_now(void) { return res; } -/* Epoll global data */ -JANET_THREAD_LOCAL int janet_vm_epoll = 0; -JANET_THREAD_LOCAL int janet_vm_timerfd = 0; -JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0; - static int make_epoll_events(int mask) { int events = EPOLLET; if (mask & JANET_ASYNC_LISTEN_READ) @@ -900,9 +915,6 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in janet_unlisten_impl(state); janet_panicv(janet_ev_lasterr()); } - if (is_first) { - janet_gcroot(janet_wrap_abstract(stream)); - } return state; } @@ -925,9 +937,6 @@ static void janet_unlisten(JanetListenerState *state) { } /* Destroy state machine and free memory */ janet_unlisten_impl(state); - if (NULL == stream->state) { - janet_gcunroot(janet_wrap_abstract(stream)); - } } #define JANET_EPOLL_MAX_EVENTS 64 @@ -1015,7 +1024,7 @@ void janet_ev_deinit(void) { #include -/* Poll implementation */ +JANET_THREAD_LOCAL struct pollfd *janet_vm_fds = NULL; static JanetTimestamp ts_now(void) { struct timespec now; @@ -1025,23 +1034,6 @@ static JanetTimestamp ts_now(void) { return res; } -/* Epoll global data */ -JANET_THREAD_LOCAL struct pollfd *janet_vm_fds = NULL; -JANET_THREAD_LOCAL JanetListenerState **janet_vm_listener_map = NULL; -JANET_THREAD_LOCAL size_t janet_vm_fdcap = 0; -JANET_THREAD_LOCAL size_t janet_vm_fdcount = 0; - -static void janet_ev_mark_extra(void) { - for (size_t i = 0; i < janet_vm_fdcount; i++) { - JanetListenerState *state = janet_vm_listener_map[i]; - if (NULL != state->fiber) { - janet_mark(janet_wrap_fiber(state->fiber)); - } - janet_stream_mark(state->stream, sizeof(JanetStream)); - (state->machine)(state, JANET_ASYNC_EVENT_MARK); - } -} - static int make_poll_events(int mask) { int events = 0; if (mask & JANET_ASYNC_LISTEN_READ) @@ -1051,43 +1043,27 @@ static int make_poll_events(int mask) { return events; } -static void janet_push_pollfd(struct pollfd pfd) { - if (janet_vm_fdcap == janet_vm_fdcount) { - size_t newcap = janet_vm_fdcount ? janet_vm_fdcount * 2 : 16; - janet_vm_fds = realloc(janet_vm_fds, newcap * sizeof(struct pollfd)); +/* Wait for the next event */ +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { + size_t oldsize = janet_vm_listener_cap; + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); + size_t newsize = janet_vm_listener_cap; + if (newsize > oldsize) { + janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd)); if (NULL == janet_vm_fds) { JANET_OUT_OF_MEMORY; } - janet_vm_listener_map = realloc(janet_vm_listener_map, newcap * sizeof(JanetListenerState *)); - if (NULL == janet_vm_listener_map) { - JANET_OUT_OF_MEMORY; - } - janet_vm_fdcap = newcap; } - janet_vm_fds[janet_vm_fdcount++] = pfd; -} - -/* Wait for the next event */ -JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); struct pollfd ev; ev.fd = stream->handle; ev.events = make_poll_events(state->stream->_mask); ev.revents = 0; - state->_index = janet_vm_fdcount; - janet_push_pollfd(ev); - janet_vm_listener_map[state->_index] = state; + janet_vm_fds[state->_index] = ev; return state; } -/* Tell system we are done listening for a certain event */ static void janet_unlisten(JanetListenerState *state) { - janet_vm_fds[state->_index] = janet_vm_fds[--janet_vm_fdcount]; - JanetListenerState *replacer = janet_vm_listener_map[janet_vm_fdcount]; - janet_vm_listener_map[state->_index] = replacer; - /* Update pointers in replacer */ - replacer->_index = state->_index; - /* Destroy state machine and free memory */ + janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1]; janet_unlisten_impl(state); } @@ -1095,22 +1071,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Poll for events */ int ready; do { + int to = -1; if (has_timeout) { JanetTimestamp now = ts_now(); - ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int)(timeout - now)); - } else { - ready = poll(janet_vm_fds, janet_vm_fdcount, -1); + to = now > timeout ? 0 : (int)(timeout - now); } + ready = poll(janet_vm_fds, janet_vm_listener_count, to); } while (ready == -1 && errno == EINTR); if (ready == -1) { JANET_EXIT("failed to poll events"); } /* Step state machines */ - for (size_t i = 0; i < janet_vm_fdcount; i++) { + for (size_t i = 0; i < janet_vm_listener_count; i++) { struct pollfd *pfd = janet_vm_fds + i; /* Skip fds where nothing interesting happened */ - JanetListenerState *state = janet_vm_listener_map[i]; + JanetListenerState *state = janet_vm_listeners[i]; /* Normal event */ int mask = janet_vm_fds[i].revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; @@ -1137,20 +1113,13 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_fds = NULL; - janet_vm_listener_map = NULL; - janet_vm_fdcap = 0; - janet_vm_fdcount = 0; return; } void janet_ev_deinit(void) { janet_ev_deinit_common(); free(janet_vm_fds); - free(janet_vm_listener_map); janet_vm_fds = NULL; - janet_vm_listener_map = NULL; - janet_vm_fdcap = 0; - janet_vm_fdcount = 0; } #endif