diff --git a/src/core/ev.c b/src/core/ev.c index 79b9c02c..19d4b39a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -524,9 +524,9 @@ static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal s fiber->gc.flags |= JANET_FIBER_FLAG_ROOT; if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED; if (soon) { - janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)); + janet_assert(!janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow"); } else { - janet_q_push(&janet_vm.spawn, &t, sizeof(t)); + janet_assert(!janet_q_push(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow"); } } @@ -959,11 +959,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { janet_schedule(fiber, janet_wrap_nil()); } } else if (mode != JANET_CP_MODE_CLOSE) { - /* Fiber has already been cancelled or resumed. */ + /* Fiber has already been canceled or resumed. */ /* Resend event to another waiting thread, depending on mode */ int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); if (is_read) { JanetChannelPending reader; + int sent = 0; while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { JanetVM *vm = reader.thread; if (!vm) continue; @@ -974,8 +975,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { msg.argp = channel; msg.argj = x; janet_ev_post_event(vm, janet_thread_chan_cb, msg); + sent = 1; break; } + if (!sent) { + janet_chan_unpack(channel, &x, 1); + } } else { JanetChannelPending writer; while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { @@ -1001,14 +1006,14 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) { JanetChannelPending reader; int is_empty; - if (janet_chan_pack(channel, &x)) { - janet_chan_unlock(channel); - janet_panicf("failed to pack value for channel: %v", x); - } if (channel->closed) { janet_chan_unlock(channel); janet_panic("cannot write to closed channel"); } + if (janet_chan_pack(channel, &x)) { + janet_chan_unlock(channel); + janet_panicf("failed to pack value for channel: %v", x); + } int is_threaded = janet_chan_is_threaded(channel); if (is_threaded) { /* don't dereference fiber from another thread */ @@ -1021,6 +1026,7 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode if (is_empty) { /* No pending reader */ if (janet_q_push(&channel->items, &x, sizeof(Janet))) { + janet_chan_unpack(channel, &x, 1); janet_chan_unlock(channel); janet_panicf("channel overflow: %v", x); } else if (janet_q_count(&channel->items) > channel->limit) { @@ -1054,6 +1060,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode msg.argj = x; if (vm) { janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } else { + /* If no vm to send to, we must clean up (unpack) the packed payload to avoid leak */ + janet_chan_unpack(channel, &x, 1); } } else { if (reader.mode == JANET_CP_MODE_CHOICE_READ) { @@ -1458,11 +1467,12 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) { int32_t limit = janet_unmarshal_int(ctx); int32_t count = janet_unmarshal_int(ctx); if (count < 0) janet_panic("invalid negative channel count"); + if (count > limit) janet_panic("invalid channel count"); janet_chan_init(abst, limit, 0); abst->closed = !!is_closed; for (int32_t i = 0; i < count; i++) { Janet item = janet_unmarshal_janet(ctx); - janet_q_push(&abst->items, &item, sizeof(item)); + janet_assert(!janet_q_push(&abst->items, &item, sizeof(item)), "bad unmarshal channel"); } return abst; }