From 4f2d1cdc009172f0b73419805253e6136f664fb9 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 12 Jan 2021 21:35:28 -0600 Subject: [PATCH] Go back to a single supervisor channel per fiber. We now also use the fiber mask to figure out which flags to wait for. --- src/boot/boot.janet | 18 ++++++------ src/core/ev.c | 70 +++++++++++++++++--------------------------- src/core/fiber.c | 8 ++--- src/core/gc.c | 10 ++----- src/include/janet.h | 4 +-- test/suite0009.janet | 6 ++++ 6 files changed, 47 insertions(+), 69 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 8f31e0fe..5766939a 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3163,12 +3163,12 @@ (,resume ,f)))) (defn- wait-for-fibers - [chan n] - (repeat n - (def fiber (ev/take chan)) - (def x (fiber/last-value fiber)) - (if (not= :dead (fiber/status fiber)) - (propagate x fiber)))) + [chan fibers] + (repeat (length fibers) + (def [sig fiber] (ev/take chan)) + (unless (= sig :ok) + (each f fibers (ev/cancel f "sibling canceled")) + (propagate (fiber/last-value fiber) fiber)))) (defmacro ev/gather `` @@ -3180,9 +3180,9 @@ ~(do (def ,chan (,ev/chan)) (def ,res @[]) - ,;(seq [[i body] :pairs bodies] - ~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body))) nil ,chan)) - (,wait-for-fibers ,chan ,(length bodies)) + (,wait-for-fibers ,chan + ,(seq [[i body] :pairs bodies] + ~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body)) :tp) nil ,chan))) ,res))) (undef wait-for-fibers)) diff --git a/src/core/ev.c b/src/core/ev.c index 2547bd4c..42b599f1 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -507,24 +507,25 @@ void janet_ev_mark(void) { static int janet_channel_push(JanetChannel *channel, Janet x, int mode); +static Janet make_supervisor_event(const char *name, JanetFiber *fiber) { + Janet tup[2]; + tup[0] = janet_ckeywordv(name); + tup[1] = janet_wrap_fiber(fiber); + return janet_wrap_tuple(janet_tuple_n(tup, 2)); +} + /* Run a top level task */ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; Janet res; JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin); - if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) { - if (fiber->done_channel) { - JanetChannel *chan = (JanetChannel *)(fiber->done_channel); - janet_channel_push(chan, janet_wrap_fiber(fiber), 2); - fiber->done_channel = NULL; - fiber->event_channel = NULL; - fiber->new_channel = NULL; - } else if (sig != JANET_SIGNAL_OK) { + JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel); + if (NULL == chan) { + if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) { janet_stacktrace(fiber, res); } - } else if (fiber->event_channel) { - JanetChannel *chan = (JanetChannel *)(fiber->event_channel); - janet_channel_push(chan, janet_wrap_fiber(fiber), 2); + } else if (sig == JANET_SIGNAL_OK || (fiber->flags & (1 << sig))) { + janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], fiber), 2); } } @@ -2001,25 +2002,21 @@ error: #endif } +static void janet_ev_go(JanetFiber *fiber, Janet value, JanetChannel *supervisor_channel) { + fiber->supervisor_channel = supervisor_channel; + /* janet_channel_push(supervisor_channel, make_supervisor_event("new", fiber), 2); */ + janet_schedule(fiber, value); +} + /* C functions */ static Janet cfun_ev_go(int32_t argc, Janet *argv) { - janet_arity(argc, 1, 5); + janet_arity(argc, 1, 3); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); - JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT, - janet_vm_root_fiber->done_channel); - JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT, - janet_vm_root_fiber->new_channel); - JanetChannel *event_channel = janet_optabstract(argv, argc, 4, &ChannelAT, - janet_vm_root_fiber->event_channel); - fiber->done_channel = done_channel; - fiber->new_channel = new_channel; - fiber->event_channel = event_channel; - if (new_channel != NULL) { - janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2); - } - janet_schedule(fiber, value); + JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &ChannelAT, + janet_vm_root_fiber->supervisor_channel); + janet_ev_go(fiber, value, supervisor_channel); return argv[0]; } @@ -2030,13 +2027,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { if (NULL == fiber) janet_panicf("invalid arity to function %v", argv[0]); fiber->env = janet_table(0); fiber->env->proto = janet_current_fiber()->env; - fiber->done_channel = janet_vm_root_fiber->done_channel; - fiber->event_channel = janet_vm_root_fiber->event_channel; - fiber->new_channel = janet_vm_root_fiber->new_channel; - if (fiber->new_channel != NULL) { - janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2); - } - janet_schedule(fiber, janet_wrap_nil()); + janet_ev_go(fiber, janet_wrap_nil(), (JanetChannel *)(janet_vm_root_fiber->supervisor_channel)); return janet_wrap_fiber(fiber); } @@ -2141,19 +2132,12 @@ static const JanetReg ev_cfuns[] = { }, { "ev/go", cfun_ev_go, - JDOC("(ev/go fiber &opt value done-chan new-chan event-chan)\n\n" + JDOC("(ev/go fiber &opt value supervisor)\n\n" "Put a fiber on the event loop to be resumed later. Optionally pass " "a value to resume with, otherwise resumes with nil. Returns the fiber. " - "\n\n" - "Three optional `core/channel`s can be provided as well. These are channels " - "that will the fiber will be pushed to with `ev/give` on the relevant event. " - "\n\n" - "* `done-chan` - when `fiber` completes or errors, it will be pushed to this channel.\n" - "* `new-chan` - `fiber` will be pushed to this channel right away. Nested calls to `ev/go` " - "that don't set `new-chan` will also push to this channel.\n" - "* `event-chan` - when `fiber` yields to the event loop it will be pushed to this channel.\n\n" - "With these channels, the programmer can implement a \"supervisor\" for fibers for a fault " - "tolerant application.") + "An optional `core/channel` can be provided as well as a supervisor. When various " + "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. " + "If not provided, the new fiber will inherit the current supervisor.") }, { "ev/sleep", cfun_ev_sleep, diff --git a/src/core/fiber.c b/src/core/fiber.c index a1d09d59..03576021 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -41,9 +41,7 @@ static void fiber_reset(JanetFiber *fiber) { #ifdef JANET_EV fiber->waiting = NULL; fiber->sched_id = 0; - fiber->done_channel = NULL; - fiber->event_channel = NULL; - fiber->new_channel = NULL; + fiber->supervisor_channel = NULL; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } @@ -87,9 +85,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; #ifdef JANET_EV fiber->waiting = NULL; - fiber->done_channel = NULL; - fiber->event_channel = NULL; - fiber->new_channel = NULL; + fiber->supervisor_channel = NULL; #endif return fiber; } diff --git a/src/core/gc.c b/src/core/gc.c index a88f9558..1045e60a 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -268,14 +268,8 @@ recur: janet_mark_table(fiber->env); #ifdef JANET_EV - if (fiber->done_channel) { - janet_mark_abstract(fiber->done_channel); - } - if (fiber->event_channel) { - janet_mark_abstract(fiber->event_channel); - } - if (fiber->new_channel) { - janet_mark_abstract(fiber->new_channel); + if (fiber->supervisor_channel) { + janet_mark_abstract(fiber->supervisor_channel); } #endif diff --git a/src/include/janet.h b/src/include/janet.h index 38586820..4f6781cf 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -846,9 +846,7 @@ struct JanetFiber { * type, say "JanetTask", that as separate from fibers to save a bit of space. */ JanetListenerState *waiting; uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ - void *done_channel; /* Channel to push self to when complete */ - void *event_channel; /* Channel to push self to when yielding to event loop */ - void *new_channel; /* Channel to push spawned children */ + void *supervisor_channel; /* Channel to push self to when complete */ #endif }; diff --git a/test/suite0009.janet b/test/suite0009.janet index 4dc4d321..df4fb2ac 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -74,6 +74,12 @@ (calc-2 "(+ 9 10 11 12)")) @[10 26 42]) "parallel subprocesses 2") +# ev/gather + +(assert (deep= @[1 2 3] (ev/gather 1 2 3)) "ev/gather 1") +(assert (deep= @[] (ev/gather)) "ev/gather 2") +(assert-error "ev/gather 3" (ev/gather 1 2 (error 3))) + # Net testing (repeat 10