1
0
mirror of https://github.com/janet-lang/janet synced 2025-02-22 11:10:02 +00:00

Go back to a single supervisor channel per fiber.

We now also use the fiber mask to figure out which flags to wait for.
This commit is contained in:
Calvin Rose 2021-01-12 21:35:28 -06:00
parent 61cca10cf6
commit 4f2d1cdc00
6 changed files with 47 additions and 69 deletions

View File

@ -3163,12 +3163,12 @@
(,resume ,f)))) (,resume ,f))))
(defn- wait-for-fibers (defn- wait-for-fibers
[chan n] [chan fibers]
(repeat n (repeat (length fibers)
(def fiber (ev/take chan)) (def [sig fiber] (ev/take chan))
(def x (fiber/last-value fiber)) (unless (= sig :ok)
(if (not= :dead (fiber/status fiber)) (each f fibers (ev/cancel f "sibling canceled"))
(propagate x fiber)))) (propagate (fiber/last-value fiber) fiber))))
(defmacro ev/gather (defmacro ev/gather
`` ``
@ -3180,9 +3180,9 @@
~(do ~(do
(def ,chan (,ev/chan)) (def ,chan (,ev/chan))
(def ,res @[]) (def ,res @[])
,;(seq [[i body] :pairs bodies] (,wait-for-fibers ,chan
~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body))) nil ,chan)) ,(seq [[i body] :pairs bodies]
(,wait-for-fibers ,chan ,(length bodies)) ~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body)) :tp) nil ,chan)))
,res))) ,res)))
(undef wait-for-fibers)) (undef wait-for-fibers))

View File

