From d543f8857b84ac9e59972c56ef5af0b38edbb041 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 9 Dec 2021 18:44:55 -0600 Subject: [PATCH] Fix #892 - Remove racy ref counts for channels Rather than manual reference counting for suspended fibers, we automate the process by incrementing "extra_listeners" every time we suspend a fiber in the event loop, and decrement when that fiber is resumed. In this manner, we keep track of the number of suspending fibers in a simpler, more correct way. --- src/core/ev.c | 18 ++++++++---------- src/core/fiber.h | 4 +++- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index abc330ab..dc7c0c92 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -464,9 +464,9 @@ const JanetAbstractType janet_stream_type = { /* Register a fiber to resume with value */ void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { - if (fiber->flags & JANET_FIBER_FLAG_CANCELED) return; + if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return; JanetTask t = { fiber, value, sig, ++fiber->sched_id }; - if (sig == JANET_SIGNAL_ERROR) fiber->flags |= JANET_FIBER_FLAG_CANCELED; + if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED; janet_q_push(&janet_vm.spawn, &t, sizeof(t)); } @@ -744,7 +744,6 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { int mode = msg.tag; JanetChannel *channel = (JanetChannel *) msg.argp; Janet x = msg.argj; - janet_ev_dec_refcount(); if (fiber->sched_id == sched_id) { if (mode == JANET_CP_MODE_CHOICE_READ) { janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error"); @@ -837,7 +836,6 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_WRITE; janet_q_push(&channel->write_pending, &pending, sizeof(pending)); janet_chan_unlock(channel); - janet_ev_inc_refcount(); if (is_threaded) { janet_gcroot(janet_wrap_fiber(pending.fiber)); } @@ -855,7 +853,6 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { msg.argj = x; janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_ev_dec_refcount(); if (reader.mode == JANET_CP_MODE_CHOICE_READ) { janet_schedule(reader.fiber, make_read_result(channel, x)); } else { @@ -888,7 +885,6 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_READ; janet_q_push(&channel->read_pending, &pending, sizeof(pending)); janet_chan_unlock(channel); - janet_ev_inc_refcount(); if (is_threaded) { janet_gcroot(janet_wrap_fiber(pending.fiber)); } @@ -907,7 +903,6 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) msg.argj = janet_wrap_nil(); janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_ev_dec_refcount(); if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { janet_schedule(writer.fiber, make_write_result(channel)); } else { @@ -1111,7 +1106,6 @@ JANET_CORE_FN(cfun_channel_close, msg.argj = janet_wrap_nil(); janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_ev_dec_refcount(); if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { janet_schedule(writer.fiber, janet_wrap_nil()); } else { @@ -1131,7 +1125,6 @@ JANET_CORE_FN(cfun_channel_close, msg.argj = janet_wrap_nil(); janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_ev_dec_refcount(); if (reader.mode == JANET_CP_MODE_CHOICE_READ) { janet_schedule(reader.fiber, janet_wrap_nil()); } else { @@ -1228,12 +1221,17 @@ JanetFiber *janet_loop1(void) { while (janet_vm.spawn.head != janet_vm.spawn.tail) { JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0}; janet_q_pop(&janet_vm.spawn, &task, sizeof(task)); - task.fiber->flags &= ~JANET_FIBER_FLAG_CANCELED; + if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount(); + task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED); if (task.expected_sched_id != task.fiber->sched_id) continue; Janet res; JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); void *sv = task.fiber->supervisor_channel; int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT; + if (is_suspended) { + task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED; + janet_ev_inc_refcount(); + } if (NULL == sv) { if (!is_suspended) { janet_stacktrace_ext(task.fiber, res, ""); diff --git a/src/core/fiber.h b/src/core/fiber.h index ac2064fb..4b39aa84 100644 --- a/src/core/fiber.h +++ b/src/core/fiber.h @@ -48,7 +48,6 @@ #define JANET_FIBER_STATUS_MASK 0x3F0000 #define JANET_FIBER_RESUME_SIGNAL 0x400000 -#define JANET_FIBER_FLAG_CANCELED 0x400000 #define JANET_FIBER_STATUS_OFFSET 16 #define JANET_FIBER_BREAKPOINT 0x1000000 @@ -57,6 +56,9 @@ #define JANET_FIBER_DID_LONGJUMP 0x8000000 #define JANET_FIBER_FLAG_MASK 0xF000000 +#define JANET_FIBER_EV_FLAG_CANCELED 0x10000 +#define JANET_FIBER_EV_FLAG_SUSPENDED 0x20000 + #define janet_fiber_set_status(f, s) do {\ (f)->flags &= ~JANET_FIBER_STATUS_MASK;\ (f)->flags |= (s) << JANET_FIBER_STATUS_OFFSET;\