1
0
mirror of https://github.com/janet-lang/janet synced 2026-03-04 23:09:48 +00:00

Avoid memory leak when canceling fibers with threaded channels.

Objects in channels are sent as messages that need to be freed by the
consumer. However, in certain cases, no consumer is available and the
messages were being discarded without properly being freed. This should
also fix `-fsanitize=address` on GCC and CLANG with the default test
suite.
This commit is contained in:
Calvin Rose
2026-02-20 14:45:32 -06:00
parent e61194a8d9
commit ca9ffaa5bb

View File

@@ -524,9 +524,9 @@ static void janet_schedule_general(JanetFiber *fiber, Janet value, JanetSignal s
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
if (soon) {
janet_q_push_head(&janet_vm.spawn, &t, sizeof(t));
janet_assert(!janet_q_push_head(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
} else {
janet_q_push(&janet_vm.spawn, &t, sizeof(t));
janet_assert(!janet_q_push(&janet_vm.spawn, &t, sizeof(t)), "schedule queue overflow");
}
}
@@ -959,11 +959,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
janet_schedule(fiber, janet_wrap_nil());
}
} else if (mode != JANET_CP_MODE_CLOSE) {
/* Fiber has already been cancelled or resumed. */
/* Fiber has already been canceled or resumed. */
/* Resend event to another waiting thread, depending on mode */
int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
if (is_read) {
JanetChannelPending reader;
int sent = 0;
while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
JanetVM *vm = reader.thread;
if (!vm) continue;
@@ -974,8 +975,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
msg.argp = channel;
msg.argj = x;
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
sent = 1;
break;
}
if (!sent) {
janet_chan_unpack(channel, &x, 1);
}
} else {
JanetChannelPending writer;
while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
@@ -1001,14 +1006,14 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) {
JanetChannelPending reader;
int is_empty;
if (janet_chan_pack(channel, &x)) {
janet_chan_unlock(channel);
janet_panicf("failed to pack value for channel: %v", x);
}
if (channel->closed) {
janet_chan_unlock(channel);
janet_panic("cannot write to closed channel");
}
if (janet_chan_pack(channel, &x)) {
janet_chan_unlock(channel);
janet_panicf("failed to pack value for channel: %v", x);
}
int is_threaded = janet_chan_is_threaded(channel);
if (is_threaded) {
/* don't dereference fiber from another thread */
@@ -1021,6 +1026,7 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
if (is_empty) {
/* No pending reader */
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
janet_chan_unpack(channel, &x, 1);
janet_chan_unlock(channel);
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
@@ -1054,6 +1060,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode
msg.argj = x;
if (vm) {
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else {
/* If no vm to send to, we must clean up (unpack) the packed payload to avoid leak */
janet_chan_unpack(channel, &x, 1);
}
} else {
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
@@ -1458,11 +1467,12 @@ static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) {
int32_t limit = janet_unmarshal_int(ctx);
int32_t count = janet_unmarshal_int(ctx);
if (count < 0) janet_panic("invalid negative channel count");
if (count > limit) janet_panic("invalid channel count");
janet_chan_init(abst, limit, 0);
abst->closed = !!is_closed;
for (int32_t i = 0; i < count; i++) {
Janet item = janet_unmarshal_janet(ctx);
janet_q_push(&abst->items, &item, sizeof(item));
janet_assert(!janet_q_push(&abst->items, &item, sizeof(item)), "bad unmarshal channel");
}
return abst;
}