From cc066dd6a1bc2ec099074813be5f8f7d36ce6513 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 19 Aug 2021 21:16:20 -0500 Subject: [PATCH] Add basic runtime support for threaded abstracts. A threaded abstract is an abstract type that can be freely shared between threads. While no synchronization is provided, refcounting and transport between threads is. This will let implementers more easily exploit OS-level parallelism in C library code. The caveat with these types is that they need to be careful in how they interact with objects on other heaps. --- src/core/abstract.c | 1 + src/core/ev.c | 139 +++++--------------------------------------- src/core/marsh.c | 4 ++ src/core/state.h | 1 - 4 files changed, 19 insertions(+), 126 deletions(-) diff --git a/src/core/abstract.c b/src/core/abstract.c index c4178728..df829ead 100644 --- a/src/core/abstract.c +++ b/src/core/abstract.c @@ -63,6 +63,7 @@ void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size) } janet_vm.next_collection += size + sizeof(JanetAbstractHead); header->gc.flags = JANET_MEMORY_THREADED_ABSTRACT; + header->gc.next = NULL; /* Clear memory for address sanitizers */ header->gc.refcount = 1; header->size = size; header->type = atype; diff --git a/src/core/ev.c b/src/core/ev.c index 6f03c47a..c24af8d6 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -64,7 +64,7 @@ typedef struct { JanetQueue write_pending; int32_t limit; int closed; - int ref_count; /* refcount < 0 means no threading */ + int is_threaded; #ifdef JANET_WINDOWS CRITICAL_SECTION lock; #else @@ -530,7 +530,6 @@ void janet_ev_init_common(void) { janet_vm.tq = NULL; janet_vm.tq_count = 0; janet_vm.tq_capacity = 0; - janet_table_init_raw(&janet_vm.channel_map, 0); janet_table_init_raw(&janet_vm.threaded_abstracts, 0); janet_rng_seed(&janet_vm.ev_rng, 0); } @@ -541,7 +540,6 @@ void janet_ev_deinit_common(void) { janet_free(janet_vm.tq); janet_free(janet_vm.listeners); janet_vm.listeners = NULL; - janet_table_deinit(&janet_vm.channel_map); janet_table_deinit(&janet_vm.threaded_abstracts); } @@ -575,11 +573,11 @@ void janet_ev_dec_refcount(void) { #define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF static inline int janet_chan_is_threaded(JanetChannel *chan) { - return chan->ref_count >= 0; + return chan->is_threaded; } static int janet_chan_pack(JanetChannel *chan, Janet *x) { - if (chan->ref_count < 0) return 0; + if (!janet_chan_is_threaded(chan)) return 0; switch (janet_type(*x)) { default: { JanetBuffer *buf = janet_malloc(sizeof(JanetBuffer)); @@ -624,7 +622,7 @@ static int janet_chan_unpack(JanetChannel *chan, Janet *x) { static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) { chan->limit = limit; chan->closed = 0; - chan->ref_count = threaded ? 1 : -1; + chan->is_threaded = threaded; janet_q_init(&chan->items); janet_q_init(&chan->read_pending); janet_q_init(&chan->write_pending); @@ -675,112 +673,13 @@ static void janet_chan_unlock(JanetChannel *chan) { */ static Janet janet_wrap_channel(JanetChannel *channel) { - if (janet_chan_is_threaded(channel)) { - return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel)); - } else { - return janet_wrap_abstract(channel); - } -} - -static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx) { - size_t s = janet_abstract_size(p); - int is_threaded = s == sizeof(JanetChannel *); - janet_marshal_int(ctx, is_threaded); - janet_marshal_abstract(ctx, p); - if (is_threaded) { - if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) { - janet_panic("can only marshal threaded channel with unsafe flag"); - } - JanetChannel *channel = *((JanetChannel **) p); - - janet_marshal_int64(ctx, (int64_t) channel); - - /* Increment refcount BEFORE sending to thread - this prevents the channel from being GCed while in transit. - * The runtime will ensure that the channel will be unmarshalled eventually, even if no thread can receive it to - * run the gc finailizer and eventually decrement the refcount. */ - janet_chan_lock(channel); - channel->ref_count++; - janet_chan_unlock(channel); - - } else { - JanetChannel *channel = (JanetChannel *)p; - janet_marshal_int(ctx, channel->closed); - janet_marshal_int(ctx, channel->limit); - janet_marshal_int(ctx, janet_q_count(&channel->items)); - Janet *items = channel->items.data; - for (int32_t i = channel->items.head; i < channel->items.tail; i++) { - janet_marshal_janet(ctx, items[i]); - } - } -} - -static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) { - int is_threaded = janet_unmarshal_int(ctx); - if (is_threaded) { - if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) { - janet_panic("can only unmarshal threaded channel with unsafe flag"); - } - int64_t intptr = janet_unmarshal_int64(ctx); - JanetChannel *chan = (JanetChannel *) intptr; - Janet check = janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(chan)); - if (janet_checktype(check, JANET_NIL)) { - JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *)); - *pchan = chan; - janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(chan), janet_wrap_abstract(pchan)); - return pchan; - } else { - janet_chan_lock(chan); - chan->ref_count--; - janet_chan_unlock(chan); - void *p = janet_unwrap_abstract(check); - janet_unmarshal_abstract_reuse(ctx, p); - return p; - } - } else { - JanetChannel *chan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel)); - chan->limit = 0; - chan->closed = 1; - chan->ref_count = -1; - janet_q_init(&chan->items); - janet_q_init(&chan->read_pending); - janet_q_init(&chan->write_pending); - int closed = janet_unmarshal_int(ctx); - int32_t limit = janet_unmarshal_int(ctx); - if (limit < 0) { - janet_panicf("invalid channel limit found: %d", limit); - } - int32_t count = janet_unmarshal_int(ctx); - if (count < 0) { - janet_panicf("invalid channel count found: %d", count); - } - chan->limit = limit; - chan->closed = closed; - for (int32_t i = 0; i < count; i++) { - Janet item = janet_unmarshal_janet(ctx); - janet_q_push(&chan->items, &item, sizeof(item)); - } - return chan; - } + return janet_wrap_abstract(channel); } static int janet_chanat_gc(void *p, size_t s) { - JanetChannel *channel; - if (s == sizeof(JanetChannel *)) { - /* threaded */ - channel = *((JanetChannel **)p); - janet_table_remove(&janet_vm.channel_map, janet_wrap_pointer(channel)); - janet_chan_lock(channel); - if (--channel->ref_count == 0) { - janet_chan_deinit(channel); - janet_free(channel); - } else { - janet_chan_unlock(channel); - } - } else { - /* not threaded */ - channel = p; - janet_chan_deinit(channel); - } + (void) s; + JanetChannel *channel = p; + janet_chan_deinit(channel); return 0; } @@ -798,7 +697,7 @@ static void janet_chanat_mark_fq(JanetQueue *fq) { } static int janet_chanat_mark(void *p, size_t s) { - if (s == sizeof(JanetChannel *)) return 0; + (void) s; JanetChannel *chan = p; janet_chanat_mark_fq(&chan->read_pending); janet_chanat_mark_fq(&chan->write_pending); @@ -1019,13 +918,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) } JanetChannel *janet_channel_unwrap(void *abstract) { - /* Rely on Janet's abstract type size tracking and the fact that a channel will - * surely be bigger than a pointer to a channel */ - if (janet_abstract_size(abstract) == sizeof(JanetChannel *)) { - return *((JanetChannel **) abstract); - } else { - return abstract; - } + return abstract; } JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { @@ -1190,13 +1083,9 @@ JANET_CORE_FN(cfun_channel_new_threaded, "used to communicate between any number of operating system threads.") { janet_arity(argc, 0, 1); int32_t limit = janet_optnat(argv, argc, 0, 0); - JanetChannel *tchan = janet_malloc(sizeof(JanetChannel)); + JanetChannel *tchan = janet_abstract_threaded(&janet_channel_type, sizeof(JanetChannel)); janet_chan_init(tchan, limit, 1); - JanetChannel **wrap = janet_abstract(&janet_channel_type, sizeof(JanetChannel *)); - *wrap = tchan; - Janet ret = janet_wrap_abstract(wrap); - janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret); - return ret; + return janet_wrap_abstract(tchan); } JANET_CORE_FN(cfun_channel_close, @@ -1280,8 +1169,8 @@ const JanetAbstractType janet_channel_type = { janet_chanat_mark, janet_chanat_get, NULL, /* put */ - janet_chanat_marshal, - janet_chanat_unmarshal, + NULL, /* marshal */ + NULL, /* unmarshal */ NULL, /* tostring */ NULL, /* compare */ NULL, /* hash */ diff --git a/src/core/marsh.c b/src/core/marsh.c index 57c1093a..2d7d2066 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -328,6 +328,7 @@ static void marshal_one_fiber(MarshalState *st, JanetFiber *fiber, int flags) { } if (fiber->child) marshal_one(st, janet_wrap_fiber(fiber->child), flags + 1); + marshal_one(st, fiber->last_value, flags + 1); } void janet_marshal_size(JanetMarshalContext *ctx, size_t value) { @@ -1063,6 +1064,9 @@ static const uint8_t *unmarshal_one_fiber( fiber->child = janet_unwrap_fiber(fiberv); } + /* Get the fiber last value */ + data = unmarshal_one(st, data, &fiber->last_value, flags + 1); + /* We have valid fiber, finally construct remaining fields. */ fiber->frame = frame; fiber->flags = fiber_flags; diff --git a/src/core/state.h b/src/core/state.h index a3956f34..b6ee5f2f 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -160,7 +160,6 @@ struct JanetVM { size_t listener_count; size_t listener_cap; size_t extra_listeners; - JanetTable channel_map; /* threaded channel lookup table, no gc */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ #ifdef JANET_WINDOWS void **iocp;