diff --git a/src/boot/boot.janet b/src/boot/boot.janet index bb48aaa8..e542b60a 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3370,9 +3370,9 @@ ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t))) (defmacro ev/spawn-thread - ``Run some code in a new thread. Returns a fiber that can be `` + ``Run some code in a new thread. Like `ev/do-thread`, but returns immediately with a fiber.`` [& body] - ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) :n)) + ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) nil :n)) (defmacro ev/with-deadline `Run a body of code with a deadline, such that if the code does not complete before diff --git a/src/core/ev.c b/src/core/ev.c index 6e690351..4e304465 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -637,8 +637,6 @@ static void janet_chan_deinit(JanetChannel *chan) { janet_q_deinit(&chan->read_pending); janet_q_deinit(&chan->write_pending); if (janet_chan_is_threaded(chan)) { - /* Unpack everything in the queue so that memory isn't leaked - * and destructors are called for everything in the queue. */ Janet item; while (!janet_q_pop(&chan->items, &item, sizeof(item))) { janet_chan_unpack(chan, &item); @@ -720,11 +718,22 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) { if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) { janet_panic("can only unmarshal threaded channel with unsafe flag"); } - JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *)); int64_t intptr = janet_unmarshal_int64(ctx); JanetChannel *chan = (JanetChannel *) intptr; - *pchan = chan; - return pchan; + Janet check = janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(chan)); + if (janet_checktype(check, JANET_NIL)) { + JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *)); + *pchan = chan; + janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(chan), janet_wrap_abstract(pchan)); + return pchan; + } else { + janet_chan_lock(chan); + chan->ref_count--; + janet_chan_unlock(chan); + void *p = janet_unwrap_abstract(check); + janet_unmarshal_abstract_reuse(ctx, p); + return p; + } } else { JanetChannel *chan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel)); chan->limit = 0; @@ -1007,14 +1016,25 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) return 1; } -JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { - void *p = janet_getabstract(argv, n, &janet_channel_type); +JanetChannel *janet_channel_unwrap(void *abstract) { /* Rely on Janet's abstract type size tracking and the fact that a channel will * surely be bigger than a pointer to a channel */ - if (janet_abstract_size(p) == sizeof(JanetChannel *)) { - return *((JanetChannel **)p); + if (janet_abstract_size(abstract) == sizeof(JanetChannel *)) { + return *((JanetChannel **) abstract); } else { - return (JanetChannel *)p; + return abstract; + } +} + +JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { + return janet_channel_unwrap(janet_getabstract(argv, n, &janet_channel_type)); +} + +JanetChannel *janet_optchannel(const Janet *argv, int32_t argc, int32_t n, JanetChannel *dflt) { + if (argc > n && !janet_checktype(argv[n], JANET_NIL)) { + return janet_getchannel(argv, n); + } else { + return dflt; } } @@ -1316,12 +1336,13 @@ JanetFiber *janet_loop1(void) { task.fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED; Janet res; JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); - JanetChannel *chan = (JanetChannel *)(task.fiber->supervisor_channel); - if (NULL == chan) { + void *sv = task.fiber->supervisor_channel; + if (NULL == sv) { if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD && sig != JANET_SIGNAL_INTERRUPT) { janet_stacktrace(task.fiber, res); } } else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) { + JanetChannel *chan = janet_channel_unwrap(sv); janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2); } if (sig == JANET_SIGNAL_INTERRUPT) { @@ -2551,9 +2572,8 @@ JANET_CORE_FN(cfun_ev_go, janet_arity(argc, 1, 3); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); - JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &janet_channel_type, - janet_vm.root_fiber->supervisor_channel); - fiber->supervisor_channel = supervisor_channel; + void *supervisor = janet_optabstract(argv, argc, 2, &janet_channel_type, janet_vm.root_fiber->supervisor_channel); + fiber->supervisor_channel = supervisor; janet_schedule(fiber, value); return argv[0]; } @@ -2578,6 +2598,15 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { janet_vm.abstract_registry = janet_unwrap_table(aregv); } + /* Get supervsior */ + void *supervisor = NULL; + if (flags & 0x8) { + Janet sup = + janet_unmarshal(nextbytes, endbytes - nextbytes, + JANET_MARSHAL_UNSAFE, NULL, &nextbytes); + supervisor = janet_unwrap_pointer(sup); + } + /* Set cfunction registry */ if (!(flags & 0x4)) { uint32_t count1; @@ -2604,6 +2633,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { JANET_MARSHAL_UNSAFE, NULL, &nextbytes); if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv); JanetFiber *fiber = janet_unwrap_fiber(fiberv); + fiber->supervisor_channel = supervisor; janet_schedule(fiber, value); janet_loop(); args.tag = JANET_EV_TCTAG_NIL; @@ -2624,7 +2654,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { } JANET_CORE_FN(cfun_ev_thread, - "(ev/thread fiber &opt value flags)", + "(ev/thread fiber &opt value flags supervisor)", "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` " "to resume with. " "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. " @@ -2633,13 +2663,16 @@ JANET_CORE_FN(cfun_ev_thread, "* `:n` - return immediately\n" "* `:a` - don't copy abstract registry to new thread (performance optimization)\n" "* `:c` - don't copy cfunction registry to new thread (performance optimization)") { - janet_arity(argc, 1, 3); + janet_arity(argc, 1, 4); janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); uint64_t flags = 0; if (argc >= 3) { flags = janet_getflags(argv, 2, "nac"); } + void *supervisor = janet_optabstract(argv, argc, 3, &janet_channel_type, janet_vm.root_fiber->supervisor_channel); + if (NULL != supervisor) flags |= 0x8; + /* Marshal arguments for the new thread. */ JanetBuffer *buffer = janet_malloc(sizeof(JanetBuffer)); if (NULL == buffer) { @@ -2649,6 +2682,9 @@ JANET_CORE_FN(cfun_ev_thread, if (!(flags & 0x2)) { janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE); } + if (flags & 0x8) { + janet_marshal(buffer, janet_wrap_abstract(supervisor), NULL, JANET_MARSHAL_UNSAFE); + } if (!(flags & 0x4)) { janet_assert(janet_vm.registry_count <= UINT32_MAX, "assert failed size check"); uint32_t temp = (uint32_t) janet_vm.registry_count; @@ -2668,7 +2704,7 @@ JANET_CORE_FN(cfun_ev_thread, janet_ev_threaded_call(janet_go_thread_subr, arguments, janet_ev_default_threaded_callback); return janet_wrap_nil(); } else { - janet_ev_threaded_await(janet_go_thread_subr, 0, argc, buffer); + janet_ev_threaded_await(janet_go_thread_subr, (uint32_t) flags, argc, buffer); } } @@ -2678,8 +2714,9 @@ JANET_CORE_FN(cfun_ev_give_supervisor, "tuple of all of the arguments combined into a single message, where the first element is tag. " "By convention, tag should be a keyword indicating the type of message. Returns nil.") { janet_arity(argc, 1, -1); - JanetChannel *chan = janet_vm.root_fiber->supervisor_channel; - if (NULL != chan) { + void *chanv = janet_vm.root_fiber->supervisor_channel; + if (NULL != chanv) { + JanetChannel *chan = janet_channel_unwrap(chanv); if (janet_channel_push(chan, janet_wrap_tuple(janet_tuple_n(argv, argc)), 0)) { janet_await(); } diff --git a/src/core/marsh.c b/src/core/marsh.c index d4bc8ad9..b6d0c61b 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -1103,14 +1103,18 @@ Janet janet_unmarshal_janet(JanetMarshalContext *ctx) { return ret; } -void *janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size) { +void janet_unmarshal_abstract_reuse(JanetMarshalContext *ctx, void *p) { UnmarshalState *st = (UnmarshalState *)(ctx->u_state); if (ctx->at == NULL) { janet_panicf("janet_unmarshal_abstract called more than once"); } - void *p = janet_abstract(ctx->at, size); janet_v_push(st->lookup, janet_wrap_abstract(p)); ctx->at = NULL; +} + +void *janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size) { + void *p = janet_abstract(ctx->at, size); + janet_unmarshal_abstract_reuse(ctx, p); return p; } diff --git a/src/include/janet.h b/src/include/janet.h index d4e904da..b7f5c2b8 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1951,6 +1951,7 @@ JANET_API uint8_t janet_unmarshal_byte(JanetMarshalContext *ctx); JANET_API void janet_unmarshal_bytes(JanetMarshalContext *ctx, uint8_t *dest, size_t len); JANET_API Janet janet_unmarshal_janet(JanetMarshalContext *ctx); JANET_API JanetAbstract janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size); +JANET_API void janet_unmarshal_abstract_reuse(JanetMarshalContext *ctx, void *p); JANET_API void janet_register_abstract_type(const JanetAbstractType *at); JANET_API const JanetAbstractType *janet_get_abstract_type(Janet key);