diff --git a/src/boot/boot.janet b/src/boot/boot.janet index fb700200..33ff3298 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3138,7 +3138,32 @@ (with-syms [f] ~(let [,f (coro ,;body)] (,ev/deadline ,deadline nil ,f) - (,resume ,f))))) + (,resume ,f)))) + + (defn- wait-for-fibers + [chan n] + (repeat n + (def fiber (ev/take chan)) + (def x (fiber/last-value fiber)) + (if (not= :dead (fiber/status fiber)) + (propagate x fiber)))) + + (defmacro ev/gather + `` + Run a number of fibers in parallel on the event loop, and join when they complete. + Returns the gathered results in an array. + `` + [& bodies] + (with-syms [chan res] + ~(do + (def ,chan (,ev/chan)) + (def ,res @[]) + ,;(seq [[i body] :pairs bodies] + ~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body))) nil ,chan)) + (,wait-for-fibers ,chan ,(length bodies)) + ,res))) + + (undef wait-for-fibers)) (compwhen (dyn 'net/listen) (defn net/server diff --git a/src/core/ev.c b/src/core/ev.c index 50b1a21c..a9073464 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -63,6 +63,23 @@ typedef struct { void *data; } JanetQueue; +typedef struct { + JanetFiber *fiber; + uint32_t sched_id; + enum { + JANET_CP_MODE_ITEM, + JANET_CP_MODE_CHOICE_READ, + JANET_CP_MODE_CHOICE_WRITE + } mode; +} JanetChannelPending; + +typedef struct { + JanetQueue items; + JanetQueue read_pending; + JanetQueue write_pending; + int32_t limit; +} JanetChannel; + #define JANET_MAX_Q_CAPACITY 0x7FFFFFF static void janet_q_init(JanetQueue *q) { @@ -496,13 +513,21 @@ void janet_ev_mark(void) { } } +static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice); + /* Run a top level task */ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) { fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; Janet res; JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { - janet_stacktrace(fiber, res); + if (sig != JANET_SIGNAL_EVENT) { + if (fiber->supervisor_channel) { + JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel); + janet_channel_push(chan, janet_wrap_fiber(fiber), 0); + fiber->supervisor_channel = NULL; + } else if (sig != JANET_SIGNAL_OK) { + janet_stacktrace(fiber, res); + } } } @@ -553,23 +578,6 @@ void janet_ev_dec_refcount(void) { /* Channels */ -typedef struct { - JanetFiber *fiber; - uint32_t sched_id; - enum { - JANET_CP_MODE_ITEM, - JANET_CP_MODE_CHOICE_READ, - JANET_CP_MODE_CHOICE_WRITE - } mode; -} JanetChannelPending; - -typedef struct { - JanetQueue items; - JanetQueue read_pending; - JanetQueue write_pending; - int32_t limit; -} JanetChannel; - #define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF static void janet_chan_init(JanetChannel *chan, int32_t limit) { @@ -668,6 +676,10 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) { if (janet_q_push(&channel->items, &x, sizeof(Janet))) { janet_panicf("channel overflow: %v", x); } else if (janet_q_count(&channel->items) > channel->limit) { + /* No root fiber, we are in completion on a root fiber. Don't block. */ + if (NULL == janet_vm_root_fiber) { + return 0; + } /* Pushed successfully, but should block. */ JanetChannelPending pending; pending.fiber = janet_vm_root_fiber, @@ -1967,9 +1979,11 @@ error: /* C functions */ static Janet cfun_ev_go(int32_t argc, Janet *argv) { - janet_arity(argc, 1, 2); + janet_arity(argc, 1, 3); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); + JanetChannel *channel = janet_optabstract(argv, argc, 2, &ChannelAT, NULL); + fiber->supervisor_channel = channel; janet_schedule(fiber, value); return argv[0]; } @@ -2086,9 +2100,10 @@ static const JanetReg ev_cfuns[] = { }, { "ev/go", cfun_ev_go, - JDOC("(ev/go fiber &opt value)\n\n" + JDOC("(ev/go fiber &opt value chan)\n\n" "Put a fiber on the event loop to be resumed later. Optionally pass " - "a value to resume with, otherwise resumes with nil.") + "a value to resume with, otherwise resumes with nil. If chan is provided, " + "the fiber will push itself to the channel upon completion or error. Returns the fiber.") }, { "ev/sleep", cfun_ev_sleep, diff --git a/src/core/fiber.c b/src/core/fiber.c index 5901cfe2..8262a6dd 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -41,6 +41,7 @@ static void fiber_reset(JanetFiber *fiber) { #ifdef JANET_EV fiber->waiting = NULL; fiber->sched_id = 0; + fiber->supervisor_channel = NULL; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } @@ -84,6 +85,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; #ifdef JANET_EV fiber->waiting = NULL; + fiber->supervisor_channel = NULL; #endif return fiber; } diff --git a/src/core/gc.c b/src/core/gc.c index 61f729b7..1045e60a 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -267,6 +267,12 @@ recur: if (fiber->env) janet_mark_table(fiber->env); +#ifdef JANET_EV + if (fiber->supervisor_channel) { + janet_mark_abstract(fiber->supervisor_channel); + } +#endif + /* Explicit tail recursion */ if (fiber->child) { fiber = fiber->child; diff --git a/src/include/janet.h b/src/include/janet.h index 98e08f52..aa3938d4 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -842,6 +842,7 @@ struct JanetFiber { #ifdef JANET_EV JanetListenerState *waiting; uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ + void *supervisor_channel; /* Channel to push self to when signaling. */ #endif };