Decrement thread channel pointer during cleanup without adding to heap.

This commit is contained in:
Calvin Rose 2021-08-20 16:41:19 -05:00
parent c8827424e7
commit 1920ecd668
3 changed files with 22 additions and 15 deletions

View File

@ -598,14 +598,15 @@ static int janet_chan_pack(JanetChannel *chan, Janet *x) {
}
}
static int janet_chan_unpack(JanetChannel *chan, Janet *x) {
static int janet_chan_unpack(JanetChannel *chan, Janet *x, int is_cleanup) {
if (!janet_chan_is_threaded(chan)) return 0;
switch (janet_type(*x)) {
default:
return 1;
case JANET_BUFFER: {
JanetBuffer *buf = janet_unwrap_buffer(*x);
*x = janet_unmarshal(buf->data, buf->count, JANET_MARSHAL_UNSAFE, NULL, NULL);
int flags = is_cleanup ? JANET_MARSHAL_UNSAFE : (JANET_MARSHAL_UNSAFE | JANET_MARSHAL_DECREF);
*x = janet_unmarshal(buf->data, buf->count, flags, NULL, NULL);
janet_buffer_deinit(buf);
janet_free(buf);
return 0;
@ -639,7 +640,7 @@ static void janet_chan_deinit(JanetChannel *chan) {
if (janet_chan_is_threaded(chan)) {
Janet item;
while (!janet_q_pop(&chan->items, &item, sizeof(item))) {
janet_chan_unpack(chan, &item);
janet_chan_unpack(chan, &item, 1);
}
}
janet_q_deinit(&chan->items);
@ -747,12 +748,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
janet_ev_dec_refcount();
if (fiber->sched_id == sched_id) {
if (mode == JANET_CP_MODE_CHOICE_READ) {
janet_assert(!janet_chan_unpack(channel, &x), "packing error");
janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
janet_schedule(fiber, make_read_result(channel, x));
} else if (mode == JANET_CP_MODE_CHOICE_WRITE) {
janet_schedule(fiber, make_write_result(channel));
} else if (mode == JANET_CP_MODE_READ) {
janet_assert(!janet_chan_unpack(channel, &x), "packing error");
janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
janet_schedule(fiber, x);
} else if (mode == JANET_CP_MODE_WRITE) {
janet_schedule(fiber, janet_wrap_channel(channel));
@ -893,7 +894,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
}
return 0;
}
janet_assert(!janet_chan_unpack(channel, item), "bad channel packing");
janet_assert(!janet_chan_unpack(channel, item, 0), "bad channel packing");
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
/* Pending writer */
if (is_threaded) {

View File

@ -1396,16 +1396,21 @@ static const uint8_t *unmarshal_one(
} u;
memcpy(u.bytes, data, sizeof(void *));
data += sizeof(void *);
*out = janet_wrap_abstract(u.ptr);
/* Check if we have already seen this abstract type - if we have, decrement refcount */
Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out);
if (janet_checktype(check, JANET_NIL)) {
/* Transfers reference from threaded channel buffer to current heap */
janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false());
} else {
/* Heap reference already accounted for, remove threaded channel reference. */
if (flags & JANET_MARSHAL_DECREF) {
/* Decrement immediately and don't bother putting into heap */
janet_abstract_decref(u.ptr);
*out = janet_wrap_nil();
} else {
*out = janet_wrap_abstract(u.ptr);
Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out);
if (janet_checktype(check, JANET_NIL)) {
/* Transfers reference from threaded channel buffer to current heap */
janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false());
} else {
/* Heap reference already accounted for, remove threaded channel reference. */
janet_abstract_decref(u.ptr);
}
}
janet_v_push(st->lookup, *out);
@ -1419,7 +1424,6 @@ static const uint8_t *unmarshal_one(
return NULL;
}
}
#undef EXTRA
}
Janet janet_unmarshal(

View File

@ -49,6 +49,8 @@
} while (0)
#endif
#define JANET_MARSHAL_DECREF 0x40000
#define janet_assert(c, m) do { \
if (!(c)) JANET_EXIT((m)); \
} while (0)