Add initial implementation for supervisor channels.

Supervisor channels are a simple concept to more efficiently
enable dynamic, structure concurrency. When a top-level fiber
completes (or errors), it will push itself to it's supervisor
channel if it has one (instead of printing a stacktrace). This
let's another fiber poll a channel and "supervise" a set of fibers.
This commit is contained in:
Calvin Rose 2021-01-06 23:19:22 -06:00
parent ee0e1a2342
commit 4e7ad3c7ce
5 changed files with 72 additions and 23 deletions

View File

@ -3138,7 +3138,32 @@
(with-syms [f]
~(let [,f (coro ,;body)]
(,ev/deadline ,deadline nil ,f)
(,resume ,f)))))
(,resume ,f))))
(defn- wait-for-fibers
[chan n]
(repeat n
(def fiber (ev/take chan))
(def x (fiber/last-value fiber))
(if (not= :dead (fiber/status fiber))
(propagate x fiber))))
(defmacro ev/gather
``
Run a number of fibers in parallel on the event loop, and join when they complete.
Returns the gathered results in an array.
``
[& bodies]
(with-syms [chan res]
~(do
(def ,chan (,ev/chan))
(def ,res @[])
,;(seq [[i body] :pairs bodies]
~(,ev/go (,fiber/new (fn [] (put ,res ,i ,body))) nil ,chan))
(,wait-for-fibers ,chan ,(length bodies))
,res)))
(undef wait-for-fibers))
(compwhen (dyn 'net/listen)
(defn net/server

View File

@ -63,6 +63,23 @@ typedef struct {
void *data;
} JanetQueue;
typedef struct {
JanetFiber *fiber;
uint32_t sched_id;
enum {
JANET_CP_MODE_ITEM,
JANET_CP_MODE_CHOICE_READ,
JANET_CP_MODE_CHOICE_WRITE
} mode;
} JanetChannelPending;
typedef struct {
JanetQueue items;
JanetQueue read_pending;
JanetQueue write_pending;
int32_t limit;
} JanetChannel;
#define JANET_MAX_Q_CAPACITY 0x7FFFFFF
static void janet_q_init(JanetQueue *q) {
@ -496,13 +513,21 @@ void janet_ev_mark(void) {
}
}
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice);
/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
janet_stacktrace(fiber, res);
if (sig != JANET_SIGNAL_EVENT) {
if (fiber->supervisor_channel) {
JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel);
janet_channel_push(chan, janet_wrap_fiber(fiber), 0);
fiber->supervisor_channel = NULL;
} else if (sig != JANET_SIGNAL_OK) {
janet_stacktrace(fiber, res);
}
}
}
@ -553,23 +578,6 @@ void janet_ev_dec_refcount(void) {
/* Channels */
typedef struct {
JanetFiber *fiber;
uint32_t sched_id;
enum {
JANET_CP_MODE_ITEM,
JANET_CP_MODE_CHOICE_READ,
JANET_CP_MODE_CHOICE_WRITE
} mode;
} JanetChannelPending;
typedef struct {
JanetQueue items;
JanetQueue read_pending;
JanetQueue write_pending;
int32_t limit;
} JanetChannel;
#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
static void janet_chan_init(JanetChannel *chan, int32_t limit) {
@ -668,6 +676,10 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
/* No root fiber, we are in completion on a root fiber. Don't block. */
if (NULL == janet_vm_root_fiber) {
return 0;
}
/* Pushed successfully, but should block. */
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
@ -1967,9 +1979,11 @@ error:
/* C functions */
static Janet cfun_ev_go(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2);
janet_arity(argc, 1, 3);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
JanetChannel *channel = janet_optabstract(argv, argc, 2, &ChannelAT, NULL);
fiber->supervisor_channel = channel;
janet_schedule(fiber, value);
return argv[0];
}
@ -2086,9 +2100,10 @@ static const JanetReg ev_cfuns[] = {
},
{
"ev/go", cfun_ev_go,
JDOC("(ev/go fiber &opt value)\n\n"
JDOC("(ev/go fiber &opt value chan)\n\n"
"Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil.")
"a value to resume with, otherwise resumes with nil. If chan is provided, "
"the fiber will push itself to the channel upon completion or error. Returns the fiber.")
},
{
"ev/sleep", cfun_ev_sleep,

View File

@ -41,6 +41,7 @@ static void fiber_reset(JanetFiber *fiber) {
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
}
@ -84,6 +85,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
#endif
return fiber;
}

View File

@ -267,6 +267,12 @@ recur:
if (fiber->env)
janet_mark_table(fiber->env);
#ifdef JANET_EV
if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel);
}
#endif
/* Explicit tail recursion */
if (fiber->child) {
fiber = fiber->child;

View File

@ -842,6 +842,7 @@ struct JanetFiber {
#ifdef JANET_EV
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
void *supervisor_channel; /* Channel to push self to when signaling. */
#endif
};