1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-28 19:19:53 +00:00

Add event-chan argument to ev/go.

The event-chan is the final piece of the puzzle for fibers, and
will be pushed to when a fiber yields to the event loop.
This commit is contained in:
Calvin Rose 2021-01-09 20:23:06 -06:00
parent 475775cc9d
commit 8286b33c52
4 changed files with 32 additions and 4 deletions

View File

@ -525,9 +525,14 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
JanetChannel *chan = (JanetChannel *)(fiber->done_channel);
janet_channel_push(chan, janet_wrap_fiber(fiber), 2);
fiber->done_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
} else if (sig != JANET_SIGNAL_OK) {
janet_stacktrace(fiber, res);
}
} else if (fiber->event_channel) {
JanetChannel *chan = (JanetChannel *)(fiber->event_channel);
janet_channel_push(chan, janet_wrap_fiber(fiber), 2);
}
}
@ -1977,15 +1982,18 @@ error:
/* C functions */
static Janet cfun_ev_go(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 4);
janet_arity(argc, 1, 5);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
JanetChannel *done_channel = janet_optabstract(argv, argc, 2, &ChannelAT,
janet_vm_root_fiber->done_channel);
JanetChannel *new_channel = janet_optabstract(argv, argc, 3, &ChannelAT,
janet_vm_root_fiber->new_channel);
JanetChannel *event_channel = janet_optabstract(argv, argc, 4, &ChannelAT,
janet_vm_root_fiber->event_channel);
fiber->done_channel = done_channel;
fiber->new_channel = new_channel;
fiber->event_channel = event_channel;
if (new_channel != NULL) {
janet_channel_push((JanetChannel *) new_channel, janet_wrap_fiber(fiber), 2);
}
@ -2001,6 +2009,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) {
fiber->env = janet_table(0);
fiber->env->proto = janet_current_fiber()->env;
fiber->done_channel = janet_vm_root_fiber->done_channel;
fiber->event_channel = janet_vm_root_fiber->event_channel;
fiber->new_channel = janet_vm_root_fiber->new_channel;
if (fiber->new_channel != NULL) {
janet_channel_push((JanetChannel *) fiber->new_channel, janet_wrap_fiber(fiber), 2);
@ -2110,10 +2119,19 @@ static const JanetReg ev_cfuns[] = {
},
{
"ev/go", cfun_ev_go,
JDOC("(ev/go fiber &opt value chan)\n\n"
JDOC("(ev/go fiber &opt value done-chan new-chan event-chan)\n\n"
"Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil. If chan is provided, "
"the fiber will push itself to the channel upon completion or error. Returns the fiber.")
"a value to resume with, otherwise resumes with nil. Returns the fiber. "
"\n\n"
"Three optional `core/channel`s can be provided as well. These are channels "
"that will the fiber will be pushed to with `ev/give` on the relevant event. "
"\n\n"
"* `done-chan` - when `fiber` completes or errors, it will be pushed to this channel.\n"
"* `new-chan` - `fiber` will be pushed to this channel right away. Nested calls to `ev/go` "
"that don't set `new-chan` will also push to this channel.\n"
"* `event-chan` - when `fiber` yields to the event loop it will be pushed to this channel.\n\n"
"With these channels, the programmer can implement a \"supervisor\" for fibers for a fault "
"tolerant application.")
},
{
"ev/sleep", cfun_ev_sleep,

View File

@ -42,6 +42,7 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->done_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
@ -87,6 +88,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->done_channel = NULL;
fiber->event_channel = NULL;
fiber->new_channel = NULL;
#endif
return fiber;

View File

@ -271,6 +271,9 @@ recur:
if (fiber->done_channel) {
janet_mark_abstract(fiber->done_channel);
}
if (fiber->event_channel) {
janet_mark_abstract(fiber->event_channel);
}
if (fiber->new_channel) {
janet_mark_abstract(fiber->new_channel);
}

View File

@ -840,9 +840,14 @@ struct JanetFiber {
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
Janet last_value; /* Last returned value from a fiber */
#ifdef JANET_EV
/* These fields are only relevant for fibers that are used as "root fibers" -
* that is, fibers that are scheduled on the event loop and behave much like threads
* in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
void *done_channel; /* Channel to push self to when complete */
void *event_channel; /* Channel to push self to when yielding to event loop */
void *new_channel; /* Channel to push spawned children */
#endif
};