Add basic runtime support for threaded abstracts.

A threaded abstract is an abstract type that can be freely shared
between threads. While no synchronization is provided, refcounting
and transport between threads is. This will let implementers more easily
exploit OS-level parallelism in C library code. The caveat with these
types is that they need to be careful in how they interact with objects
on other heaps.
This commit is contained in:
Calvin Rose 2021-08-19 21:16:20 -05:00
parent eb0b37f729
commit cc066dd6a1
4 changed files with 19 additions and 126 deletions

View File

@ -63,6 +63,7 @@ void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size)
}
janet_vm.next_collection += size + sizeof(JanetAbstractHead);
header->gc.flags = JANET_MEMORY_THREADED_ABSTRACT;
header->gc.next = NULL; /* Clear memory for address sanitizers */
header->gc.refcount = 1;
header->size = size;
header->type = atype;

View File

@ -64,7 +64,7 @@ typedef struct {
JanetQueue write_pending;
int32_t limit;
int closed;
int ref_count; /* refcount < 0 means no threading */
int is_threaded;
#ifdef JANET_WINDOWS
CRITICAL_SECTION lock;
#else
@ -530,7 +530,6 @@ void janet_ev_init_common(void) {
janet_vm.tq = NULL;
janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0;
janet_table_init_raw(&janet_vm.channel_map, 0);
janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
janet_rng_seed(&janet_vm.ev_rng, 0);
}
@ -541,7 +540,6 @@ void janet_ev_deinit_common(void) {
janet_free(janet_vm.tq);
janet_free(janet_vm.listeners);
janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.channel_map);
janet_table_deinit(&janet_vm.threaded_abstracts);
}
@ -575,11 +573,11 @@ void janet_ev_dec_refcount(void) {
#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
static inline int janet_chan_is_threaded(JanetChannel *chan) {
return chan->ref_count >= 0;
return chan->is_threaded;
}
static int janet_chan_pack(JanetChannel *chan, Janet *x) {
if (chan->ref_count < 0) return 0;
if (!janet_chan_is_threaded(chan)) return 0;
switch (janet_type(*x)) {
default: {
JanetBuffer *buf = janet_malloc(sizeof(JanetBuffer));
@ -624,7 +622,7 @@ static int janet_chan_unpack(JanetChannel *chan, Janet *x) {
static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) {
chan->limit = limit;
chan->closed = 0;
chan->ref_count = threaded ? 1 : -1;
chan->is_threaded = threaded;
janet_q_init(&chan->items);
janet_q_init(&chan->read_pending);
janet_q_init(&chan->write_pending);
@ -675,112 +673,13 @@ static void janet_chan_unlock(JanetChannel *chan) {
*/
static Janet janet_wrap_channel(JanetChannel *channel) {
if (janet_chan_is_threaded(channel)) {
return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel));
} else {
return janet_wrap_abstract(channel);
}
}
static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx) {
size_t s = janet_abstract_size(p);
int is_threaded = s == sizeof(JanetChannel *);
janet_marshal_int(ctx, is_threaded);
janet_marshal_abstract(ctx, p);
if (is_threaded) {
if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) {
janet_panic("can only marshal threaded channel with unsafe flag");
}
JanetChannel *channel = *((JanetChannel **) p);
janet_marshal_int64(ctx, (int64_t) channel);
/* Increment refcount BEFORE sending to thread - this prevents the channel from being GCed while in transit.
* The runtime will ensure that the channel will be unmarshalled eventually, even if no thread can receive it to
* run the gc finailizer and eventually decrement the refcount. */
janet_chan_lock(channel);
channel->ref_count++;
janet_chan_unlock(channel);
} else {
JanetChannel *channel = (JanetChannel *)p;
janet_marshal_int(ctx, channel->closed);
janet_marshal_int(ctx, channel->limit);
janet_marshal_int(ctx, janet_q_count(&channel->items));
Janet *items = channel->items.data;
for (int32_t i = channel->items.head; i < channel->items.tail; i++) {
janet_marshal_janet(ctx, items[i]);
}
}
}
static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) {
int is_threaded = janet_unmarshal_int(ctx);
if (is_threaded) {
if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) {
janet_panic("can only unmarshal threaded channel with unsafe flag");
}
int64_t intptr = janet_unmarshal_int64(ctx);
JanetChannel *chan = (JanetChannel *) intptr;
Janet check = janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(chan));
if (janet_checktype(check, JANET_NIL)) {
JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *));
*pchan = chan;
janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(chan), janet_wrap_abstract(pchan));
return pchan;
} else {
janet_chan_lock(chan);
chan->ref_count--;
janet_chan_unlock(chan);
void *p = janet_unwrap_abstract(check);
janet_unmarshal_abstract_reuse(ctx, p);
return p;
}
} else {
JanetChannel *chan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel));
chan->limit = 0;
chan->closed = 1;
chan->ref_count = -1;
janet_q_init(&chan->items);
janet_q_init(&chan->read_pending);
janet_q_init(&chan->write_pending);
int closed = janet_unmarshal_int(ctx);
int32_t limit = janet_unmarshal_int(ctx);
if (limit < 0) {
janet_panicf("invalid channel limit found: %d", limit);
}
int32_t count = janet_unmarshal_int(ctx);
if (count < 0) {
janet_panicf("invalid channel count found: %d", count);
}
chan->limit = limit;
chan->closed = closed;
for (int32_t i = 0; i < count; i++) {
Janet item = janet_unmarshal_janet(ctx);
janet_q_push(&chan->items, &item, sizeof(item));
}
return chan;
}
return janet_wrap_abstract(channel);
}
static int janet_chanat_gc(void *p, size_t s) {
JanetChannel *channel;
if (s == sizeof(JanetChannel *)) {
/* threaded */
channel = *((JanetChannel **)p);
janet_table_remove(&janet_vm.channel_map, janet_wrap_pointer(channel));
janet_chan_lock(channel);
if (--channel->ref_count == 0) {
janet_chan_deinit(channel);
janet_free(channel);
} else {
janet_chan_unlock(channel);
}
} else {
/* not threaded */
channel = p;
janet_chan_deinit(channel);
}
(void) s;
JanetChannel *channel = p;
janet_chan_deinit(channel);
return 0;
}
@ -798,7 +697,7 @@ static void janet_chanat_mark_fq(JanetQueue *fq) {
}
static int janet_chanat_mark(void *p, size_t s) {
if (s == sizeof(JanetChannel *)) return 0;
(void) s;
JanetChannel *chan = p;
janet_chanat_mark_fq(&chan->read_pending);
janet_chanat_mark_fq(&chan->write_pending);
@ -1019,13 +918,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
}
JanetChannel *janet_channel_unwrap(void *abstract) {
/* Rely on Janet's abstract type size tracking and the fact that a channel will
* surely be bigger than a pointer to a channel */
if (janet_abstract_size(abstract) == sizeof(JanetChannel *)) {
return *((JanetChannel **) abstract);
} else {
return abstract;
}
return abstract;
}
JanetChannel *janet_getchannel(const Janet *argv, int32_t n) {
@ -1190,13 +1083,9 @@ JANET_CORE_FN(cfun_channel_new_threaded,
"used to communicate between any number of operating system threads.") {
janet_arity(argc, 0, 1);
int32_t limit = janet_optnat(argv, argc, 0, 0);
JanetChannel *tchan = janet_malloc(sizeof(JanetChannel));
JanetChannel *tchan = janet_abstract_threaded(&janet_channel_type, sizeof(JanetChannel));
janet_chan_init(tchan, limit, 1);
JanetChannel **wrap = janet_abstract(&janet_channel_type, sizeof(JanetChannel *));
*wrap = tchan;
Janet ret = janet_wrap_abstract(wrap);
janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret);
return ret;
return janet_wrap_abstract(tchan);
}
JANET_CORE_FN(cfun_channel_close,
@ -1280,8 +1169,8 @@ const JanetAbstractType janet_channel_type = {
janet_chanat_mark,
janet_chanat_get,
NULL, /* put */
janet_chanat_marshal,
janet_chanat_unmarshal,
NULL, /* marshal */
NULL, /* unmarshal */
NULL, /* tostring */
NULL, /* compare */
NULL, /* hash */

View File

@ -328,6 +328,7 @@ static void marshal_one_fiber(MarshalState *st, JanetFiber *fiber, int flags) {
}
if (fiber->child)
marshal_one(st, janet_wrap_fiber(fiber->child), flags + 1);
marshal_one(st, fiber->last_value, flags + 1);
}
void janet_marshal_size(JanetMarshalContext *ctx, size_t value) {
@ -1063,6 +1064,9 @@ static const uint8_t *unmarshal_one_fiber(
fiber->child = janet_unwrap_fiber(fiberv);
}
/* Get the fiber last value */
data = unmarshal_one(st, data, &fiber->last_value, flags + 1);
/* We have valid fiber, finally construct remaining fields. */
fiber->frame = frame;
fiber->flags = fiber_flags;

View File

@ -160,7 +160,6 @@ struct JanetVM {
size_t listener_count;
size_t listener_cap;
size_t extra_listeners;
JanetTable channel_map; /* threaded channel lookup table, no gc */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
#ifdef JANET_WINDOWS
void **iocp;