@ -507,24 +507,25 @@ void janet_ev_mark(void) {
static int janet_channel_push(JanetChannel *channel, Janet x, int mode); static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
static Janet make_supervisor_event(const char *name, JanetFiber *fiber) {
Janet tup[2];
tup[0] = janet_ckeywordv(name);
tup[1] = janet_wrap_fiber(fiber);
return janet_wrap_tuple(janet_tuple_n(tup, 2));
}
/* Run a top level task */ /* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res; Janet res;
JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin); JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin);
if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) { JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel);
if (fiber->done_channel) { if (NULL == chan) {
JanetChannel *chan = (JanetChannel *)(fiber->done_channel); if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD) {
janet_channel_push(chan, janet_wrap_fiber(fiber), 2);
fiber->done_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
} else if (sig != JANET_SIGNAL_OK) {
janet_stacktrace(fiber, res); janet_stacktrace(fiber, res);
} }
} else if (fiber->event_channel) { } else if (sig == JANET_SIGNAL_OK || (fiber->flags & (1 << sig))) {
JanetChannel *chan = (JanetChannel *)(fiber->event_channel); janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], fiber), 2);
janet_channel_push(chan, janet_wrap_fiber(fiber), 2);
} }
} }
@ -2001,25 +2002,21 @@ error:
#endif #endif
} }
static void janet_ev_go(JanetFiber *fiber, Janet value, JanetChannel *supervisor_channel) {
fiber->supervisor_channel = supervisor_channel;
/* janet_channel_push(supervisor_channel, make_supervisor_event("new", fiber), 2); */
janet_schedule(fiber, value);
}
/* C functions */ /* C functions */
static Janet cfun_ev_go(int32_t argc, Janet *argv) { static Janet cfun_ev_go(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 5); janet_arity(argc, 1, 3);
JanetFiber *fiber = janet_getfiber(argv, 0); JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT, JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &ChannelAT,
janet_vm_root_fiber->done_channel); janet_vm_root_fiber->supervisor_channel);
JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT, janet_ev_go(fiber, value, supervisor_channel);
janet_vm_root_fiber->new_channel);
JanetChannel *event_channel = janet_optabstract(argv, argc, 4, &ChannelAT,
janet_vm_root_fiber->event_channel);
fiber->done_channel = done_channel;
fiber->new_channel = new_channel;
fiber->event_channel = event_channel;
if (new_channel != NULL) {
janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2);
}
janet_schedule(fiber, value);
return argv[0]; return argv[0];
} }
@ -2030,13 +2027,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) {
if (NULL == fiber) janet_panicf("invalid arity to function %v", argv[0]); if (NULL == fiber) janet_panicf("invalid arity to function %v", argv[0]);
fiber->env = janet_table(0); fiber->env = janet_table(0);
fiber->env->proto = janet_current_fiber()->env; fiber->env->proto = janet_current_fiber()->env;
fiber->done_channel = janet_vm_root_fiber->done_channel; janet_ev_go(fiber, janet_wrap_nil(), (JanetChannel *)(janet_vm_root_fiber->supervisor_channel));
fiber->event_channel = janet_vm_root_fiber->event_channel;
fiber->new_channel = janet_vm_root_fiber->new_channel;
if (fiber->new_channel != NULL) {
janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2);
}
janet_schedule(fiber, janet_wrap_nil());
return janet_wrap_fiber(fiber); return janet_wrap_fiber(fiber);
} }
@ -2141,19 +2132,12 @@ static const JanetReg ev_cfuns[] = {
}, },
{ {
"ev/go", cfun_ev_go, "ev/go", cfun_ev_go,
JDOC("(ev/go fiber &opt value done-chan new-chan event-chan)\n\n" JDOC("(ev/go fiber &opt value supervisor)\n\n"
"Put a fiber on the event loop to be resumed later. Optionally pass " "Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil. Returns the fiber. " "a value to resume with, otherwise resumes with nil. Returns the fiber. "
"\n\n" "An optional `core/channel` can be provided as well as a supervisor. When various "
"Three optional `core/channel`s can be provided as well. These are channels " "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
"that will the fiber will be pushed to with `ev/give` on the relevant event. " "If not provided, the new fiber will inherit the current supervisor.")
"\n\n"
"* `done-chan` - when `fiber` completes or errors, it will be pushed to this channel.\n"
"* `new-chan` - `fiber` will be pushed to this channel right away. Nested calls to `ev/go` "
"that don't set `new-chan` will also push to this channel.\n"
"* `event-chan` - when `fiber` yields to the event loop it will be pushed to this channel.\n\n"
"With these channels, the programmer can implement a \"supervisor\" for fibers for a fault "
"tolerant application.")
}, },
{ {
"ev/sleep", cfun_ev_sleep, "ev/sleep", cfun_ev_sleep,

View File

@ -41,9 +41,7 @@ static void fiber_reset(JanetFiber *fiber) {
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL; fiber->waiting = NULL;
fiber->sched_id = 0; fiber->sched_id = 0;
fiber->done_channel = NULL; fiber->supervisor_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
#endif #endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW); janet_fiber_set_status(fiber, JANET_STATUS_NEW);
} }
@ -87,9 +85,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL; fiber->waiting = NULL;
fiber->done_channel = NULL; fiber->supervisor_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
#endif #endif
return fiber; return fiber;
} }

View File

@ -268,14 +268,8 @@ recur:
janet_mark_table(fiber->env); janet_mark_table(fiber->env);
#ifdef JANET_EV #ifdef JANET_EV
if (fiber->done_channel) { if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->done_channel); janet_mark_abstract(fiber->supervisor_channel);
}
if (fiber->event_channel) {
janet_mark_abstract(fiber->event_channel);
}
if (fiber->new_channel) {
janet_mark_abstract(fiber->new_channel);
} }
#endif #endif

View File

@ -846,9 +846,7 @@ struct JanetFiber {
* type, say "JanetTask", that as separate from fibers to save a bit of space. */ * type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting; JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
void *done_channel; /* Channel to push self to when complete */ void *supervisor_channel; /* Channel to push self to when complete */
void *event_channel; /* Channel to push self to when yielding to event loop */
void *new_channel; /* Channel to push spawned children */
#endif #endif
}; };

View File

@ -74,6 +74,12 @@
(calc-2 "(+ 9 10 11 12)")) (calc-2 "(+ 9 10 11 12)"))
@[10 26 42]) "parallel subprocesses 2") @[10 26 42]) "parallel subprocesses 2")
# ev/gather
(assert (deep= @[1 2 3] (ev/gather 1 2 3)) "ev/gather 1")
(assert (deep= @[] (ev/gather)) "ev/gather 2")
(assert-error "ev/gather 3" (ev/gather 1 2 (error 3)))
# Net testing # Net testing
(repeat 10 (repeat 10