Add a "new_channel" for root fibers.

When new fibers are scheduled on the event loop, this new_channel
receives the newly created fibers. This lets a fiber track which fibers
have been added and let's a user implement a supervisor.

Fix formatting.
This commit is contained in:
Calvin Rose 2021-01-09 18:33:40 -06:00
parent 11067d7a56
commit 475775cc9d
5 changed files with 36 additions and 22 deletions

View File

@ -513,7 +513,7 @@ void janet_ev_mark(void) {
}
}
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice);
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
@ -521,10 +521,10 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
Janet res;
JanetSignal sig = janet_continue_signal(fiber, value, &res, sigin);
if (sig != JANET_SIGNAL_EVENT) {
if (fiber->supervisor_channel) {
JanetChannel *chan = (JanetChannel *)(fiber->supervisor_channel);
janet_channel_push(chan, janet_wrap_fiber(fiber), 0);
fiber->supervisor_channel = NULL;
if (fiber->done_channel) {
JanetChannel *chan = (JanetChannel *)(fiber->done_channel);
janet_channel_push(chan, janet_wrap_fiber(fiber), 2);
fiber->done_channel = NULL;
} else if (sig != JANET_SIGNAL_OK) {
janet_stacktrace(fiber, res);
}
@ -665,7 +665,7 @@ static Janet make_read_result(JanetChannel *channel, Janet x) {
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
* If the push would block, will add to the write_pending queue in the channel. */
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
JanetChannelPending reader;
int is_empty;
do {
@ -677,14 +677,12 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
/* No root fiber, we are in completion on a root fiber. Don't block. */
if (NULL == janet_vm_root_fiber) {
return 0;
}
if (mode == 2) return 0;
/* Pushed successfully, but should block. */
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
pending.sched_id = janet_vm_root_fiber->sched_id,
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM;
pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM;
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
return 1;
}
@ -1979,12 +1977,18 @@ error:
/* C functions */
static Janet cfun_ev_go(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 3);
janet_arity(argc, 1, 4);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
JanetChannel *channel = janet_optabstract(argv, argc, 2, &ChannelAT,
janet_vm_root_fiber->supervisor_channel);
fiber->supervisor_channel = channel;
JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT,
janet_vm_root_fiber->done_channel);
JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT,
janet_vm_root_fiber->new_channel);
fiber->done_channel = done_channel;
fiber->new_channel = new_channel;
if (new_channel != NULL) {
janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2);
}
janet_schedule(fiber, value);
return argv[0];
}
@ -1996,7 +2000,11 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) {
if (NULL == fiber) janet_panicf("invalid arity to function %v", argv[0]);
fiber->env = janet_table(0);
fiber->env->proto = janet_current_fiber()->env;
fiber->supervisor_channel = janet_vm_root_fiber->supervisor_channel;
fiber->done_channel = janet_vm_root_fiber->done_channel;
fiber->new_channel = janet_vm_root_fiber->new_channel;
if (fiber->new_channel != NULL) {
janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2);
}
janet_schedule(fiber, janet_wrap_nil());
return janet_wrap_fiber(fiber);
}

View File

@ -41,7 +41,8 @@ static void fiber_reset(JanetFiber *fiber) {
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
fiber->done_channel = NULL;
fiber->new_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
}
@ -85,7 +86,8 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
fiber->done_channel = NULL;
fiber->new_channel = NULL;
#endif
return fiber;
}

View File

@ -268,8 +268,11 @@ recur:
janet_mark_table(fiber->env);
#ifdef JANET_EV
if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel);
if (fiber->done_channel) {
janet_mark_abstract(fiber->done_channel);
}
if (fiber->new_channel) {
janet_mark_abstract(fiber->new_channel);
}
#endif

View File

@ -389,8 +389,8 @@ static Janet cfun_it_s64_mod(int32_t argc, Janet *argv) {
int64_t op2 = janet_unwrap_s64(argv[1]);
int64_t x = op1 % op2;
*box = (op1 > 0)
? ((op2 > 0) ? x : (0 == x ? x : x + op2))
: ((op2 > 0) ? (0 == x ? x : x + op2) : x);
? ((op2 > 0) ? x : (0 == x ? x : x + op2))
: ((op2 > 0) ? (0 == x ? x : x + op2) : x);
return janet_wrap_abstract(box);
}

View File

@ -842,7 +842,8 @@ struct JanetFiber {
#ifdef JANET_EV
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
void *supervisor_channel; /* Channel to push self to when signaling. */
void *done_channel; /* Channel to push self to when complete */
void *new_channel; /* Channel to push spawned children */
#endif
};