diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 37b10012..13c1daa0 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3168,6 +3168,12 @@ [& body] ~(,ev/go (fiber/new (fn _spawn [&] ,;body) :tp))) + (defmacro ev/do-thread + ``Run some code in a new thread. Suspends the current fiber until the thread is complete, and + evaluates to nil.`` + [& body] + ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t))) + (defmacro ev/with-deadline `Run a body of code with a deadline, such that if the code does not complete before the deadline is up, it will be canceled.` diff --git a/src/core/capi.c b/src/core/capi.c index b4f9bae7..4cff6257 100644 --- a/src/core/capi.c +++ b/src/core/capi.c @@ -53,7 +53,9 @@ JANET_NO_RETURN static void janet_top_level_signal(const char *msg) { void janet_signalv(JanetSignal sig, Janet message) { if (janet_vm_return_reg != NULL) { *janet_vm_return_reg = message; - janet_vm_fiber->flags |= JANET_FIBER_DID_LONGJUMP; + if (NULL != janet_vm_fiber) { + janet_vm_fiber->flags |= JANET_FIBER_DID_LONGJUMP; + } #if defined(JANET_BSD) || defined(JANET_APPLE) _longjmp(*janet_vm_jmp_buf, sig); #else diff --git a/src/core/ev.c b/src/core/ev.c index af652d68..abdf22f1 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -2033,6 +2033,65 @@ static Janet cfun_ev_go(int32_t argc, Janet *argv) { return argv[0]; } +/* For ev/thread - Run an interpreter in the new thread. */ +static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { + JanetBuffer *buffer = (JanetBuffer *) args.argp; + const uint8_t *nextbytes = buffer->data; + const uint8_t *endbytes = nextbytes + buffer->count; + janet_init(); + JanetTryState tstate; + JanetSignal signal = janet_try(&tstate); + if (!signal) { + Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes, + JANET_MARSHAL_UNSAFE, NULL, &nextbytes); + if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry"); + janet_vm_abstract_registry = janet_unwrap_table(aregv); + Janet regv = janet_unmarshal(nextbytes, endbytes - nextbytes, + JANET_MARSHAL_UNSAFE, NULL, &nextbytes); + if (!janet_checktype(regv, JANET_TABLE)) janet_panic("expected table for cfunction registry"); + janet_vm_registry = janet_unwrap_table(regv); + Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes, + JANET_MARSHAL_UNSAFE, NULL, &nextbytes); + Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes, + JANET_MARSHAL_UNSAFE, NULL, &nextbytes); + if (!janet_checktype(fiberv, JANET_FIBER)) janet_panic("expected fiber"); + JanetFiber *fiber = janet_unwrap_fiber(fiberv); + janet_gcroot(fiberv); + janet_schedule(fiber, value); + janet_loop(); + args.tag = JANET_EV_TCTAG_NIL; + } else { + if (janet_checktype(tstate.payload, JANET_STRING)) { + args.tag = JANET_EV_TCTAG_ERR_STRINGF; + args.argp = strdup((const char *) janet_unwrap_string(tstate.payload)); + } else { + args.tag = JANET_EV_TCTAG_ERR_STRING; + args.argp = "failed to start thread"; + } + } + janet_buffer_deinit(buffer); + janet_restore(&tstate); + janet_deinit(); + return args; +} + +static Janet cfun_ev_thread(int32_t argc, Janet *argv) { + janet_arity(argc, 1, 3); + janet_getfiber(argv, 0); + Janet value = argc == 2 ? argv[1] : janet_wrap_nil(); + /* Marshal arguments for the new thread. */ + JanetBuffer *buffer = malloc(sizeof(JanetBuffer)); + if (NULL == buffer) { + JANET_OUT_OF_MEMORY; + } + janet_buffer_init(buffer, 0); + janet_marshal(buffer, janet_wrap_table(janet_vm_abstract_registry), NULL, JANET_MARSHAL_UNSAFE); + janet_marshal(buffer, janet_wrap_table(janet_vm_registry), NULL, JANET_MARSHAL_UNSAFE); + janet_marshal(buffer, argv[0], NULL, JANET_MARSHAL_UNSAFE); + janet_marshal(buffer, value, NULL, JANET_MARSHAL_UNSAFE); + janet_ev_threaded_await(janet_go_thread_subr, 0, argc, buffer); +} + static Janet cfun_ev_give_supervisor(int32_t argc, Janet *argv) { janet_arity(argc, 1, -1); JanetChannel *chan = janet_vm_root_fiber->supervisor_channel; @@ -2146,6 +2205,14 @@ static const JanetReg ev_cfuns[] = { "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. " "If not provided, the new fiber will inherit the current supervisor.") }, + { + "ev/thread", cfun_ev_thread, + JDOC("(ev/thread fiber &opt value flags)\n\n" + "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` " + "to resume with. " + "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. " + "The the final result.") + }, { "ev/give-supervisor", cfun_ev_give_supervisor, JDOC("(ev/give-supervsior tag & payload)\n\n" diff --git a/src/core/marsh.c b/src/core/marsh.c index e80932bc..20b88408 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -938,6 +938,7 @@ static const uint8_t *unmarshal_one_fiber( #ifdef JANET_EV fiber->waiting = NULL; fiber->sched_id = 0; + fiber->supervisor_channel = NULL; #endif /* Push fiber to seen stack */ diff --git a/src/core/net.c b/src/core/net.c index f7e1c3ac..165c93e0 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -146,6 +146,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event if (state->function) { /* Schedule worker */ JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); + fiber->supervisor_channel = s->fiber->supervisor_channel; janet_schedule(fiber, janet_wrap_nil()); /* Now listen again for next connection */ Janet err; @@ -222,6 +223,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event Janet streamv = janet_wrap_abstract(stream); if (state->function) { JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); + fiber->supervisor_channel = s->fiber->supervisor_channel; janet_schedule(fiber, janet_wrap_nil()); } else { janet_schedule(s->fiber, streamv); diff --git a/src/core/thread.c b/src/core/thread.c index 9e019013..1394547d 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -520,7 +520,7 @@ static int thread_worker(JanetMailboxPair *pair) { janet_stacktrace(fiber, out); } -#ifdef JANET_NET +#ifdef JANET_EV janet_loop(); #endif