From fb26c9b2c4b49af5b6a21d922b9d34a9a8888291 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 9 Aug 2020 00:20:27 -0500 Subject: [PATCH] Add ev/select and ev/rselect initial implementation. Getting closer to a CSP implmententation. Probably useful to move scheduling fields outside of fibers and into an external table. --- examples/select.janet | 22 ++++++++ src/core/ev.c | 122 ++++++++++++++++++++++++++++++++++-------- src/core/fiber.c | 1 + src/include/janet.h | 1 + 4 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 examples/select.janet diff --git a/examples/select.janet b/examples/select.janet new file mode 100644 index 00000000..98e3610d --- /dev/null +++ b/examples/select.janet @@ -0,0 +1,22 @@ +(def channels + (seq [:repeat 5] (ev/chan 4))) + +(defn writer [c] + (for i 0 3 + (ev/sleep 0.1) + (print "writer giving item " i " to " c "...") + (ev/give c (string "item " i))) + (print "Done!")) + +(defn reader [name] + (forever + (def c (ev/select ;channels)) + (print "reader " name " got " (ev/take c) " from " c))) + +# Readers +(each letter [:a :b :c :d :e :f :g] + (ev/call reader letter)) + +# Writers +(each c channels + (ev/call writer c)) diff --git a/src/core/ev.c b/src/core/ev.c index 913c2d80..e0b2783d 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -86,21 +86,25 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) { if (q->head > q->tail) { /* Two segments, fix 2nd seg. */ int32_t newhead = q->head + (newcap - q->capacity); - int32_t seg1 = q->capacity - q->head; - memmove(q->data + newhead, q->data + q->head, seg1 * itemsize); + size_t seg1 = (size_t)(q->capacity - q->head); + if (seg1 > 0) { + memmove(q->data + (newhead * itemsize), + q->data + (q->head * itemsize), + seg1 * itemsize); + } q->head = newhead; } q->capacity = newcap; } - memcpy(q->data + itemsize * q->tail++, item, itemsize); - if (q->tail >= q->capacity) q->tail = 0; + memcpy(q->data + itemsize * q->tail, item, itemsize); + q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0; return 0; } static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) { if (q->head == q->tail) return 1; - memcpy(out, q->data + itemsize * q->head++, itemsize); - if (q->head >= q->capacity) q->head = 0; + memcpy(out, q->data + itemsize * q->head, itemsize); + q->head = q->head + 1 < q->capacity ? q->head + 1 : 0; return 0; } @@ -128,6 +132,7 @@ JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0; JANET_THREAD_LOCAL JanetQueue janet_vm_spawn; JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL; +JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng; /* Get current timestamp (millisecond precision) */ static JanetTimestamp ts_now(void); @@ -308,6 +313,7 @@ void janet_cancel(JanetFiber *fiber) { void janet_schedule(JanetFiber *fiber, Janet value) { if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return; fiber->flags |= JANET_FIBER_FLAG_SCHEDULED; + fiber->sched_id++; JanetTask t = { fiber, value }; janet_q_push(&janet_vm_spawn, &t, sizeof(t)); } @@ -352,6 +358,7 @@ void janet_ev_init_common(void) { janet_vm_tq = NULL; janet_vm_tq_count = 0; janet_vm_tq_capacity = 0; + janet_rng_seed(&janet_vm_ev_rng, 0); } /* Common deinit code */ @@ -375,6 +382,16 @@ void janet_addtimeout(double sec) { /* Channels */ +typedef struct { + JanetFiber *fiber; + uint32_t sched_id; + enum { + JANET_CP_MODE_NONE, + JANET_CP_MODE_ITEM, + JANET_CP_MODE_SELECT + } mode; +} JanetChannelPending; + typedef struct { JanetQueue items; JanetQueue read_pending; @@ -421,15 +438,15 @@ static int janet_chanat_gc(void *p, size_t s) { } static void janet_chanat_mark_fq(JanetQueue *fq) { - JanetFiber **fibers = fq->data; + JanetChannelPending *pending = fq->data; if (fq->head <= fq->tail) { for (int32_t i = fq->head; i < fq->tail; i++) - janet_mark(janet_wrap_fiber(fibers[i])); + janet_mark(janet_wrap_fiber(pending[i].fiber)); } else { for (int32_t i = fq->head; i < fq->capacity; i++) - janet_mark(janet_wrap_fiber(fibers[i])); + janet_mark(janet_wrap_fiber(pending[i].fiber)); for (int32_t i = 0; i < fq->tail; i++) - janet_mark(janet_wrap_fiber(fibers[i])); + janet_mark(janet_wrap_fiber(pending[i].fiber)); } } @@ -457,18 +474,30 @@ static int janet_chanat_mark(void *p, size_t s) { static Janet cfun_channel_push(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - JanetFiber *reader; - if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { - /* Pending reader */ - janet_schedule(reader, argv[1]); - } else { + JanetChannelPending reader; + int is_empty; + do { + is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader)); + } while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); + if (is_empty) { /* No pending reader */ if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) { janet_panicf("channel overflow: %v", argv[1]); } else if (janet_q_count(&channel->items) > channel->limit) { /* Pushed successfully, but should block. */ - janet_q_push(&channel->write_pending, &janet_vm_root_fiber, sizeof(JanetFiber *)); - janet_await(); + JanetChannelPending pending; + pending.fiber = janet_vm_root_fiber, + pending.sched_id = janet_vm_root_fiber->sched_id, + pending.mode = JANET_CP_MODE_ITEM; + janet_q_push(&channel->write_pending, &pending, sizeof(pending)); + } + } else { + /* Pending reader */ + if (reader.mode == JANET_CP_MODE_SELECT) { + janet_q_push(&channel->items, argv + 1, sizeof(Janet)); + janet_schedule(reader.fiber, argv[0]); + } else { + janet_schedule(reader.fiber, argv[1]); } } return argv[0]; @@ -478,17 +507,41 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); Janet item = janet_wrap_nil(); - JanetFiber *writer; + JanetChannelPending writer; if (janet_q_pop(&channel->items, &item, sizeof(item))) { /* Queue empty */ - janet_q_push(&channel->read_pending, &janet_vm_root_fiber, sizeof(JanetFiber *)); + JanetChannelPending pending; + pending.fiber = janet_vm_root_fiber, + pending.sched_id = janet_vm_root_fiber->sched_id; + pending.mode = JANET_CP_MODE_ITEM; + janet_q_push(&channel->read_pending, &pending, sizeof(pending)); janet_await(); - } else if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { + } + janet_schedule(janet_vm_root_fiber, item); + if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { /* Got item, and there are pending writers. This means we should * schedule one. */ - janet_schedule(writer, argv[0]); + janet_schedule(writer.fiber, argv[0]); } - return item; + janet_await(); +} + +static Janet cfun_channel_select(int32_t argc, Janet *argv) { + janet_arity(argc, 1, -1); + for (int32_t i = 0; i < argc; i++) { + JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + if (chan->items.head != chan->items.tail) return argv[i]; + } + /* None of the channels have data, so we wait on all of them. */ + for (int32_t i = 0; i < argc; i++) { + JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + JanetChannelPending pending; + pending.fiber = janet_vm_root_fiber, + pending.sched_id = janet_vm_root_fiber->sched_id; + pending.mode = JANET_CP_MODE_SELECT; + janet_q_push(&chan->read_pending, &pending, sizeof(pending)); + } + janet_await(); } static Janet cfun_channel_full(int32_t argc, Janet *argv) { @@ -509,6 +562,17 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) { return janet_wrap_integer(janet_q_count(&channel->items)); } +static Janet cfun_channel_rselect(int32_t argc, Janet *argv) { + /* Fisher yates shuffle of arguments to get fairness */ + for (int32_t i = argc; i > 1; i--) { + int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i; + Janet temp = argv[swap_index]; + argv[swap_index] = argv[i - 1]; + argv[i - 1] = temp; + } + return cfun_channel_select(argc, argv); +} + static Janet cfun_channel_new(int32_t argc, Janet *argv) { janet_arity(argc, 0, 1); int32_t limit = janet_optnat(argv, argc, 0, 0); @@ -773,6 +837,20 @@ static const JanetReg ev_cfuns[] = { JDOC("(ev/count channel)\n\n" "Get the number of items currently waiting in a channel.") }, + { + "ev/select", cfun_channel_select, + JDOC("(ev/select & channels)\n\n" + "Get a channel that is not empty, suspending the current fiber until at least one channel " + "is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). " + "Returns a non-empty channel.") + }, + { + "ev/rselect", cfun_channel_rselect, + JDOC("(ev/rselect & channels)\n\n" + "Get a channel that is not empty, suspending the current fiber until at least one channel " + "is not empty. Will prefer channels in a random order (random choice). " + "Returns a non-empty channel.") + }, {NULL, NULL, NULL} }; diff --git a/src/core/fiber.c b/src/core/fiber.c index d5d78fe9..e8d8b985 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -40,6 +40,7 @@ static void fiber_reset(JanetFiber *fiber) { #ifdef JANET_EV fiber->waiting = NULL; fiber->timeout_index = -1; + fiber->sched_id = 0; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } diff --git a/src/include/janet.h b/src/include/janet.h index e1e515c1..9feaca65 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -806,6 +806,7 @@ struct JanetFiber { #ifdef JANET_EV JanetListenerState *waiting; int32_t timeout_index; + uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ #endif };