More work on timeouts and racing listeners.

When two listeners are racing to resume the same fiber, the
first should cancel out the other.
This commit is contained in:
Calvin Rose 2020-07-05 17:26:17 -05:00
parent a4de83b3a3
commit 9ba94d2c6b
5 changed files with 119 additions and 39 deletions

View File

@ -5,10 +5,17 @@
(def b @"")
(print "Connection " id "!")
(while (:read stream 1024 b)
(repeat 10 (print "work for " id " ...") (ev/sleep 1))
(:write stream b)
(buffer/clear b))
(printf "Done %v!" id))
(defn spawn-kid
"Run handler in a new fiber"
[conn]
(def f (fiber/new handler))
(ev/go f conn))
(print "Starting echo server on 127.0.0.1:8000")
(def server (net/server "127.0.0.1" "8000"))
@ -16,4 +23,5 @@
# Run server.
(while true
(with [conn (:accept server)]
(handler conn)))
(spawn-kid conn)
(ev/sleep 0.1)))

View File

@ -26,6 +26,7 @@
#include "util.h"
#include "gc.h"
#include "state.h"
#include "vector.h"
#endif
#ifdef JANET_EV
@ -57,6 +58,9 @@ struct JanetTimeout {
JanetFiber *fiber;
};
/* Forward declaration */
static void janet_unlisten(JanetListenerState *state);
/* Global data */
JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0;
JANET_THREAD_LOCAL size_t janet_vm_spawn_capacity = 0;
@ -75,6 +79,40 @@ static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) {
return ts;
}
/* Look at the next timeout value without
* removing it. */
static int peek_timeout(JanetTimeout *out) {
if (janet_vm_tq_count == 0) return 0;
*out = janet_vm_tq[0];
return 1;
}
/* Remove the next timeout from the priority queue */
static void pop_timeout(size_t index) {
if (janet_vm_tq_count <= index) return;
janet_vm_tq[index].fiber->timeout_index = -1;
janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count];
janet_vm_tq[index].fiber->timeout_index = index;
for (;;) {
size_t left = (index << 1) + 1;
size_t right = left + 1;
size_t smallest = index;
if (left < janet_vm_tq_count &&
(janet_vm_tq[left].when < janet_vm_tq[smallest].when))
smallest = left;
if (right < janet_vm_tq_count &&
(janet_vm_tq[right].when < janet_vm_tq[smallest].when))
smallest = right;
if (smallest == index) return;
JanetTimeout temp = janet_vm_tq[index];
janet_vm_tq[index] = janet_vm_tq[smallest];
janet_vm_tq[smallest] = temp;
janet_vm_tq[index].fiber->timeout_index = index;
janet_vm_tq[smallest].fiber->timeout_index = smallest;
index = smallest;
}
}
/* Add a timeout to the timeout min heap */
static void add_timeout(JanetTimeout to) {
size_t oldcount = janet_vm_tq_count;
@ -93,6 +131,10 @@ static void add_timeout(JanetTimeout to) {
janet_vm_tq[oldcount] = to;
/* Heapify */
size_t index = oldcount;
if (to.fiber->timeout_index >= 0) {
pop_timeout(to.fiber->timeout_index);
}
to.fiber->timeout_index = index;
while (index > 0) {
size_t parent = (index - 1) >> 1;
if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
@ -105,38 +147,6 @@ static void add_timeout(JanetTimeout to) {
}
}
/* Look at the next timeout value without
* removing it. */
static int peek_timeout(JanetTimeout *out) {
if (janet_vm_tq_count == 0) return 0;
*out = janet_vm_tq[0];
return 1;
}
/* Remove the next timeout from the priority queue */
static void pop_timeout(void) {
if (janet_vm_tq_count == 0) return;
janet_vm_tq[0] = janet_vm_tq[--janet_vm_tq_count];
/* Keep heap invariant */
size_t index = 0;
for (;;) {
size_t left = (index << 1) + 1;
size_t right = left + 1;
size_t smallest = index;
if (left < janet_vm_tq_count &&
(janet_vm_tq[left].when < janet_vm_tq[smallest].when))
smallest = left;
if (right < janet_vm_tq_count &&
(janet_vm_tq[right].when < janet_vm_tq[smallest].when))
smallest = right;
if (smallest == index) return;
JanetTimeout temp = janet_vm_tq[index];
janet_vm_tq[index] = janet_vm_tq[smallest];
janet_vm_tq[smallest] = temp;
index = smallest;
}
}
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
if (pollable->_mask & mask) {
@ -149,7 +159,13 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
JANET_OUT_OF_MEMORY;
}
state->machine = behavior;
state->fiber = janet_vm_root_fiber;
if (mask & JANET_ASYNC_LISTEN_SPAWNER) {
state->fiber = NULL;
} else {
state->fiber = janet_vm_root_fiber;
janet_v_push(janet_vm_root_fiber->waiting, state);
}
mask |= JANET_ASYNC_LISTEN_SPAWNER;
state->pollable = pollable;
state->_mask = mask;
pollable->_mask |= mask;
@ -175,6 +191,18 @@ static void janet_unlisten_impl(JanetListenerState *state) {
janet_vm_active_listeners--;
/* Remove mask */
state->pollable->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */
JanetFiber *fiber = state->fiber;
if (NULL != fiber) {
int32_t count = janet_v_count(fiber->waiting);
for (int32_t i = 0; i < count; i++) {
if (fiber->waiting[i] == state) {
fiber->waiting[i] = janet_v_last(fiber->waiting);
janet_v_pop(fiber->waiting);
break;
}
}
}
free(state);
}
@ -212,8 +240,31 @@ void janet_pollable_deinit(JanetPollable *pollable) {
pollable->state = NULL;
}
/* In order to avoid unexpected wakeups on a fiber, prior to
* resuming a fiber after and event is triggered, we need to
* cancel all listeners that also want to wakeup this fiber.
* Otherwise, these listeners make wakeup the fiber on an unexpected
* await point. */
void janet_unschedule_others(JanetFiber *fiber) {
int32_t lcount = janet_v_count(fiber->waiting);
janet_v_empty(fiber->waiting);
for (int32_t index = 0; index < lcount; index++) {
janet_unlisten(fiber->waiting[index]);
}
/* Clear timeout on the current fiber */
if (fiber->timeout_index >= 0) {
pop_timeout(fiber->timeout_index);
fiber->timeout_index = -1;
}
}
/* Register a fiber to resume with value */
void janet_schedule(JanetFiber *fiber, Janet value) {
if (fiber->gc.flags & 0x10000) {
/* already scheduled to run, do nothing */
return;
}
fiber->gc.flags |= 0x10000;
size_t oldcount = janet_vm_spawn_count;
size_t newcount = oldcount + 1;
if (newcount > janet_vm_spawn_capacity) {
@ -243,6 +294,9 @@ void janet_ev_mark(void) {
/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value) {
/* Use a gc flag bit to indicate (is this fiber scheduled?) */
fiber->gc.flags &= ~0x10000;
janet_unschedule_others(fiber);
Janet res;
JanetSignal sig = janet_continue(fiber, value, &res);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
@ -271,6 +325,15 @@ void janet_await(void) {
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
}
/* Set timeout for the current root fiber */
void janet_addtimeout(double sec) {
JanetFiber *fiber = janet_vm_root_fiber;
JanetTimeout to;
to.when = ts_delta(ts_now(), sec);
to.fiber = fiber;
add_timeout(to);
}
/* Main event loop */
void janet_loop1_impl(void);
@ -280,7 +343,7 @@ void janet_loop(void) {
/* Run expired timers */
JanetTimeout to;
while (peek_timeout(&to) && to.when <= ts_now()) {
pop_timeout();
pop_timeout(0);
janet_schedule(to.fiber, janet_wrap_nil());
}
/* Run scheduled fibers */
@ -397,7 +460,8 @@ void janet_loop1_impl(void) {
JanetPollable *pollable = events[i].data.ptr;
if (NULL == pollable) {
/* Timer event */
pop_timeout();
pop_timeout(0);
/* Cancel waiters for this fiber */
janet_schedule(to.fiber, janet_wrap_nil());
} else {
/* Normal event */

View File

@ -37,7 +37,10 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->child = NULL;
fiber->flags = JANET_FIBER_MASK_YIELD | JANET_FIBER_RESUME_NO_USEVAL | JANET_FIBER_RESUME_NO_SKIP;
fiber->env = NULL;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->timeout_index = -1;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
}

View File

@ -28,6 +28,7 @@
#include "gc.h"
#include "util.h"
#include "fiber.h"
#include "vector.h"
#endif
struct JanetScratch {
@ -286,7 +287,7 @@ static void janet_deinit_block(JanetGCObject *mem) {
case JANET_MEMORY_FIBER:
free(((JanetFiber *)mem)->data);
#ifdef JANET_EV
free(((JanetFiber *)mem)->waiting);
janet_v_free(((JanetFiber *)mem)->waiting);
#endif
break;
case JANET_MEMORY_BUFFER:

View File

@ -489,6 +489,7 @@ typedef enum {
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE)
#define JANET_ASYNC_LISTEN_SPAWNER 0x1000
typedef enum {
JANET_ASYNC_STATUS_NOT_DONE,
@ -799,8 +800,7 @@ struct JanetFiber {
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
#ifdef JANET_EV
JanetListenerState **waiting;
#else
void *waiting;
int32_t timeout_index;
#endif
};
@ -1218,6 +1218,10 @@ JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListene
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);
/* For use inside listeners - adds a timeout to the current fiber, such that
* it will be resumed after sec seconds if no other event schedules the current fiber. */
void janet_addtimeout(double sec);
#endif
/* Parsing */