From 8286b33c5235c47ac0a3f04ac7e9be89588d117d Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 9 Jan 2021 20:23:06 -0600 Subject: [PATCH] Add event-chan argument to ev/go. The event-chan is the final piece of the puzzle for fibers, and will be pushed to when a fiber yields to the event loop. --- src/core/ev.c | 26 ++++++++++++++++++++++---- src/core/fiber.c | 2 ++ src/core/gc.c | 3 +++ src/include/janet.h | 5 +++++ 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 9431b539..fbcaa148 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -525,9 +525,14 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { JanetChannel *chan = (JanetChannel *)(fiber->done_channel); janet_channel_push(chan, janet_wrap_fiber(fiber), 2); fiber->done_channel = NULL; + fiber->event_channel = NULL; + fiber->new_channel = NULL; } else if (sig != JANET_SIGNAL_OK) { janet_stacktrace(fiber, res); } + } else if (fiber->event_channel) { + JanetChannel *chan = (JanetChannel *)(fiber->event_channel); + janet_channel_push(chan, janet_wrap_fiber(fiber), 2); } } @@ -1977,15 +1982,18 @@ error: /* C functions */ static Janet cfun_ev_go(int32_t argc, Janet *argv) { - janet_arity(argc, 1, 4); + janet_arity(argc, 1, 5); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT, janet_vm_root_fiber->done_channel); JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT, janet_vm_root_fiber->new_channel); + JanetChannel *event_channel = janet_optabstract(argv, argc, 4, &ChannelAT, + janet_vm_root_fiber->event_channel); fiber->done_channel = done_channel; fiber->new_channel = new_channel; + fiber->event_channel = event_channel; if (new_channel != NULL) { janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2); } @@ -2001,6 +2009,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) { fiber->env = janet_table(0); fiber->env->proto = janet_current_fiber()->env; fiber->done_channel = janet_vm_root_fiber->done_channel; + fiber->event_channel = janet_vm_root_fiber->event_channel; fiber->new_channel = janet_vm_root_fiber->new_channel; if (fiber->new_channel != NULL) { janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2); @@ -2110,10 +2119,19 @@ static const JanetReg ev_cfuns[] = { }, { "ev/go", cfun_ev_go, - JDOC("(ev/go fiber &opt value chan)\n\n" + JDOC("(ev/go fiber &opt value done-chan new-chan event-chan)\n\n" "Put a fiber on the event loop to be resumed later. Optionally pass " - "a value to resume with, otherwise resumes with nil. If chan is provided, " - "the fiber will push itself to the channel upon completion or error. Returns the fiber.") + "a value to resume with, otherwise resumes with nil. Returns the fiber. " + "\n\n" + "Three optional `core/channel`s can be provided as well. These are channels " + "that will the fiber will be pushed to with `ev/give` on the relevant event. " + "\n\n" + "* `done-chan` - when `fiber` completes or errors, it will be pushed to this channel.\n" + "* `new-chan` - `fiber` will be pushed to this channel right away. Nested calls to `ev/go` " + "that don't set `new-chan` will also push to this channel.\n" + "* `event-chan` - when `fiber` yields to the event loop it will be pushed to this channel.\n\n" + "With these channels, the programmer can implement a \"supervisor\" for fibers for a fault " + "tolerant application.") }, { "ev/sleep", cfun_ev_sleep, diff --git a/src/core/fiber.c b/src/core/fiber.c index 678dd90d..e6d95c1f 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -42,6 +42,7 @@ static void fiber_reset(JanetFiber *fiber) { fiber->waiting = NULL; fiber->sched_id = 0; fiber->done_channel = NULL; + fiber->event_channel = NULL; fiber->new_channel = NULL; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); @@ -87,6 +88,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t #ifdef JANET_EV fiber->waiting = NULL; fiber->done_channel = NULL; + fiber->event_channel = NULL; fiber->new_channel = NULL; #endif return fiber; diff --git a/src/core/gc.c b/src/core/gc.c index 76b21579..a88f9558 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -271,6 +271,9 @@ recur: if (fiber->done_channel) { janet_mark_abstract(fiber->done_channel); } + if (fiber->event_channel) { + janet_mark_abstract(fiber->event_channel); + } if (fiber->new_channel) { janet_mark_abstract(fiber->new_channel); } diff --git a/src/include/janet.h b/src/include/janet.h index 8056fa01..740fc7f3 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -840,9 +840,14 @@ struct JanetFiber { JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */ Janet last_value; /* Last returned value from a fiber */ #ifdef JANET_EV + /* These fields are only relevant for fibers that are used as "root fibers" - + * that is, fibers that are scheduled on the event loop and behave much like threads + * in a multi-tasking system. It would be possible to move these fields to a new + * type, say "JanetTask", that as separate from fibers to save a bit of space. */ JanetListenerState *waiting; uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ void *done_channel; /* Channel to push self to when complete */ + void *event_channel; /* Channel to push self to when yielding to event loop */ void *new_channel; /* Channel to push spawned children */ #endif };