From 9ba94d2c6b322874bc4b82720549b284bbfcd98f Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 5 Jul 2020 17:26:17 -0500 Subject: [PATCH] More work on timeouts and racing listeners. When two listeners are racing to resume the same fiber, the first should cancel out the other. --- examples/tcpserver.janet | 10 ++- src/core/ev.c | 134 +++++++++++++++++++++++++++++---------- src/core/fiber.c | 3 + src/core/gc.c | 3 +- src/include/janet.h | 8 ++- 5 files changed, 119 insertions(+), 39 deletions(-) diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet index eacd9526..58e6386d 100644 --- a/examples/tcpserver.janet +++ b/examples/tcpserver.janet @@ -5,10 +5,17 @@ (def b @"") (print "Connection " id "!") (while (:read stream 1024 b) + (repeat 10 (print "work for " id " ...") (ev/sleep 1)) (:write stream b) (buffer/clear b)) (printf "Done %v!" id)) +(defn spawn-kid + "Run handler in a new fiber" + [conn] + (def f (fiber/new handler)) + (ev/go f conn)) + (print "Starting echo server on 127.0.0.1:8000") (def server (net/server "127.0.0.1" "8000")) @@ -16,4 +23,5 @@ # Run server. (while true (with [conn (:accept server)] - (handler conn))) + (spawn-kid conn) + (ev/sleep 0.1))) diff --git a/src/core/ev.c b/src/core/ev.c index 82a7e0b8..262b942d 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -26,6 +26,7 @@ #include "util.h" #include "gc.h" #include "state.h" +#include "vector.h" #endif #ifdef JANET_EV @@ -57,6 +58,9 @@ struct JanetTimeout { JanetFiber *fiber; }; +/* Forward declaration */ +static void janet_unlisten(JanetListenerState *state); + /* Global data */ JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0; JANET_THREAD_LOCAL size_t janet_vm_spawn_capacity = 0; @@ -75,6 +79,40 @@ static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) { return ts; } +/* Look at the next timeout value without + * removing it. */ +static int peek_timeout(JanetTimeout *out) { + if (janet_vm_tq_count == 0) return 0; + *out = janet_vm_tq[0]; + return 1; +} + +/* Remove the next timeout from the priority queue */ +static void pop_timeout(size_t index) { + if (janet_vm_tq_count <= index) return; + janet_vm_tq[index].fiber->timeout_index = -1; + janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count]; + janet_vm_tq[index].fiber->timeout_index = index; + for (;;) { + size_t left = (index << 1) + 1; + size_t right = left + 1; + size_t smallest = index; + if (left < janet_vm_tq_count && + (janet_vm_tq[left].when < janet_vm_tq[smallest].when)) + smallest = left; + if (right < janet_vm_tq_count && + (janet_vm_tq[right].when < janet_vm_tq[smallest].when)) + smallest = right; + if (smallest == index) return; + JanetTimeout temp = janet_vm_tq[index]; + janet_vm_tq[index] = janet_vm_tq[smallest]; + janet_vm_tq[smallest] = temp; + janet_vm_tq[index].fiber->timeout_index = index; + janet_vm_tq[smallest].fiber->timeout_index = smallest; + index = smallest; + } +} + /* Add a timeout to the timeout min heap */ static void add_timeout(JanetTimeout to) { size_t oldcount = janet_vm_tq_count; @@ -93,6 +131,10 @@ static void add_timeout(JanetTimeout to) { janet_vm_tq[oldcount] = to; /* Heapify */ size_t index = oldcount; + if (to.fiber->timeout_index >= 0) { + pop_timeout(to.fiber->timeout_index); + } + to.fiber->timeout_index = index; while (index > 0) { size_t parent = (index - 1) >> 1; if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break; @@ -105,38 +147,6 @@ static void add_timeout(JanetTimeout to) { } } -/* Look at the next timeout value without - * removing it. */ -static int peek_timeout(JanetTimeout *out) { - if (janet_vm_tq_count == 0) return 0; - *out = janet_vm_tq[0]; - return 1; -} - -/* Remove the next timeout from the priority queue */ -static void pop_timeout(void) { - if (janet_vm_tq_count == 0) return; - janet_vm_tq[0] = janet_vm_tq[--janet_vm_tq_count]; - /* Keep heap invariant */ - size_t index = 0; - for (;;) { - size_t left = (index << 1) + 1; - size_t right = left + 1; - size_t smallest = index; - if (left < janet_vm_tq_count && - (janet_vm_tq[left].when < janet_vm_tq[smallest].when)) - smallest = left; - if (right < janet_vm_tq_count && - (janet_vm_tq[right].when < janet_vm_tq[smallest].when)) - smallest = right; - if (smallest == index) return; - JanetTimeout temp = janet_vm_tq[index]; - janet_vm_tq[index] = janet_vm_tq[smallest]; - janet_vm_tq[smallest] = temp; - index = smallest; - } -} - /* Create a new event listener */ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { if (pollable->_mask & mask) { @@ -149,7 +159,13 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe JANET_OUT_OF_MEMORY; } state->machine = behavior; - state->fiber = janet_vm_root_fiber; + if (mask & JANET_ASYNC_LISTEN_SPAWNER) { + state->fiber = NULL; + } else { + state->fiber = janet_vm_root_fiber; + janet_v_push(janet_vm_root_fiber->waiting, state); + } + mask |= JANET_ASYNC_LISTEN_SPAWNER; state->pollable = pollable; state->_mask = mask; pollable->_mask |= mask; @@ -175,6 +191,18 @@ static void janet_unlisten_impl(JanetListenerState *state) { janet_vm_active_listeners--; /* Remove mask */ state->pollable->_mask &= ~(state->_mask); + /* Ensure fiber does not reference this state */ + JanetFiber *fiber = state->fiber; + if (NULL != fiber) { + int32_t count = janet_v_count(fiber->waiting); + for (int32_t i = 0; i < count; i++) { + if (fiber->waiting[i] == state) { + fiber->waiting[i] = janet_v_last(fiber->waiting); + janet_v_pop(fiber->waiting); + break; + } + } + } free(state); } @@ -212,8 +240,31 @@ void janet_pollable_deinit(JanetPollable *pollable) { pollable->state = NULL; } +/* In order to avoid unexpected wakeups on a fiber, prior to + * resuming a fiber after and event is triggered, we need to + * cancel all listeners that also want to wakeup this fiber. + * Otherwise, these listeners make wakeup the fiber on an unexpected + * await point. */ +void janet_unschedule_others(JanetFiber *fiber) { + int32_t lcount = janet_v_count(fiber->waiting); + janet_v_empty(fiber->waiting); + for (int32_t index = 0; index < lcount; index++) { + janet_unlisten(fiber->waiting[index]); + } + /* Clear timeout on the current fiber */ + if (fiber->timeout_index >= 0) { + pop_timeout(fiber->timeout_index); + fiber->timeout_index = -1; + } +} + /* Register a fiber to resume with value */ void janet_schedule(JanetFiber *fiber, Janet value) { + if (fiber->gc.flags & 0x10000) { + /* already scheduled to run, do nothing */ + return; + } + fiber->gc.flags |= 0x10000; size_t oldcount = janet_vm_spawn_count; size_t newcount = oldcount + 1; if (newcount > janet_vm_spawn_capacity) { @@ -243,6 +294,9 @@ void janet_ev_mark(void) { /* Run a top level task */ static void run_one(JanetFiber *fiber, Janet value) { + /* Use a gc flag bit to indicate (is this fiber scheduled?) */ + fiber->gc.flags &= ~0x10000; + janet_unschedule_others(fiber); Janet res; JanetSignal sig = janet_continue(fiber, value, &res); if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { @@ -271,6 +325,15 @@ void janet_await(void) { janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } +/* Set timeout for the current root fiber */ +void janet_addtimeout(double sec) { + JanetFiber *fiber = janet_vm_root_fiber; + JanetTimeout to; + to.when = ts_delta(ts_now(), sec); + to.fiber = fiber; + add_timeout(to); +} + /* Main event loop */ void janet_loop1_impl(void); @@ -280,7 +343,7 @@ void janet_loop(void) { /* Run expired timers */ JanetTimeout to; while (peek_timeout(&to) && to.when <= ts_now()) { - pop_timeout(); + pop_timeout(0); janet_schedule(to.fiber, janet_wrap_nil()); } /* Run scheduled fibers */ @@ -397,7 +460,8 @@ void janet_loop1_impl(void) { JanetPollable *pollable = events[i].data.ptr; if (NULL == pollable) { /* Timer event */ - pop_timeout(); + pop_timeout(0); + /* Cancel waiters for this fiber */ janet_schedule(to.fiber, janet_wrap_nil()); } else { /* Normal event */ diff --git a/src/core/fiber.c b/src/core/fiber.c index b01975db..2629767e 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -37,7 +37,10 @@ static void fiber_reset(JanetFiber *fiber) { fiber->child = NULL; fiber->flags = JANET_FIBER_MASK_YIELD | JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP; fiber->env = NULL; +#ifdef JANET_EV fiber->waiting = NULL; + fiber->timeout_index = -1; +#endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } diff --git a/src/core/gc.c b/src/core/gc.c index 618fcdb9..0c2cf4c1 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -28,6 +28,7 @@ #include "gc.h" #include "util.h" #include "fiber.h" +#include "vector.h" #endif struct JanetScratch { @@ -286,7 +287,7 @@ static void janet_deinit_block(JanetGCObject *mem) { case JANET_MEMORY_FIBER: free(((JanetFiber *)mem)->data); #ifdef JANET_EV - free(((JanetFiber *)mem)->waiting); + janet_v_free(((JanetFiber *)mem)->waiting); #endif break; case JANET_MEMORY_BUFFER: diff --git a/src/include/janet.h b/src/include/janet.h index 9fa632ee..5ba66e5f 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -489,6 +489,7 @@ typedef enum { #define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) #define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE) +#define JANET_ASYNC_LISTEN_SPAWNER 0x1000 typedef enum { JANET_ASYNC_STATUS_NOT_DONE, @@ -799,8 +800,7 @@ struct JanetFiber { JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */ #ifdef JANET_EV JanetListenerState **waiting; -#else - void *waiting; + int32_t timeout_index; #endif }; @@ -1218,6 +1218,10 @@ JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListene /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); +/* For use inside listeners - adds a timeout to the current fiber, such that + * it will be resumed after sec seconds if no other event schedules the current fiber. */ +void janet_addtimeout(double sec); + #endif /* Parsing */