diff --git a/src/core/ev.c b/src/core/ev.c index a3eb06ef..25fb221e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -598,14 +598,15 @@ static int janet_chan_pack(JanetChannel *chan, Janet *x) { } } -static int janet_chan_unpack(JanetChannel *chan, Janet *x) { +static int janet_chan_unpack(JanetChannel *chan, Janet *x, int is_cleanup) { if (!janet_chan_is_threaded(chan)) return 0; switch (janet_type(*x)) { default: return 1; case JANET_BUFFER: { JanetBuffer *buf = janet_unwrap_buffer(*x); - *x = janet_unmarshal(buf->data, buf->count, JANET_MARSHAL_UNSAFE, NULL, NULL); + int flags = is_cleanup ? JANET_MARSHAL_UNSAFE : (JANET_MARSHAL_UNSAFE | JANET_MARSHAL_DECREF); + *x = janet_unmarshal(buf->data, buf->count, flags, NULL, NULL); janet_buffer_deinit(buf); janet_free(buf); return 0; @@ -639,7 +640,7 @@ static void janet_chan_deinit(JanetChannel *chan) { if (janet_chan_is_threaded(chan)) { Janet item; while (!janet_q_pop(&chan->items, &item, sizeof(item))) { - janet_chan_unpack(chan, &item); + janet_chan_unpack(chan, &item, 1); } } janet_q_deinit(&chan->items); @@ -747,12 +748,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { janet_ev_dec_refcount(); if (fiber->sched_id == sched_id) { if (mode == JANET_CP_MODE_CHOICE_READ) { - janet_assert(!janet_chan_unpack(channel, &x), "packing error"); + janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error"); janet_schedule(fiber, make_read_result(channel, x)); } else if (mode == JANET_CP_MODE_CHOICE_WRITE) { janet_schedule(fiber, make_write_result(channel)); } else if (mode == JANET_CP_MODE_READ) { - janet_assert(!janet_chan_unpack(channel, &x), "packing error"); + janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error"); janet_schedule(fiber, x); } else if (mode == JANET_CP_MODE_WRITE) { janet_schedule(fiber, janet_wrap_channel(channel)); @@ -893,7 +894,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) } return 0; } - janet_assert(!janet_chan_unpack(channel, item), "bad channel packing"); + janet_assert(!janet_chan_unpack(channel, item, 0), "bad channel packing"); if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { /* Pending writer */ if (is_threaded) { diff --git a/src/core/marsh.c b/src/core/marsh.c index a937a70b..a9912190 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -1396,16 +1396,21 @@ static const uint8_t *unmarshal_one( } u; memcpy(u.bytes, data, sizeof(void *)); data += sizeof(void *); - *out = janet_wrap_abstract(u.ptr); - /* Check if we have already seen this abstract type - if we have, decrement refcount */ - Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out); - if (janet_checktype(check, JANET_NIL)) { - /* Transfers reference from threaded channel buffer to current heap */ - janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false()); - } else { - /* Heap reference already accounted for, remove threaded channel reference. */ + if (flags & JANET_MARSHAL_DECREF) { + /* Decrement immediately and don't bother putting into heap */ janet_abstract_decref(u.ptr); + *out = janet_wrap_nil(); + } else { + *out = janet_wrap_abstract(u.ptr); + Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out); + if (janet_checktype(check, JANET_NIL)) { + /* Transfers reference from threaded channel buffer to current heap */ + janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false()); + } else { + /* Heap reference already accounted for, remove threaded channel reference. */ + janet_abstract_decref(u.ptr); + } } janet_v_push(st->lookup, *out); @@ -1419,7 +1424,6 @@ static const uint8_t *unmarshal_one( return NULL; } } -#undef EXTRA } Janet janet_unmarshal( diff --git a/src/core/util.h b/src/core/util.h index 2c7e2a0e..fb54715f 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -49,6 +49,8 @@ } while (0) #endif +#define JANET_MARSHAL_DECREF 0x40000 + #define janet_assert(c, m) do { \ if (!(c)) JANET_EXIT((m)); \ } while (0)