Add ev/thread and ev/do-thread.

- Also fix setting supervisor with net/accept-loop.
This commit is contained in:
Calvin Rose 2021-01-22 12:52:45 -06:00
parent 0acf167e84
commit 317ab6df6b
6 changed files with 80 additions and 2 deletions

View File

@ -3168,6 +3168,12 @@
[& body]
~(,ev/go (fiber/new (fn _spawn [&] ,;body) :tp)))
(defmacro ev/do-thread
``Run some code in a new thread. Suspends the current fiber until the thread is complete, and
evaluates to nil.``
[& body]
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t)))
(defmacro ev/with-deadline
`Run a body of code with a deadline, such that if the code does not complete before
the deadline is up, it will be canceled.`

View File

@ -53,7 +53,9 @@ JANET_NO_RETURN static void janet_top_level_signal(const char *msg) {
void janet_signalv(JanetSignal sig, Janet message) {
if (janet_vm_return_reg != NULL) {
*janet_vm_return_reg = message;
janet_vm_fiber->flags |= JANET_FIBER_DID_LONGJUMP;
if (NULL != janet_vm_fiber) {
janet_vm_fiber->flags |= JANET_FIBER_DID_LONGJUMP;
}
#if defined(JANET_BSD) || defined(JANET_APPLE)
_longjmp(*janet_vm_jmp_buf, sig);
#else

View File

@ -2033,6 +2033,65 @@ static Janet cfun_ev_go(int32_t argc, Janet *argv) {
return argv[0];
}
/* For ev/thread - Run an interpreter in the new thread. */
static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
JanetBuffer *buffer = (JanetBuffer *) args.argp;
const uint8_t *nextbytes = buffer->data;
const uint8_t *endbytes = nextbytes + buffer->count;
janet_init();
JanetTryState tstate;
JanetSignal signal = janet_try(&tstate);
if (!signal) {
Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry");
janet_vm_abstract_registry = janet_unwrap_table(aregv);
Janet regv = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(regv, JANET_TABLE)) janet_panic("expected table for cfunction registry");
janet_vm_registry = janet_unwrap_table(regv);
Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(fiberv, JANET_FIBER)) janet_panic("expected fiber");
JanetFiber *fiber = janet_unwrap_fiber(fiberv);
janet_gcroot(fiberv);
janet_schedule(fiber, value);
janet_loop();
args.tag = JANET_EV_TCTAG_NIL;
} else {
if (janet_checktype(tstate.payload, JANET_STRING)) {
args.tag = JANET_EV_TCTAG_ERR_STRINGF;
args.argp = strdup((const char *) janet_unwrap_string(tstate.payload));
} else {
args.tag = JANET_EV_TCTAG_ERR_STRING;
args.argp = "failed to start thread";
}
}
janet_buffer_deinit(buffer);
janet_restore(&tstate);
janet_deinit();
return args;
}
static Janet cfun_ev_thread(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 3);
janet_getfiber(argv, 0);
Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
/* Marshal arguments for the new thread. */
JanetBuffer *buffer = malloc(sizeof(JanetBuffer));
if (NULL == buffer) {
JANET_OUT_OF_MEMORY;
}
janet_buffer_init(buffer, 0);
janet_marshal(buffer, janet_wrap_table(janet_vm_abstract_registry), NULL, JANET_MARSHAL_UNSAFE);
janet_marshal(buffer, janet_wrap_table(janet_vm_registry), NULL, JANET_MARSHAL_UNSAFE);
janet_marshal(buffer, argv[0], NULL, JANET_MARSHAL_UNSAFE);
janet_marshal(buffer, value, NULL, JANET_MARSHAL_UNSAFE);
janet_ev_threaded_await(janet_go_thread_subr, 0, argc, buffer);
}
static Janet cfun_ev_give_supervisor(int32_t argc, Janet *argv) {
janet_arity(argc, 1, -1);
JanetChannel *chan = janet_vm_root_fiber->supervisor_channel;
@ -2146,6 +2205,14 @@ static const JanetReg ev_cfuns[] = {
"events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
"If not provided, the new fiber will inherit the current supervisor.")
},
{
"ev/thread", cfun_ev_thread,
JDOC("(ev/thread fiber &opt value flags)\n\n"
"Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
"to resume with. "
"Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
"The the final result.")
},
{
"ev/give-supervisor", cfun_ev_give_supervisor,
JDOC("(ev/give-supervsior tag & payload)\n\n"

View File

@ -938,6 +938,7 @@ static const uint8_t *unmarshal_one_fiber(
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
#endif
/* Push fiber to seen stack */

View File

@ -146,6 +146,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
if (state->function) {
/* Schedule worker */
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
fiber->supervisor_channel = s->fiber->supervisor_channel;
janet_schedule(fiber, janet_wrap_nil());
/* Now listen again for next connection */
Janet err;
@ -222,6 +223,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
Janet streamv = janet_wrap_abstract(stream);
if (state->function) {
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
fiber->supervisor_channel = s->fiber->supervisor_channel;
janet_schedule(fiber, janet_wrap_nil());
} else {
janet_schedule(s->fiber, streamv);

View File

@ -520,7 +520,7 @@ static int thread_worker(JanetMailboxPair *pair) {
janet_stacktrace(fiber, out);
}
#ifdef JANET_NET
#ifdef JANET_EV
janet_loop();
#endif