Fix threaded supervisor channels - Fix #766

Some pointer casting with abstract types was incorrect, resulting
in strange behavior when trying to use supervisor channels that were
threaded. This fix also adds the ability to supply a supervisor channel
directly when creating a thread.
This commit is contained in:
Calvin Rose 2021-08-16 21:14:06 -05:00
parent 87b8dffe23
commit e552757edc
4 changed files with 66 additions and 24 deletions

View File

@ -3370,9 +3370,9 @@
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t)))
(defmacro ev/spawn-thread
``Run some code in a new thread. Returns a fiber that can be ``
``Run some code in a new thread. Like `ev/do-thread`, but returns immediately with a fiber.``
[& body]
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) :n))
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) nil :n))
(defmacro ev/with-deadline
`Run a body of code with a deadline, such that if the code does not complete before

View File

@ -637,8 +637,6 @@ static void janet_chan_deinit(JanetChannel *chan) {
janet_q_deinit(&chan->read_pending);
janet_q_deinit(&chan->write_pending);
if (janet_chan_is_threaded(chan)) {
/* Unpack everything in the queue so that memory isn't leaked
* and destructors are called for everything in the queue. */
Janet item;
while (!janet_q_pop(&chan->items, &item, sizeof(item))) {
janet_chan_unpack(chan, &item);
@ -720,11 +718,22 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) {
if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) {
janet_panic("can only unmarshal threaded channel with unsafe flag");
}
JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *));
int64_t intptr = janet_unmarshal_int64(ctx);
JanetChannel *chan = (JanetChannel *) intptr;
*pchan = chan;
return pchan;
Janet check = janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(chan));
if (janet_checktype(check, JANET_NIL)) {
JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *));
*pchan = chan;
janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(chan), janet_wrap_abstract(pchan));
return pchan;
} else {
janet_chan_lock(chan);
chan->ref_count--;
janet_chan_unlock(chan);
void *p = janet_unwrap_abstract(check);
janet_unmarshal_abstract_reuse(ctx, p);
return p;
}
} else {
JanetChannel *chan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel));
chan->limit = 0;
@ -1007,14 +1016,25 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
return 1;
}
JanetChannel *janet_getchannel(const Janet *argv, int32_t n) {
void *p = janet_getabstract(argv, n, &janet_channel_type);
JanetChannel *janet_channel_unwrap(void *abstract) {
/* Rely on Janet's abstract type size tracking and the fact that a channel will
* surely be bigger than a pointer to a channel */
if (janet_abstract_size(p) == sizeof(JanetChannel *)) {
return *((JanetChannel **)p);
if (janet_abstract_size(abstract) == sizeof(JanetChannel *)) {
return *((JanetChannel **) abstract);
} else {
return (JanetChannel *)p;
return abstract;
}
}
JanetChannel *janet_getchannel(const Janet *argv, int32_t n) {
return janet_channel_unwrap(janet_getabstract(argv, n, &janet_channel_type));
}
JanetChannel *janet_optchannel(const Janet *argv, int32_t argc, int32_t n, JanetChannel *dflt) {
if (argc > n && !janet_checktype(argv[n], JANET_NIL)) {
return janet_getchannel(argv, n);
} else {
return dflt;
}
}
@ -1316,12 +1336,13 @@ JanetFiber *janet_loop1(void) {
task.fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
JanetChannel *chan = (JanetChannel *)(task.fiber->supervisor_channel);
if (NULL == chan) {
void *sv = task.fiber->supervisor_channel;
if (NULL == sv) {
if (sig != JANET_SIGNAL_EVENT && sig != JANET_SIGNAL_YIELD && sig != JANET_SIGNAL_INTERRUPT) {
janet_stacktrace(task.fiber, res);
}
} else if (sig == JANET_SIGNAL_OK || (task.fiber->flags & (1 << sig))) {
JanetChannel *chan = janet_channel_unwrap(sv);
janet_channel_push(chan, make_supervisor_event(janet_signal_names[sig], task.fiber), 2);
}
if (sig == JANET_SIGNAL_INTERRUPT) {
@ -2551,9 +2572,8 @@ JANET_CORE_FN(cfun_ev_go,
janet_arity(argc, 1, 3);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &janet_channel_type,
janet_vm.root_fiber->supervisor_channel);
fiber->supervisor_channel = supervisor_channel;
void *supervisor = janet_optabstract(argv, argc, 2, &janet_channel_type, janet_vm.root_fiber->supervisor_channel);
fiber->supervisor_channel = supervisor;
janet_schedule(fiber, value);
return argv[0];
}
@ -2578,6 +2598,15 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
janet_vm.abstract_registry = janet_unwrap_table(aregv);
}
/* Get supervsior */
void *supervisor = NULL;
if (flags & 0x8) {
Janet sup =
janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
supervisor = janet_unwrap_pointer(sup);
}
/* Set cfunction registry */
if (!(flags & 0x4)) {
uint32_t count1;
@ -2604,6 +2633,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv);
JanetFiber *fiber = janet_unwrap_fiber(fiberv);
fiber->supervisor_channel = supervisor;
janet_schedule(fiber, value);
janet_loop();
args.tag = JANET_EV_TCTAG_NIL;
@ -2624,7 +2654,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
}
JANET_CORE_FN(cfun_ev_thread,
"(ev/thread fiber &opt value flags)",
"(ev/thread fiber &opt value flags supervisor)",
"Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
"to resume with. "
"Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
@ -2633,13 +2663,16 @@ JANET_CORE_FN(cfun_ev_thread,
"* `:n` - return immediately\n"
"* `:a` - don't copy abstract registry to new thread (performance optimization)\n"
"* `:c` - don't copy cfunction registry to new thread (performance optimization)") {
janet_arity(argc, 1, 3);
janet_arity(argc, 1, 4);
janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
uint64_t flags = 0;
if (argc >= 3) {
flags = janet_getflags(argv, 2, "nac");
}
void *supervisor = janet_optabstract(argv, argc, 3, &janet_channel_type, janet_vm.root_fiber->supervisor_channel);
if (NULL != supervisor) flags |= 0x8;
/* Marshal arguments for the new thread. */
JanetBuffer *buffer = janet_malloc(sizeof(JanetBuffer));
if (NULL == buffer) {
@ -2649,6 +2682,9 @@ JANET_CORE_FN(cfun_ev_thread,
if (!(flags & 0x2)) {
janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE);
}
if (flags & 0x8) {
janet_marshal(buffer, janet_wrap_abstract(supervisor), NULL, JANET_MARSHAL_UNSAFE);
}
if (!(flags & 0x4)) {
janet_assert(janet_vm.registry_count <= UINT32_MAX, "assert failed size check");
uint32_t temp = (uint32_t) janet_vm.registry_count;
@ -2668,7 +2704,7 @@ JANET_CORE_FN(cfun_ev_thread,
janet_ev_threaded_call(janet_go_thread_subr, arguments, janet_ev_default_threaded_callback);
return janet_wrap_nil();
} else {
janet_ev_threaded_await(janet_go_thread_subr, 0, argc, buffer);
janet_ev_threaded_await(janet_go_thread_subr, (uint32_t) flags, argc, buffer);
}
}
@ -2678,8 +2714,9 @@ JANET_CORE_FN(cfun_ev_give_supervisor,
"tuple of all of the arguments combined into a single message, where the first element is tag. "
"By convention, tag should be a keyword indicating the type of message. Returns nil.") {
janet_arity(argc, 1, -1);
JanetChannel *chan = janet_vm.root_fiber->supervisor_channel;
if (NULL != chan) {
void *chanv = janet_vm.root_fiber->supervisor_channel;
if (NULL != chanv) {
JanetChannel *chan = janet_channel_unwrap(chanv);
if (janet_channel_push(chan, janet_wrap_tuple(janet_tuple_n(argv, argc)), 0)) {
janet_await();
}

View File

@ -1103,14 +1103,18 @@ Janet janet_unmarshal_janet(JanetMarshalContext *ctx) {
return ret;
}
void *janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size) {
void janet_unmarshal_abstract_reuse(JanetMarshalContext *ctx, void *p) {
UnmarshalState *st = (UnmarshalState *)(ctx->u_state);
if (ctx->at == NULL) {
janet_panicf("janet_unmarshal_abstract called more than once");
}
void *p = janet_abstract(ctx->at, size);
janet_v_push(st->lookup, janet_wrap_abstract(p));
ctx->at = NULL;
}
void *janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size) {
void *p = janet_abstract(ctx->at, size);
janet_unmarshal_abstract_reuse(ctx, p);
return p;
}

View File

@ -1951,6 +1951,7 @@ JANET_API uint8_t janet_unmarshal_byte(JanetMarshalContext *ctx);
JANET_API void janet_unmarshal_bytes(JanetMarshalContext *ctx, uint8_t *dest, size_t len);
JANET_API Janet janet_unmarshal_janet(JanetMarshalContext *ctx);
JANET_API JanetAbstract janet_unmarshal_abstract(JanetMarshalContext *ctx, size_t size);
JANET_API void janet_unmarshal_abstract_reuse(JanetMarshalContext *ctx, void *p);
JANET_API void janet_register_abstract_type(const JanetAbstractType *at);
JANET_API const JanetAbstractType *janet_get_abstract_type(Janet key);