diff --git a/src/core/ev.c b/src/core/ev.c index 47e3a3a0..ab981fd3 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -510,10 +510,10 @@ void janet_ev_mark(void) { static int janet_channel_push(JanetChannel *channel, Janet x, int mode); static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice); -static Janet make_supervisor_event(const char *name, JanetFiber *fiber) { +static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int threaded) { Janet tup[2]; tup[0] = janet_ckeywordv(name); - tup[1] = janet_wrap_fiber(fiber); + tup[1] = threaded ? janet_ckeywordv("thread-fiber") : janet_wrap_fiber(fiber) ; return janet_wrap_tuple(janet_tuple_n(tup, 2)); } @@ -1215,7 +1215,8 @@ JanetFiber *janet_loop1(void) { } } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) { JanetChannel *chan = janet_channel_unwrap(sv); - janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2); + janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], + task.fiber, chan->is_threaded), 2); } if (sig == JANET_SIGNAL_INTERRUPT) { /* On interrupts, return the interrupted fiber immediately */ @@ -2504,8 +2505,26 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { JANET_MARSHAL_UNSAFE, NULL, &nextbytes); Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes, JANET_MARSHAL_UNSAFE, NULL, &nextbytes); - if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv); - JanetFiber *fiber = janet_unwrap_fiber(fiberv); + JanetFiber *fiber; + if (!janet_checktype(fiberv, JANET_FIBER)) { + if (!janet_checktype(fiberv, JANET_FUNCTION)) { + janet_panicf("expected function|fiber, got %v", fiberv); + } + JanetFunction *func = janet_unwrap_function(fiberv); + if (func->def->min_arity > 1) { + janet_panicf("thread function must accept 0 or 1 arguments"); + } + fiber = janet_fiber(func, 64, func->def->min_arity, &value); + fiber->flags |= + JANET_FIBER_MASK_ERROR | + JANET_FIBER_MASK_USER0 | + JANET_FIBER_MASK_USER1 | + JANET_FIBER_MASK_USER2 | + JANET_FIBER_MASK_USER3 | + JANET_FIBER_MASK_USER4; + } else { + fiber = janet_unwrap_fiber(fiberv); + } fiber->supervisor_channel = janet_vm.user; janet_schedule(fiber, value); janet_loop(); @@ -2542,9 +2561,10 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { } JANET_CORE_FN(cfun_ev_thread, - "(ev/thread fiber &opt value flags supervisor)", - "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` " - "to resume with. " + "(ev/thread main &opt value flags supervisor)", + "Run `main` in a new operating system thread, optionally passing `value` " + "to resume with. The parameter `main` can either be a fiber, or a function that accepts " + "0 or 1 arguments. " "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. " "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. " "Otherwise, returns nil. Available flags:\n\n" @@ -2552,8 +2572,8 @@ JANET_CORE_FN(cfun_ev_thread, "* `:a` - don't copy abstract registry to new thread (performance optimization)\n" "* `:c` - don't copy cfunction registry to new thread (performance optimization)") { janet_arity(argc, 1, 4); - janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); + if (!janet_checktype(argv[0], JANET_FUNCTION)) janet_getfiber(argv, 0); uint64_t flags = 0; if (argc >= 3) { flags = janet_getflags(argv, 2, "nac");