From 82416e4e4e69683f48eacc68929b1e4ba4b919d2 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Mon, 1 Sep 2025 10:25:02 -0500 Subject: [PATCH] Address #1629 - janet_deinit called before threaded channel message sent to thread. If we take a reference to another thread inside channel code, make sure that we increase the refcount to avoid a use after free. --- meson.build | 1 + src/core/ev.c | 57 +++++++++++++++++++++++++++++++++++++------ src/core/gc.c | 6 ++++- src/include/janet.h | 4 ++- test/suite-ev2.janet | 58 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 117 insertions(+), 9 deletions(-) create mode 100644 test/suite-ev2.janet diff --git a/meson.build b/meson.build index 4bc4dcc7..08835187 100644 --- a/meson.build +++ b/meson.build @@ -281,6 +281,7 @@ test_files = [ 'test/suite-corelib.janet', 'test/suite-debug.janet', 'test/suite-ev.janet', + 'test/suite-ev2.janet', 'test/suite-ffi.janet', 'test/suite-filewatch.janet', 'test/suite-inttypes.janet', diff --git a/src/core/ev.c b/src/core/ev.c index c71214c7..3acf9dd6 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -839,6 +839,34 @@ static int janet_chanat_gc(void *p, size_t s) { return 0; } +static void janet_chanat_remove_vmref(JanetQueue *fq) { + JanetChannelPending *pending = fq->data; + if (fq->head <= fq->tail) { + for (int32_t i = fq->head; i < fq->tail; i++) { + if (pending[i].thread == &janet_vm) pending[i].thread = NULL; + } + } else { + for (int32_t i = fq->head; i < fq->capacity; i++) { + if (pending[i].thread == &janet_vm) pending[i].thread = NULL; + } + for (int32_t i = 0; i < fq->tail; i++) { + if (pending[i].thread == &janet_vm) pending[i].thread = NULL; + } + } +} + +static int janet_chanat_gcperthread(void *p, size_t s) { + (void) s; + JanetChannel *chan = p; + janet_chan_lock(chan); + /* Make sure that the internals of the threaded channel no longer reference _this_ thread. Replace + * those references with NULL. */ + janet_chanat_remove_vmref(&chan->read_pending); + janet_chanat_remove_vmref(&chan->write_pending); + janet_chan_unlock(chan); + return 0; +} + static void janet_chanat_mark_fq(JanetQueue *fq) { JanetChannelPending *pending = fq->data; if (fq->head <= fq->tail) { @@ -921,8 +949,9 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); if (is_read) { JanetChannelPending reader; - if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { + while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { JanetVM *vm = reader.thread; + if (!vm) continue; JanetEVGenericMessage msg; msg.tag = reader.mode; msg.fiber = reader.fiber; @@ -930,11 +959,13 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { msg.argp = channel; msg.argj = x; janet_ev_post_event(vm, janet_thread_chan_cb, msg); + break; } } else { JanetChannelPending writer; - if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { + while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { JanetVM *vm = writer.thread; + if (!vm) continue; JanetEVGenericMessage msg; msg.tag = writer.mode; msg.fiber = writer.fiber; @@ -942,6 +973,7 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { msg.argp = channel; msg.argj = janet_wrap_nil(); janet_ev_post_event(vm, janet_thread_chan_cb, msg); + break; } } } @@ -1005,7 +1037,9 @@ static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode msg.argi = (int32_t) reader.sched_id; msg.argp = channel; msg.argj = x; - janet_ev_post_event(vm, janet_thread_chan_cb, msg); + if (vm) { + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } } else { if (reader.mode == JANET_CP_MODE_CHOICE_READ) { janet_schedule(reader.fiber, make_read_result(channel, x)); @@ -1060,7 +1094,9 @@ static int janet_channel_pop_with_lock(JanetChannel *channel, Janet *item, int i msg.argi = (int32_t) writer.sched_id; msg.argp = channel; msg.argj = janet_wrap_nil(); - janet_ev_post_event(vm, janet_thread_chan_cb, msg); + if (vm) { + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } } else { if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { janet_schedule(writer.fiber, make_write_result(channel)); @@ -1324,7 +1360,9 @@ JANET_CORE_FN(cfun_channel_close, msg.tag = JANET_CP_MODE_CLOSE; msg.argi = (int32_t) writer.sched_id; msg.argj = janet_wrap_nil(); - janet_ev_post_event(vm, janet_thread_chan_cb, msg); + if (vm) { + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } } else { if (janet_fiber_can_resume(writer.fiber)) { if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { @@ -1345,7 +1383,9 @@ JANET_CORE_FN(cfun_channel_close, msg.tag = JANET_CP_MODE_CLOSE; msg.argi = (int32_t) reader.sched_id; msg.argj = janet_wrap_nil(); - janet_ev_post_event(vm, janet_thread_chan_cb, msg); + if (vm) { + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } } else { if (janet_fiber_can_resume(reader.fiber)) { if (reader.mode == JANET_CP_MODE_CHOICE_READ) { @@ -1438,7 +1478,10 @@ const JanetAbstractType janet_channel_type = { NULL, /* compare */ NULL, /* hash */ janet_chanat_next, - JANET_ATEND_NEXT + NULL, /* call */ + NULL, /* length */ + NULL, /* bytes */ + janet_chanat_gcperthread }; /* Main event loop */ diff --git a/src/core/gc.c b/src/core/gc.c index 79e67409..074560af 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -497,9 +497,13 @@ void janet_sweep() { /* If not visited... */ if (!janet_truthy(items[i].value)) { void *abst = janet_unwrap_abstract(items[i].key); + JanetAbstractHead *head = janet_abstract_head(abst); + /* Optional per-thread finalizer */ + if (head->type->gcperthread) { + janet_assert(!head->type->gcperthread(head->data, head->size), "finalizer failed"); + } if (0 == janet_abstract_decref(abst)) { /* Run finalizer */ - JanetAbstractHead *head = janet_abstract_head(abst); if (head->type->gc) { janet_assert(!head->type->gc(head->data, head->size), "finalizer failed"); } diff --git a/src/include/janet.h b/src/include/janet.h index 4d48de95..729adc44 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1188,6 +1188,7 @@ struct JanetAbstractType { Janet(*call)(void *p, int32_t argc, Janet *argv); size_t (*length)(void *p, size_t len); JanetByteView(*bytes)(void *p, size_t len); + int (*gcperthread)(void *data, size_t len); }; /* Some macros to let us add extra types to JanetAbstract types without @@ -1207,7 +1208,8 @@ struct JanetAbstractType { #define JANET_ATEND_NEXT NULL,JANET_ATEND_CALL #define JANET_ATEND_CALL NULL,JANET_ATEND_LENGTH #define JANET_ATEND_LENGTH NULL,JANET_ATEND_BYTES -#define JANET_ATEND_BYTES +#define JANET_ATEND_BYTES NULL,JANET_ATEND_GCPERTHREAD +#define JANET_ATEND_GCPERTHREAD struct JanetReg { const char *name; diff --git a/test/suite-ev2.janet b/test/suite-ev2.janet new file mode 100644 index 00000000..60cf2838 --- /dev/null +++ b/test/suite-ev2.janet @@ -0,0 +1,58 @@ +# Copyright (c) 2025 Calvin Rose & contributors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +(import ./helper :prefix "" :exit true) +(start-suite) + +# Issue #1629 +(def thread-channel (ev/thread-chan 100)) +(def super (ev/thread-chan 10)) +(defn worker [] + (while true + (def item (ev/take thread-channel)) + (when (= item :deadline) + (ev/deadline 0.1 nil (fiber/current) true)))) +(ev/thread worker nil :n super) +(ev/give thread-channel :item) +(ev/sleep 0.05) +(ev/give thread-channel :item) +(ev/sleep 0.05) +(ev/give thread-channel :deadline) +(ev/sleep 0.05) +(ev/give thread-channel :item) +(ev/sleep 0.05) +(ev/give thread-channel :item) +(ev/sleep 0.15) +(assert (deep= '(:error "deadline expired" nil) (ev/take super)) "deadline expirataion") + +# Another variant +(def thread-channel (ev/thread-chan 100)) +(def super (ev/thread-chan 10)) +(defn worker [] + (while true + (def item (ev/take thread-channel)) + (when (= item :deadline) + (ev/deadline 0.1 nil (fiber/current) true)))) +(ev/thread worker nil :n super) +(ev/give thread-channel :deadline) +(ev/sleep 0.2) +(assert (deep= '(:error "deadline expired" nil) (ev/take super)) "deadline expirataion") + +(end-suite)