From 475775cc9d38d9295714db940f05c8c387e27a52 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 9 Jan 2021 18:33:40 -0600 Subject: [PATCH] Add a "new_channel" for root fibers. When new fibers are scheduled on the event loop, this new_channel receives the newly created fibers. This lets a fiber track which fibers have been added and let's a user implement a supervisor. Fix formatting. --- src/core/ev.c | 38 +++++++++++++++++++++++--------------- src/core/fiber.c | 6 ++++-- src/core/gc.c | 7 +++++-- src/core/inttypes.c | 4 ++-- src/include/janet.h | 3 ++- 5 files changed, 36 insertions(+), 22 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index ce8640d2..9431b539 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -513,7 +513,7 @@ void janet_ev_mark(void) { } } -static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice); +static int janet_channel_push(JanetChannel *channel, Janet x, int mode); /* Run a top level task */ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { @@ -521,10 +521,10 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { Janet res; JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin); if (sig != JANET_SIGNAL_EVENT) { - if (fiber->supervisor_channel) { - JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel); - janet_channel_push(chan, janet_wrap_fiber(fiber), 0); - fiber->supervisor_channel = NULL; + if (fiber->done_channel) { + JanetChannel *chan = (JanetChannel *)(fiber->done_channel); + janet_channel_push(chan, janet_wrap_fiber(fiber), 2); + fiber->done_channel = NULL; } else if (sig != JANET_SIGNAL_OK) { janet_stacktrace(fiber, res); } @@ -665,7 +665,7 @@ static Janet make_read_result(JanetChannel *channel, Janet x) { /* Push a value to a channel, and return 1 if channel should block, zero otherwise. * If the push would block, will add to the write_pending queue in the channel. */ -static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) { +static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { JanetChannelPending reader; int is_empty; do { @@ -677,14 +677,12 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) { janet_panicf("channel overflow: %v", x); } else if (janet_q_count(&channel->items) > channel->limit) { /* No root fiber, we are in completion on a root fiber. Don't block. */ - if (NULL == janet_vm_root_fiber) { - return 0; - } + if (mode == 2) return 0; /* Pushed successfully, but should block. */ JanetChannelPending pending; pending.fiber = janet_vm_root_fiber, pending.sched_id = janet_vm_root_fiber->sched_id, - pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM; + pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM; janet_q_push(&channel->write_pending, &pending, sizeof(pending)); return 1; } @@ -1979,12 +1977,18 @@ error: /* C functions */ static Janet cfun_ev_go(int32_t argc, Janet *argv) { - janet_arity(argc, 1, 3); + janet_arity(argc, 1, 4); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); - JanetChannel *channel = janet_optabstract(argv, argc, 2, &ChannelAT, - janet_vm_root_fiber->supervisor_channel); - fiber->supervisor_channel = channel; + JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT, + janet_vm_root_fiber->done_channel); + JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT, + janet_vm_root_fiber->new_channel); + fiber->done_channel = done_channel; + fiber->new_channel = new_channel; + if (new_channel != NULL) { + janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2); + } janet_schedule(fiber, value); return argv[0]; } @@ -1996,7 +2000,11 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { if (NULL == fiber) janet_panicf("invalid arity to function %v", argv[0]); fiber->env = janet_table(0); fiber->env->proto = janet_current_fiber()->env; - fiber->supervisor_channel = janet_vm_root_fiber->supervisor_channel; + fiber->done_channel = janet_vm_root_fiber->done_channel; + fiber->new_channel = janet_vm_root_fiber->new_channel; + if (fiber->new_channel != NULL) { + janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2); + } janet_schedule(fiber, janet_wrap_nil()); return janet_wrap_fiber(fiber); } diff --git a/src/core/fiber.c b/src/core/fiber.c index 8262a6dd..678dd90d 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -41,7 +41,8 @@ static void fiber_reset(JanetFiber *fiber) { #ifdef JANET_EV fiber->waiting = NULL; fiber->sched_id = 0; - fiber->supervisor_channel = NULL; + fiber->done_channel = NULL; + fiber->new_channel = NULL; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } @@ -85,7 +86,8 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; #ifdef JANET_EV fiber->waiting = NULL; - fiber->supervisor_channel = NULL; + fiber->done_channel = NULL; + fiber->new_channel = NULL; #endif return fiber; } diff --git a/src/core/gc.c b/src/core/gc.c index 1045e60a..76b21579 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -268,8 +268,11 @@ recur: janet_mark_table(fiber->env); #ifdef JANET_EV - if (fiber->supervisor_channel) { - janet_mark_abstract(fiber->supervisor_channel); + if (fiber->done_channel) { + janet_mark_abstract(fiber->done_channel); + } + if (fiber->new_channel) { + janet_mark_abstract(fiber->new_channel); } #endif diff --git a/src/core/inttypes.c b/src/core/inttypes.c index cb0316aa..4e84a046 100644 --- a/src/core/inttypes.c +++ b/src/core/inttypes.c @@ -389,8 +389,8 @@ static Janet cfun_it_s64_mod(int32_t argc, Janet *argv) { int64_t op2 = janet_unwrap_s64(argv[1]); int64_t x = op1 % op2; *box = (op1 > 0) - ? ((op2 > 0) ? x : (0 == x ? x : x + op2)) - : ((op2 > 0) ? (0 == x ? x : x + op2) : x); + ? ((op2 > 0) ? x : (0 == x ? x : x + op2)) + : ((op2 > 0) ? (0 == x ? x : x + op2) : x); return janet_wrap_abstract(box); } diff --git a/src/include/janet.h b/src/include/janet.h index aa3938d4..8056fa01 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -842,7 +842,8 @@ struct JanetFiber { #ifdef JANET_EV JanetListenerState *waiting; uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ - void *supervisor_channel; /* Channel to push self to when signaling. */ + void *done_channel; /* Channel to push self to when complete */ + void *new_channel; /* Channel to push spawned children */ #endif };