From cb4903fa8675f4af6764daca19af849e3e860196 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 11 Oct 2020 09:07:11 -0500 Subject: [PATCH] Overhaul of poll loop, redo ev/select. --- examples/select.janet | 4 +- src/core/ev.c | 290 ++++++++++++++++++++++++------------------ src/core/fiber.c | 1 - src/core/marsh.c | 4 +- src/include/janet.h | 1 - 5 files changed, 169 insertions(+), 131 deletions(-) diff --git a/examples/select.janet b/examples/select.janet index 98e3610d..29ca2c17 100644 --- a/examples/select.janet +++ b/examples/select.janet @@ -10,8 +10,8 @@ (defn reader [name] (forever - (def c (ev/select ;channels)) - (print "reader " name " got " (ev/take c) " from " c))) + (def [_ c x] (ev/select ;channels)) + (print "reader " name " got " x " from " c))) # Readers (each letter [:a :b :c :d :e :f :g] diff --git a/src/core/ev.c b/src/core/ev.c index e0dabf52..cf73bc3e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -129,6 +129,7 @@ typedef struct JanetTimeout JanetTimeout; struct JanetTimeout { JanetTimestamp when; JanetFiber *fiber; + uint32_t sched_id; int is_error; }; @@ -163,9 +164,7 @@ static int peek_timeout(JanetTimeout *out) { /* Remove the next timeout from the priority queue */ static void pop_timeout(size_t index) { if (janet_vm_tq_count <= index) return; - janet_vm_tq[index].fiber->timeout_index = -1; janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count]; - janet_vm_tq[index].fiber->timeout_index = index; for (;;) { size_t left = (index << 1) + 1; size_t right = left + 1; @@ -180,8 +179,6 @@ static void pop_timeout(size_t index) { JanetTimeout temp = janet_vm_tq[index]; janet_vm_tq[index] = janet_vm_tq[smallest]; janet_vm_tq[smallest] = temp; - janet_vm_tq[index].fiber->timeout_index = index; - janet_vm_tq[smallest].fiber->timeout_index = smallest; index = smallest; } } @@ -204,10 +201,6 @@ static void add_timeout(JanetTimeout to) { janet_vm_tq[oldcount] = to; /* Heapify */ size_t index = oldcount; - if (to.fiber->timeout_index >= 0) { - pop_timeout(to.fiber->timeout_index); - } - to.fiber->timeout_index = index; while (index > 0) { size_t parent = (index - 1) >> 1; if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break; @@ -330,10 +323,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) { void janet_fiber_did_resume(JanetFiber *fiber) { /* Cancel any pending fibers */ if (fiber->waiting) janet_unlisten(fiber->waiting); - if (fiber->timeout_index >= 0) { - pop_timeout(fiber->timeout_index); - fiber->timeout_index = -1; - } } /* Mark all pending tasks */ @@ -395,6 +384,7 @@ void janet_addtimeout(double sec) { JanetTimeout to; to.when = ts_delta(ts_now(), sec); to.fiber = fiber; + to.sched_id = fiber->sched_id; to.is_error = 1; add_timeout(to); } @@ -405,9 +395,9 @@ typedef struct { JanetFiber *fiber; uint32_t sched_id; enum { - JANET_CP_MODE_NONE, JANET_CP_MODE_ITEM, - JANET_CP_MODE_SELECT + JANET_CP_MODE_CHOICE_READ, + JANET_CP_MODE_CHOICE_WRITE } mode; } JanetChannelPending; @@ -488,11 +478,24 @@ static int janet_chanat_mark(void *p, size_t s) { return 0; } -/* Channel Methods */ +static Janet make_write_result(JanetChannel *channel) { + Janet *tup = janet_tuple_begin(2); + tup[0] = janet_ckeywordv("give"); + tup[1] = janet_wrap_abstract(channel); + return janet_wrap_tuple(janet_tuple_end(tup)); +} -static Janet cfun_channel_push(int32_t argc, Janet *argv) { - janet_fixarity(argc, 2); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); +static Janet make_read_result(JanetChannel *channel, Janet x) { + Janet *tup = janet_tuple_begin(3); + tup[0] = janet_ckeywordv("take"); + tup[1] = janet_wrap_abstract(channel); + tup[2] = x; + return janet_wrap_tuple(janet_tuple_end(tup)); +} + +/* Push a value to a channel, and return 1 if channel should block, zero otherwise. + * If the push would block, will add to the write_pending queue in the channel. */ +static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) { JanetChannelPending reader; int is_empty; do { @@ -500,66 +503,113 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) { } while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); if (is_empty) { /* No pending reader */ - if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) { - janet_panicf("channel overflow: %v", argv[1]); + if (janet_q_push(&channel->items, &x, sizeof(Janet))) { + janet_panicf("channel overflow: %v", x); } else if (janet_q_count(&channel->items) > channel->limit) { /* Pushed successfully, but should block. */ JanetChannelPending pending; pending.fiber = janet_vm_root_fiber, pending.sched_id = janet_vm_root_fiber->sched_id, - pending.mode = JANET_CP_MODE_ITEM; + pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM; janet_q_push(&channel->write_pending, &pending, sizeof(pending)); + return 1; } } else { /* Pending reader */ - if (reader.mode == JANET_CP_MODE_SELECT) { - janet_q_push(&channel->items, argv + 1, sizeof(Janet)); - janet_schedule(reader.fiber, argv[0]); + if (reader.mode == JANET_CP_MODE_CHOICE_READ) { + janet_schedule(reader.fiber, make_read_result(channel, x)); } else { - janet_schedule(reader.fiber, argv[1]); + janet_schedule(reader.fiber, x); } } + return 0; +} + +/* Pop from a channel - returns 1 if item was obtain, 0 otherwise. The item + * is returned by reference. If the pop would block, will add to the read_pending + * queue in the channel. */ +static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) { + JanetChannelPending writer; + if (janet_q_pop(&channel->items, item, sizeof(Janet))) { + /* Queue empty */ + JanetChannelPending pending; + pending.fiber = janet_vm_root_fiber, + pending.sched_id = janet_vm_root_fiber->sched_id; + pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM; + janet_q_push(&channel->read_pending, &pending, sizeof(pending)); + return 0; + } + if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { + /* pending writer */ + if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { + janet_schedule(writer.fiber, make_write_result(channel)); + } else { + janet_schedule(writer.fiber, janet_wrap_abstract(channel)); + } + } + return 1; +} + +/* Channel Methods */ + +static Janet cfun_channel_push(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + if (janet_channel_push(channel, argv[1], 0)) { + janet_await(); + } return argv[0]; } static Janet cfun_channel_pop(int32_t argc, Janet *argv) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - Janet item = janet_wrap_nil(); - JanetChannelPending writer; - if (janet_q_pop(&channel->items, &item, sizeof(item))) { - /* Queue empty */ - JanetChannelPending pending; - pending.fiber = janet_vm_root_fiber, - pending.sched_id = janet_vm_root_fiber->sched_id; - pending.mode = JANET_CP_MODE_ITEM; - janet_q_push(&channel->read_pending, &pending, sizeof(pending)); - janet_await(); - } - janet_schedule(janet_vm_root_fiber, item); - if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { - /* Got item, and there are pending writers. This means we should - * schedule one. */ - janet_schedule(writer.fiber, argv[0]); + Janet item; + if (janet_channel_pop(channel, &item, 0)) { + janet_schedule(janet_vm_root_fiber, item); } janet_await(); } -static Janet cfun_channel_select(int32_t argc, Janet *argv) { +static Janet cfun_channel_choice(int32_t argc, Janet *argv) { janet_arity(argc, 1, -1); + int32_t len; + const Janet *data; + + /* Check channels for immediate reads and writes */ for (int32_t i = 0; i < argc; i++) { - JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); - if (chan->items.head != chan->items.tail) return argv[i]; + if (janet_indexed_view(argv[i], &data, &len) && len == 2) { + /* Write */ + JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); + if (janet_q_count(&chan->items) < chan->limit) { + janet_channel_push(chan, data[1], 1); + return make_write_result(chan); + } + } else { + /* Read */ + JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + if (chan->items.head != chan->items.tail) { + Janet item; + janet_channel_pop(chan, &item, 1); + return make_read_result(chan, item); + } + } } - /* None of the channels have data, so we wait on all of them. */ + + /* Wait for all readers or writers */ for (int32_t i = 0; i < argc; i++) { - JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); - JanetChannelPending pending; - pending.fiber = janet_vm_root_fiber, - pending.sched_id = janet_vm_root_fiber->sched_id; - pending.mode = JANET_CP_MODE_SELECT; - janet_q_push(&chan->read_pending, &pending, sizeof(pending)); + if (janet_indexed_view(argv[i], &data, &len) && len == 2) { + /* Write */ + JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); + janet_channel_push(chan, data[1], 1); + } else { + /* Read */ + Janet item; + JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + janet_channel_pop(chan, &item, 1); + } } + janet_await(); } @@ -581,15 +631,19 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) { return janet_wrap_integer(janet_q_count(&channel->items)); } -static Janet cfun_channel_rselect(int32_t argc, Janet *argv) { - /* Fisher yates shuffle of arguments to get fairness */ +/* Fisher yates shuffle of arguments to get fairness */ +static void fisher_yates_args(int32_t argc, Janet *argv) { for (int32_t i = argc; i > 1; i--) { int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i; Janet temp = argv[swap_index]; argv[swap_index] = argv[i - 1]; argv[i - 1] = temp; } - return cfun_channel_select(argc, argv); +} + +static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) { + fisher_yates_args(argc, argv); + return cfun_channel_choice(argc, argv); } static Janet cfun_channel_new(int32_t argc, Janet *argv) { @@ -602,29 +656,48 @@ static Janet cfun_channel_new(int32_t argc, Janet *argv) { /* Main event loop */ -void janet_loop1_impl(void); +void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); -void janet_loop(void) { - while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { - /* Run expired timers */ +void janet_loop1(void) { + /* Schedule expired timers */ + JanetTimeout to; + JanetTimestamp now = ts_now(); + while (peek_timeout(&to) && to.when <= now) { + pop_timeout(0); + if (to.fiber->sched_id == to.sched_id) { + if (to.is_error) { + janet_cancel(to.fiber, janet_cstringv("timeout")); + } else { + janet_schedule(to.fiber, janet_wrap_nil()); + } + } + } + /* Run scheduled fibers */ + while (janet_vm_spawn.head != janet_vm_spawn.tail) { + JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; + janet_q_pop(&janet_vm_spawn, &task, sizeof(task)); + run_one(task.fiber, task.value, task.sig); + } + /* Poll for events */ + if (janet_vm_active_listeners || janet_vm_tq_count) { JanetTimeout to; - while (peek_timeout(&to) && to.when <= ts_now()) { + memset(&to, 0, sizeof(to)); + int has_timeout; + /* Drop timeouts that are no longer needed */ + while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) { pop_timeout(0); - janet_schedule(to.fiber, janet_wrap_nil()); - } - /* Run scheduled fibers */ - while (janet_vm_spawn.head != janet_vm_spawn.tail) { - JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; - janet_q_pop(&janet_vm_spawn, &task, sizeof(task)); - run_one(task.fiber, task.value, task.sig); - } - /* Poll for events */ - if (janet_vm_active_listeners || janet_vm_tq_count) { - janet_loop1_impl(); } + /* Run polling implementation */ + janet_loop1_impl(has_timeout, to.when); } } +void janet_loop(void) { + while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) { + janet_loop1(); + } +} +; #ifdef JANET_EV_EPOLL /* @@ -692,17 +765,12 @@ static void janet_unlisten(JanetListenerState *state) { } #define JANET_EPOLL_MAX_EVENTS 64 -void janet_loop1_impl(void) { - /* Set timer */ - JanetTimeout to; - struct itimerspec its; - memset(&to, 0, sizeof(to)); - int has_timeout = peek_timeout(&to); +void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { if (janet_vm_timer_enabled || has_timeout) { memset(&its, 0, sizeof(its)); if (has_timeout) { - its.it_value.tv_sec = to.when / 1000; - its.it_value.tv_nsec = (to.when % 1000) * 1000000; + its.it_value.tv_sec = timeout / 1000; + its.it_value.tv_nsec = (timeout % 1000) * 1000000; } timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL); } @@ -721,17 +789,7 @@ void janet_loop1_impl(void) { /* Step state machines */ for (int i = 0; i < ready; i++) { JanetPollable *pollable = events[i].data.ptr; - if (NULL == pollable) { - /* Timer event */ - pop_timeout(0); - /* Cancel waiters for this fiber */ - if (to.is_error) { - janet_cancel(to.fiber, janet_cstringv("timeout")); - } else { - janet_schedule(to.fiber, janet_wrap_nil()); - } - } else { - /* Normal event */ + if (NULL != pollable) { /* If NULL, is a timeout */ int mask = events[i].events; JanetListenerState *state = pollable->state; while (NULL != state) { @@ -845,18 +903,13 @@ static void janet_unlisten(JanetListenerState *state) { janet_unlisten_impl(state); } -void janet_loop1_impl(void) { - /* Set timer */ - JanetTimeout to; - memset(&to, 0, sizeof(to)); - int has_timeout = peek_timeout(&to); - +void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Poll for events */ int ready; do { if (has_timeout) { - int64_t diff = to.when - ts_now(); - ready = poll(janet_vm_fds, janet_vm_fdcount, diff < 0 ? 0 : (int) diff); + JanetTimestamp now = ts_now(); + ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int) (timeout - now)); } else { ready = poll(janet_vm_fds, janet_vm_fdcount, -1); } @@ -866,10 +919,8 @@ void janet_loop1_impl(void) { } /* Step state machines */ - int did_handle_something = 0; for (size_t i = 0; i < janet_vm_fdcount; i++) { struct pollfd *pfd = janet_vm_fds + i; - did_handle_something |= pfd->revents; /* Skip fds where nothing interesting happened */ if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue; JanetListenerState *state = janet_vm_listener_map[i]; @@ -884,19 +935,6 @@ void janet_loop1_impl(void) { if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) janet_unlisten(state); } - - /* If nothing was handled and poll returned, then we know that it timedout and we should trigger - * one of our timers. */ - if (!did_handle_something) { - /* Timer event */ - pop_timeout(0); - /* Cancel waiters for this fiber */ - if (to.is_error) { - janet_cancel(to.fiber, janet_cstringv("timeout")); - } else { - janet_schedule(to.fiber, janet_wrap_nil()); - } - } } void janet_ev_init(void) { @@ -945,6 +983,7 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { to.when = ts_delta(ts_now(), sec); to.fiber = janet_vm_root_fiber; to.is_error = 0; + to.sched_id = to.fiber->sched_id; add_timeout(to); janet_await(); } @@ -1006,25 +1045,24 @@ static const JanetReg ev_cfuns[] = { JDOC("(ev/count channel)\n\n" "Get the number of items currently waiting in a channel.") }, - { - "ev/select", cfun_channel_select, - JDOC("(ev/select & channels)\n\n" - "Get a channel that is not empty, suspending the current fiber until at least one channel " - "is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). " - "Returns a non-empty channel.") - }, - { - "ev/rselect", cfun_channel_rselect, - JDOC("(ev/rselect & channels)\n\n" - "Get a channel that is not empty, suspending the current fiber until at least one channel " - "is not empty. Will prefer channels in a random order (random choice). " - "Returns a non-empty channel.") - }, { "ev/cancel", cfun_ev_cancel, JDOC("(ev/cancel fiber err)\n\n" "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately") }, + { + "ev/select", cfun_channel_choice, + JDOC("(ev/select & clauses)\n\n" + "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where " + "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for " + "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first " + "clauses will take precedence over later clauses.") + }, + { + "ev/rselect", cfun_channel_rchoice, + JDOC("(ev/rselect & clauses)\n\n" + "Similar to ev/choice, but will try clauses in a random order for fairness.") + }, {NULL, NULL, NULL} }; diff --git a/src/core/fiber.c b/src/core/fiber.c index 8ac94ff6..4618740f 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -39,7 +39,6 @@ static void fiber_reset(JanetFiber *fiber) { fiber->env = NULL; #ifdef JANET_EV fiber->waiting = NULL; - fiber->timeout_index = -1; fiber->sched_id = 0; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); diff --git a/src/core/marsh.c b/src/core/marsh.c index 0f795f07..96767a53 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -934,8 +934,10 @@ static const uint8_t *unmarshal_one_fiber( fiber->data = NULL; fiber->child = NULL; fiber->env = NULL; +#ifdef JANET_EV fiber->waiting = NULL; - fiber->timeout_index = -1; + fiber->sched_id = 0; +#endif /* Push fiber to seen stack */ janet_v_push(st->lookup, janet_wrap_fiber(fiber)); diff --git a/src/include/janet.h b/src/include/janet.h index 287e78d4..c717d1e3 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -826,7 +826,6 @@ struct JanetFiber { JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */ #ifdef JANET_EV JanetListenerState *waiting; - int32_t timeout_index; uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ #endif };