Allow passing function to ev/thread.

Convenient when there is no need to create an entire fiber.
This commit is contained in:
Calvin Rose 2021-08-30 22:04:15 -05:00
parent e1c4fc29de
commit acbebc5631
1 changed files with 29 additions and 9 deletions

View File

@ -510,10 +510,10 @@ void janet_ev_mark(void) {
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice);
static Janet make_supervisor_event(const char *name, JanetFiber *fiber) {
static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int threaded) {
Janet tup[2];
tup[0] = janet_ckeywordv(name);
tup[1] = janet_wrap_fiber(fiber);
tup[1] = threaded ? janet_ckeywordv("thread-fiber") : janet_wrap_fiber(fiber) ;
return janet_wrap_tuple(janet_tuple_n(tup, 2));
}
@ -1215,7 +1215,8 @@ JanetFiber *janet_loop1(void) {
}
} else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
JanetChannel *chan = janet_channel_unwrap(sv);
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2);
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig],
task.fiber, chan->is_threaded), 2);
}
if (sig == JANET_SIGNAL_INTERRUPT) {
/* On interrupts, return the interrupted fiber immediately */
@ -2504,8 +2505,26 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv);
JanetFiber *fiber = janet_unwrap_fiber(fiberv);
JanetFiber *fiber;
if (!janet_checktype(fiberv, JANET_FIBER)) {
if (!janet_checktype(fiberv, JANET_FUNCTION)) {
janet_panicf("expected function|fiber, got %v", fiberv);
}
JanetFunction *func = janet_unwrap_function(fiberv);
if (func->def->min_arity > 1) {
janet_panicf("thread function must accept 0 or 1 arguments");
}
fiber = janet_fiber(func, 64, func->def->min_arity, &value);
fiber->flags |=
JANET_FIBER_MASK_ERROR |
JANET_FIBER_MASK_USER0 |
JANET_FIBER_MASK_USER1 |
JANET_FIBER_MASK_USER2 |
JANET_FIBER_MASK_USER3 |
JANET_FIBER_MASK_USER4;
} else {
fiber = janet_unwrap_fiber(fiberv);
}
fiber->supervisor_channel = janet_vm.user;
janet_schedule(fiber, value);
janet_loop();
@ -2542,9 +2561,10 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
}
JANET_CORE_FN(cfun_ev_thread,
"(ev/thread fiber &opt value flags supervisor)",
"Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
"to resume with. "
"(ev/thread main &opt value flags supervisor)",
"Run `main` in a new operating system thread, optionally passing `value` "
"to resume with. The parameter `main` can either be a fiber, or a function that accepts "
"0 or 1 arguments. "
"Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
"If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
"Otherwise, returns nil. Available flags:\n\n"
@ -2552,8 +2572,8 @@ JANET_CORE_FN(cfun_ev_thread,
"* `:a` - don't copy abstract registry to new thread (performance optimization)\n"
"* `:c` - don't copy cfunction registry to new thread (performance optimization)") {
janet_arity(argc, 1, 4);
janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
if (!janet_checktype(argv[0], JANET_FUNCTION)) janet_getfiber(argv, 0);
uint64_t flags = 0;
if (argc >= 3) {
flags = janet_getflags(argv, 2, "nac");