From eb0b37f7296611a411b9a86d9d65fe620b0454d8 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 19 Aug 2021 20:56:48 -0500 Subject: [PATCH] Initial threaded abstract types. --- src/core/abstract.c | 70 +++++++++++++++++++++++++++++++++++++++++++++ src/core/ev.c | 2 ++ src/core/gc.c | 44 ++++++++++++++++++++++++++++ src/core/gc.h | 5 ++-- src/core/marsh.c | 52 +++++++++++++++++++++++++++++++-- src/core/state.h | 1 + src/include/janet.h | 12 +++++++- 7 files changed, 181 insertions(+), 5 deletions(-) diff --git a/src/core/abstract.c b/src/core/abstract.c index 225f2cc7..c4178728 100644 --- a/src/core/abstract.c +++ b/src/core/abstract.c @@ -24,6 +24,12 @@ #include "features.h" #include #include "gc.h" +#include "state.h" +#ifdef JANET_EV +#ifdef JANET_WINDOWS +#include +#endif +#endif #endif /* Create new userdata */ @@ -43,3 +49,67 @@ void *janet_abstract_end(void *x) { void *janet_abstract(const JanetAbstractType *atype, size_t size) { return janet_abstract_end(janet_abstract_begin(atype, size)); } + +#ifdef JANET_EV + +/* + * Threaded abstracts + */ + +void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size) { + JanetAbstractHead *header = janet_malloc(sizeof(JanetAbstractHead) + size); + if (NULL == header) { + JANET_OUT_OF_MEMORY; + } + janet_vm.next_collection += size + sizeof(JanetAbstractHead); + header->gc.flags = JANET_MEMORY_THREADED_ABSTRACT; + header->gc.refcount = 1; + header->size = size; + header->type = atype; + void *abstract = (void *) & (header->data); + janet_table_put(&janet_vm.threaded_abstracts, janet_wrap_abstract(abstract), janet_wrap_false()); + return abstract; +} + +void *janet_abstract_end_threaded(void *x) { + janet_gc_settype((void *)(janet_abstract_head(x)), JANET_MEMORY_THREADED_ABSTRACT); + return x; +} + +void *janet_abstract_threaded(const JanetAbstractType *atype, size_t size) { + return janet_abstract_end_threaded(janet_abstract_begin_threaded(atype, size)); +} + +/* Refcounting primitives */ + +#ifdef JANET_WINDOWS + +static int32_t janet_incref(JanetAbstractHead *ab) { + return InterlockedIncrement(&ab->gc.refcount); +} + +static int32_t janet_decref(JanetAbstractHead *ab) { + return InterlockedDecrement(&ab->gc.refcount); +} + +#else + +static int32_t janet_incref(JanetAbstractHead *ab) { + return __atomic_add_fetch(&ab->gc.refcount, 1, __ATOMIC_RELAXED); +} + +static int32_t janet_decref(JanetAbstractHead *ab) { + return __atomic_add_fetch(&ab->gc.refcount, -1, __ATOMIC_RELAXED); +} + +#endif + +int32_t janet_abstract_incref(void *abst) { + return janet_incref(janet_abstract_head(abst)); +} + +int32_t janet_abstract_decref(void *abst) { + return janet_decref(janet_abstract_head(abst)); +} + +#endif diff --git a/src/core/ev.c b/src/core/ev.c index 4e304465..6f03c47a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -531,6 +531,7 @@ void janet_ev_init_common(void) { janet_vm.tq_count = 0; janet_vm.tq_capacity = 0; janet_table_init_raw(&janet_vm.channel_map, 0); + janet_table_init_raw(&janet_vm.threaded_abstracts, 0); janet_rng_seed(&janet_vm.ev_rng, 0); } @@ -541,6 +542,7 @@ void janet_ev_deinit_common(void) { janet_free(janet_vm.listeners); janet_vm.listeners = NULL; janet_table_deinit(&janet_vm.channel_map); + janet_table_deinit(&janet_vm.threaded_abstracts); } /* Short hand to yield to event loop */ diff --git a/src/core/gc.c b/src/core/gc.c index 3c36bad3..faba1bf7 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -105,6 +105,14 @@ static void janet_mark_buffer(JanetBuffer *buffer) { } static void janet_mark_abstract(void *adata) { +#ifdef JANET_EV + /* Check if abstract type is a threaded abstract type. If it is, marking means + * updating the threaded_abstract table. */ + if ((janet_abstract_head(adata)->gc.flags & JANET_MEM_TYPEBITS) == JANET_MEMORY_THREADED_ABSTRACT) { + janet_table_put(&janet_vm.threaded_abstracts, janet_wrap_abstract(adata), janet_wrap_true()); + return; + } +#endif if (janet_gc_reachable(janet_abstract_head(adata))) return; janet_gc_mark(janet_abstract_head(adata)); @@ -329,6 +337,42 @@ void janet_sweep() { } current = next; } +#ifdef JANET_EV + /* Sweep threaded abstract types for references to decrement */ + JanetKV *items = janet_vm.threaded_abstracts.data; + for (int32_t i = 0; i < janet_vm.threaded_abstracts.capacity; i++) { + if (janet_checktype(items[i].key, JANET_ABSTRACT)) { + + /* If item was not visited during the mark phase, then this + * abstract type isn't present in the heap and needs its refcount + * decremented, and shouuld be removed from table. If the refcount is + * then 0, the item will be collected. This ensures that only one interpreter + * will clean up the threaded abstract. */ + + /* If not visited... */ + if (!janet_truthy(items[i].value)) { + void *abst = janet_unwrap_abstract(items[i].key); + if (0 == janet_abstract_decref(abst)) { + /* Run finalizer */ + JanetAbstractHead *head = janet_abstract_head(abst); + if (head->type->gc) { + janet_assert(!head->type->gc(head->data, head->size), "finalizer failed"); + } + /* Mark as tombstone in place */ + items[i].key = janet_wrap_nil(); + items[i].value = janet_wrap_false(); + janet_vm.threaded_abstracts.deleted++; + janet_vm.threaded_abstracts.count--; + /* Free memory */ + janet_free(janet_abstract_head(abst)); + } + } + + /* Reset for next sweep */ + items[i].value = janet_wrap_false(); + } + } +#endif } /* Allocate some memory that is tracked for garbage collection */ diff --git a/src/core/gc.h b/src/core/gc.h index 9bc3f62c..7fd23122 100644 --- a/src/core/gc.h +++ b/src/core/gc.h @@ -55,10 +55,11 @@ enum JanetMemoryType { JANET_MEMORY_FUNCTION, JANET_MEMORY_ABSTRACT, JANET_MEMORY_FUNCENV, - JANET_MEMORY_FUNCDEF + JANET_MEMORY_FUNCDEF, + JANET_MEMORY_THREADED_ABSTRACT, }; -/* To allocate collectable memory, one must calk janet_alloc, initialize the memory, +/* To allocate collectable memory, one must call janet_alloc, initialize the memory, * and then call when janet_enablegc when it is initailize and reachable by the gc (on the JANET stack) */ void *janet_gcalloc(enum JanetMemoryType type, size_t size); diff --git a/src/core/marsh.c b/src/core/marsh.c index b6d0c61b..57c1093a 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -63,7 +63,10 @@ enum { LB_FUNCENV_REF, /* 219 */ LB_FUNCDEF_REF, /* 220 */ LB_UNSAFE_CFUNCTION, /* 221 */ - LB_UNSAFE_POINTER /* 222 */ + LB_UNSAFE_POINTER, /* 222 */ +#ifdef JANET_EV + LB_THREADED_ABSTRACT/* 223 */ +#endif } LeadBytes; /* Helper to look inside an entry in an environment */ @@ -369,6 +372,20 @@ void janet_marshal_abstract(JanetMarshalContext *ctx, void *abstract) { static void marshal_one_abstract(MarshalState *st, Janet x, int flags) { void *abstract = janet_unwrap_abstract(x); +#ifdef JANET_EV + /* Threaded abstract types get passed through as pointers in the unsafe mode */ + if ((flags & JANET_MARSHAL_UNSAFE) && + (JANET_MEMORY_THREADED_ABSTRACT == (janet_abstract_head(abstract)->gc.flags & JANET_MEM_TYPEBITS))) { + + /* Increment refcount before sending message. This prevents a "death in transit" problem + * where a message is garbage collected while in transit between two threads - i.e., the sending threads + * loses the reference and runs a garbage collection before the receiving thread gets the message. */ + janet_abstract_incref(abstract); + pushbyte(st, LB_THREADED_ABSTRACT); + pushbytes(st, (uint8_t *) &abstract, sizeof(abstract)); + return; + } +#endif const JanetAbstractType *at = janet_abstract_type(abstract); if (at->marshal) { pushbyte(st, LB_ABSTRACT); @@ -376,7 +393,7 @@ static void marshal_one_abstract(MarshalState *st, Janet x, int flags) { JanetMarshalContext context = {st, NULL, flags, NULL, at}; at->marshal(abstract, &context); } else { - janet_panicf("try to marshal unregistered abstract type, cannot marshal %p", x); + janet_panicf("cannot marshal %p", x); } } @@ -1361,6 +1378,37 @@ static const uint8_t *unmarshal_one( janet_v_push(st->lookup, *out); return data; } +#ifdef JANET_EV + case LB_THREADED_ABSTRACT: { + MARSH_EOS(st, data + sizeof(void *)); + data++; + if (!(flags & JANET_MARSHAL_UNSAFE)) { + janet_panicf("unsafe flag not given, " + "will not unmarshal threaded abstract pointer at index %d", + (int)(data - st->start)); + } + union { + void *ptr; + uint8_t bytes[sizeof(void *)]; + } u; + memcpy(u.bytes, data, sizeof(void *)); + data += sizeof(void *); + *out = janet_wrap_abstract(u.ptr); + + /* Check if we have already seen this abstract type - if we have, decrement refcount */ + Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out); + if (janet_checktype(check, JANET_NIL)) { + /* Transfers reference from threaded channel buffer to current heap */ + janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false()); + } else { + /* Heap reference already accounted for, remove threaded channel reference. */ + janet_abstract_decref(u.ptr); + } + + janet_v_push(st->lookup, *out); + return data; + } +#endif default: { janet_panicf("unknown byte %x at index %d", *data, diff --git a/src/core/state.h b/src/core/state.h index 6b36b022..a3956f34 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -161,6 +161,7 @@ struct JanetVM { size_t listener_cap; size_t extra_listeners; JanetTable channel_map; /* threaded channel lookup table, no gc */ + JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ #ifdef JANET_WINDOWS void **iocp; #elif defined(JANET_EV_EPOLL) diff --git a/src/include/janet.h b/src/include/janet.h index b7f5c2b8..4027b672 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -839,7 +839,10 @@ JANET_API JanetAbstract janet_checkabstract(Janet x, const JanetAbstractType *at * list of blocks, which is naive but works. */ struct JanetGCObject { int32_t flags; - JanetGCObject *next; + union { + JanetGCObject *next; + int32_t refcount; /* For threaded abstract types */ + }; }; /* A lightweight green thread in janet. Does not correspond to @@ -1344,6 +1347,13 @@ JANET_API void janet_addtimeout(double sec); JANET_API void janet_ev_inc_refcount(void); JANET_API void janet_ev_dec_refcount(void); +/* Thread aware abstract types and helpers */ +JANET_API void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size); +JANET_API void *janet_abstract_end_threaded(void *x); +JANET_API void *janet_abstract_threaded(const JanetAbstractType *atype, size_t size); +JANET_API int32_t janet_abstract_incref(void *abst); +JANET_API int32_t janet_abstract_decref(void *abst); + /* Get last error from an IO operation */ JANET_API Janet janet_ev_lasterr(void);