Fix #892 - Remove racy ref counts for channels

Rather than manual reference counting for suspended fibers, we
automate the process by incrementing "extra_listeners" every time
we suspend a fiber in the event loop, and decrement when that fiber
is resumed. In this manner, we keep track of the number of suspending
fibers in a simpler, more correct way.
This commit is contained in:
Calvin Rose 2021-12-09 18:44:55 -06:00
parent 6d9286a202
commit d543f8857b
2 changed files with 11 additions and 11 deletions

View File

@ -464,9 +464,9 @@ const JanetAbstractType janet_stream_type = {
/* Register a fiber to resume with value */
void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
if (fiber->flags & JANET_FIBER_FLAG_CANCELED) return;
if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return;
JanetTask t = { fiber, value, sig, ++fiber->sched_id };
if (sig == JANET_SIGNAL_ERROR) fiber->flags |= JANET_FIBER_FLAG_CANCELED;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
janet_q_push(&janet_vm.spawn, &t, sizeof(t));
}
@ -744,7 +744,6 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
int mode = msg.tag;
JanetChannel *channel = (JanetChannel *) msg.argp;
Janet x = msg.argj;
janet_ev_dec_refcount();
if (fiber->sched_id == sched_id) {
if (mode == JANET_CP_MODE_CHOICE_READ) {
janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
@ -837,7 +836,6 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_WRITE;
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
janet_chan_unlock(channel);
janet_ev_inc_refcount();
if (is_threaded) {
janet_gcroot(janet_wrap_fiber(pending.fiber));
}
@ -855,7 +853,6 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
msg.argj = x;
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else {
janet_ev_dec_refcount();
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
janet_schedule(reader.fiber, make_read_result(channel, x));
} else {
@ -888,7 +885,6 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_READ;
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
janet_chan_unlock(channel);
janet_ev_inc_refcount();
if (is_threaded) {
janet_gcroot(janet_wrap_fiber(pending.fiber));
}
@ -907,7 +903,6 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else {
janet_ev_dec_refcount();
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
janet_schedule(writer.fiber, make_write_result(channel));
} else {
@ -1111,7 +1106,6 @@ JANET_CORE_FN(cfun_channel_close,
msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else {
janet_ev_dec_refcount();
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
janet_schedule(writer.fiber, janet_wrap_nil());
} else {
@ -1131,7 +1125,6 @@ JANET_CORE_FN(cfun_channel_close,
msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else {
janet_ev_dec_refcount();
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
janet_schedule(reader.fiber, janet_wrap_nil());
} else {
@ -1228,12 +1221,17 @@ JanetFiber *janet_loop1(void) {
while (janet_vm.spawn.head != janet_vm.spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK, 0};
janet_q_pop(&janet_vm.spawn, &task, sizeof(task));
task.fiber->flags &= ~JANET_FIBER_FLAG_CANCELED;
if (task.fiber->gc.flags & JANET_FIBER_EV_FLAG_SUSPENDED) janet_ev_dec_refcount();
task.fiber->gc.flags &= ~(JANET_FIBER_EV_FLAG_CANCELED | JANET_FIBER_EV_FLAG_SUSPENDED);
if (task.expected_sched_id != task.fiber->sched_id) continue;
Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
void *sv = task.fiber->supervisor_channel;
int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT;
if (is_suspended) {
task.fiber->gc.flags |= JANET_FIBER_EV_FLAG_SUSPENDED;
janet_ev_inc_refcount();
}
if (NULL == sv) {
if (!is_suspended) {
janet_stacktrace_ext(task.fiber, res, "");

View File

@ -48,7 +48,6 @@
#define JANET_FIBER_STATUS_MASK 0x3F0000
#define JANET_FIBER_RESUME_SIGNAL 0x400000
#define JANET_FIBER_FLAG_CANCELED 0x400000
#define JANET_FIBER_STATUS_OFFSET 16
#define JANET_FIBER_BREAKPOINT 0x1000000
@ -57,6 +56,9 @@
#define JANET_FIBER_DID_LONGJUMP 0x8000000
#define JANET_FIBER_FLAG_MASK 0xF000000
#define JANET_FIBER_EV_FLAG_CANCELED 0x10000
#define JANET_FIBER_EV_FLAG_SUSPENDED 0x20000
#define janet_fiber_set_status(f, s) do {\
(f)->flags &= ~JANET_FIBER_STATUS_MASK;\
(f)->flags |= (s) << JANET_FIBER_STATUS_OFFSET;\