mirror of
				https://github.com/janet-lang/janet
				synced 2025-10-30 23:23:07 +00:00 
			
		
		
		
	Initial threaded abstract types.
This commit is contained in:
		| @@ -24,6 +24,12 @@ | ||||
| #include "features.h" | ||||
| #include <janet.h> | ||||
| #include "gc.h" | ||||
| #include "state.h" | ||||
| #ifdef JANET_EV | ||||
| #ifdef JANET_WINDOWS | ||||
| #include <windows.h> | ||||
| #endif | ||||
| #endif | ||||
| #endif | ||||
|  | ||||
| /* Create new userdata */ | ||||
| @@ -43,3 +49,67 @@ void *janet_abstract_end(void *x) { | ||||
| void *janet_abstract(const JanetAbstractType *atype, size_t size) { | ||||
|     return janet_abstract_end(janet_abstract_begin(atype, size)); | ||||
| } | ||||
|  | ||||
| #ifdef JANET_EV | ||||
|  | ||||
| /* | ||||
|  * Threaded abstracts | ||||
|  */ | ||||
|  | ||||
| void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size) { | ||||
|     JanetAbstractHead *header = janet_malloc(sizeof(JanetAbstractHead) + size); | ||||
|     if (NULL == header) { | ||||
|         JANET_OUT_OF_MEMORY; | ||||
|     } | ||||
|     janet_vm.next_collection += size + sizeof(JanetAbstractHead); | ||||
|     header->gc.flags = JANET_MEMORY_THREADED_ABSTRACT; | ||||
|     header->gc.refcount = 1; | ||||
|     header->size = size; | ||||
|     header->type = atype; | ||||
|     void *abstract = (void *) & (header->data); | ||||
|     janet_table_put(&janet_vm.threaded_abstracts, janet_wrap_abstract(abstract), janet_wrap_false()); | ||||
|     return abstract; | ||||
| } | ||||
|  | ||||
| void *janet_abstract_end_threaded(void *x) { | ||||
|     janet_gc_settype((void *)(janet_abstract_head(x)), JANET_MEMORY_THREADED_ABSTRACT); | ||||
|     return x; | ||||
| } | ||||
|  | ||||
| void *janet_abstract_threaded(const JanetAbstractType *atype, size_t size) { | ||||
|     return janet_abstract_end_threaded(janet_abstract_begin_threaded(atype, size)); | ||||
| } | ||||
|  | ||||
| /* Refcounting primitives */ | ||||
|  | ||||
| #ifdef JANET_WINDOWS | ||||
|  | ||||
| static int32_t janet_incref(JanetAbstractHead *ab) { | ||||
|     return InterlockedIncrement(&ab->gc.refcount); | ||||
| } | ||||
|  | ||||
| static int32_t janet_decref(JanetAbstractHead *ab) { | ||||
|     return InterlockedDecrement(&ab->gc.refcount); | ||||
| } | ||||
|  | ||||
| #else | ||||
|  | ||||
| static int32_t janet_incref(JanetAbstractHead *ab) { | ||||
|     return __atomic_add_fetch(&ab->gc.refcount, 1, __ATOMIC_RELAXED); | ||||
| } | ||||
|  | ||||
| static int32_t janet_decref(JanetAbstractHead *ab) { | ||||
|     return __atomic_add_fetch(&ab->gc.refcount, -1, __ATOMIC_RELAXED); | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
| int32_t janet_abstract_incref(void *abst) { | ||||
|     return janet_incref(janet_abstract_head(abst)); | ||||
| } | ||||
|  | ||||
| int32_t janet_abstract_decref(void *abst) { | ||||
|     return janet_decref(janet_abstract_head(abst)); | ||||
| } | ||||
|  | ||||
| #endif | ||||
|   | ||||
| @@ -531,6 +531,7 @@ void janet_ev_init_common(void) { | ||||
|     janet_vm.tq_count = 0; | ||||
|     janet_vm.tq_capacity = 0; | ||||
|     janet_table_init_raw(&janet_vm.channel_map, 0); | ||||
|     janet_table_init_raw(&janet_vm.threaded_abstracts, 0); | ||||
|     janet_rng_seed(&janet_vm.ev_rng, 0); | ||||
| } | ||||
|  | ||||
| @@ -541,6 +542,7 @@ void janet_ev_deinit_common(void) { | ||||
|     janet_free(janet_vm.listeners); | ||||
|     janet_vm.listeners = NULL; | ||||
|     janet_table_deinit(&janet_vm.channel_map); | ||||
|     janet_table_deinit(&janet_vm.threaded_abstracts); | ||||
| } | ||||
|  | ||||
| /* Short hand to yield to event loop */ | ||||
|   | ||||
| @@ -105,6 +105,14 @@ static void janet_mark_buffer(JanetBuffer *buffer) { | ||||
| } | ||||
|  | ||||
| static void janet_mark_abstract(void *adata) { | ||||
| #ifdef JANET_EV | ||||
|     /* Check if abstract type is a threaded abstract type. If it is, marking means | ||||
|      * updating the threaded_abstract table. */ | ||||
|     if ((janet_abstract_head(adata)->gc.flags & JANET_MEM_TYPEBITS) == JANET_MEMORY_THREADED_ABSTRACT) { | ||||
|         janet_table_put(&janet_vm.threaded_abstracts, janet_wrap_abstract(adata), janet_wrap_true()); | ||||
|         return; | ||||
|     } | ||||
| #endif | ||||
|     if (janet_gc_reachable(janet_abstract_head(adata))) | ||||
|         return; | ||||
|     janet_gc_mark(janet_abstract_head(adata)); | ||||
| @@ -329,6 +337,42 @@ void janet_sweep() { | ||||
|         } | ||||
|         current = next; | ||||
|     } | ||||
| #ifdef JANET_EV | ||||
|     /* Sweep threaded abstract types for references to decrement */ | ||||
|     JanetKV *items = janet_vm.threaded_abstracts.data; | ||||
|     for (int32_t i = 0; i < janet_vm.threaded_abstracts.capacity; i++) { | ||||
|         if (janet_checktype(items[i].key, JANET_ABSTRACT)) { | ||||
|  | ||||
|             /* If item was not visited during the mark phase, then this | ||||
|              * abstract type isn't present in the heap and needs its refcount | ||||
|              * decremented, and shouuld be removed from table. If the refcount is | ||||
|              * then 0, the item will be collected. This ensures that only one interpreter | ||||
|              * will clean up the threaded abstract. */ | ||||
|  | ||||
|             /* If not visited... */ | ||||
|             if (!janet_truthy(items[i].value)) { | ||||
|                 void *abst = janet_unwrap_abstract(items[i].key); | ||||
|                 if (0 == janet_abstract_decref(abst)) { | ||||
|                     /* Run finalizer */ | ||||
|                     JanetAbstractHead *head = janet_abstract_head(abst); | ||||
|                     if (head->type->gc) { | ||||
|                         janet_assert(!head->type->gc(head->data, head->size), "finalizer failed"); | ||||
|                     } | ||||
|                     /* Mark as tombstone in place */ | ||||
|                     items[i].key = janet_wrap_nil(); | ||||
|                     items[i].value = janet_wrap_false(); | ||||
|                     janet_vm.threaded_abstracts.deleted++; | ||||
|                     janet_vm.threaded_abstracts.count--; | ||||
|                     /* Free memory */ | ||||
|                     janet_free(janet_abstract_head(abst)); | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             /* Reset for next sweep */ | ||||
|             items[i].value = janet_wrap_false(); | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
| } | ||||
|  | ||||
| /* Allocate some memory that is tracked for garbage collection */ | ||||
|   | ||||
| @@ -55,10 +55,11 @@ enum JanetMemoryType { | ||||
|     JANET_MEMORY_FUNCTION, | ||||
|     JANET_MEMORY_ABSTRACT, | ||||
|     JANET_MEMORY_FUNCENV, | ||||
|     JANET_MEMORY_FUNCDEF | ||||
|     JANET_MEMORY_FUNCDEF, | ||||
|     JANET_MEMORY_THREADED_ABSTRACT, | ||||
| }; | ||||
|  | ||||
| /* To allocate collectable memory, one must calk janet_alloc, initialize the memory, | ||||
| /* To allocate collectable memory, one must call janet_alloc, initialize the memory, | ||||
|  * and then call when janet_enablegc when it is initailize and reachable by the gc (on the JANET stack) */ | ||||
| void *janet_gcalloc(enum JanetMemoryType type, size_t size); | ||||
|  | ||||
|   | ||||
| @@ -63,7 +63,10 @@ enum { | ||||
|     LB_FUNCENV_REF, /* 219 */ | ||||
|     LB_FUNCDEF_REF, /* 220 */ | ||||
|     LB_UNSAFE_CFUNCTION, /* 221 */ | ||||
|     LB_UNSAFE_POINTER /* 222 */ | ||||
|     LB_UNSAFE_POINTER, /* 222 */ | ||||
| #ifdef JANET_EV | ||||
|     LB_THREADED_ABSTRACT/* 223 */ | ||||
| #endif | ||||
| } LeadBytes; | ||||
|  | ||||
| /* Helper to look inside an entry in an environment */ | ||||
| @@ -369,6 +372,20 @@ void janet_marshal_abstract(JanetMarshalContext *ctx, void *abstract) { | ||||
|  | ||||
| static void marshal_one_abstract(MarshalState *st, Janet x, int flags) { | ||||
|     void *abstract = janet_unwrap_abstract(x); | ||||
| #ifdef JANET_EV | ||||
|     /* Threaded abstract types get passed through as pointers in the unsafe mode */ | ||||
|     if ((flags & JANET_MARSHAL_UNSAFE) && | ||||
|             (JANET_MEMORY_THREADED_ABSTRACT == (janet_abstract_head(abstract)->gc.flags & JANET_MEM_TYPEBITS))) { | ||||
|  | ||||
|         /* Increment refcount before sending message. This prevents a "death in transit" problem | ||||
|          * where a message is garbage collected while in transit between two threads - i.e., the sending threads | ||||
|          * loses the reference and runs a garbage collection before the receiving thread gets the message. */ | ||||
|         janet_abstract_incref(abstract); | ||||
|         pushbyte(st, LB_THREADED_ABSTRACT); | ||||
|         pushbytes(st, (uint8_t *) &abstract, sizeof(abstract)); | ||||
|         return; | ||||
|     } | ||||
| #endif | ||||
|     const JanetAbstractType *at = janet_abstract_type(abstract); | ||||
|     if (at->marshal) { | ||||
|         pushbyte(st, LB_ABSTRACT); | ||||
| @@ -376,7 +393,7 @@ static void marshal_one_abstract(MarshalState *st, Janet x, int flags) { | ||||
|         JanetMarshalContext context = {st, NULL, flags, NULL, at}; | ||||
|         at->marshal(abstract, &context); | ||||
|     } else { | ||||
|         janet_panicf("try to marshal unregistered abstract type, cannot marshal %p", x); | ||||
|         janet_panicf("cannot marshal %p", x); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -1361,6 +1378,37 @@ static const uint8_t *unmarshal_one( | ||||
|             janet_v_push(st->lookup, *out); | ||||
|             return data; | ||||
|         } | ||||
| #ifdef JANET_EV | ||||
|         case LB_THREADED_ABSTRACT: { | ||||
|             MARSH_EOS(st, data + sizeof(void *)); | ||||
|             data++; | ||||
|             if (!(flags & JANET_MARSHAL_UNSAFE)) { | ||||
|                 janet_panicf("unsafe flag not given, " | ||||
|                              "will not unmarshal threaded abstract pointer at index %d", | ||||
|                              (int)(data - st->start)); | ||||
|             } | ||||
|             union { | ||||
|                 void *ptr; | ||||
|                 uint8_t bytes[sizeof(void *)]; | ||||
|             } u; | ||||
|             memcpy(u.bytes, data, sizeof(void *)); | ||||
|             data += sizeof(void *); | ||||
|             *out = janet_wrap_abstract(u.ptr); | ||||
|  | ||||
|             /* Check if we have already seen this abstract type - if we have, decrement refcount */ | ||||
|             Janet check = janet_table_get(&janet_vm.threaded_abstracts, *out); | ||||
|             if (janet_checktype(check, JANET_NIL)) { | ||||
|                 /* Transfers reference from threaded channel buffer to current heap */ | ||||
|                 janet_table_put(&janet_vm.threaded_abstracts, *out, janet_wrap_false()); | ||||
|             } else { | ||||
|                 /* Heap reference already accounted for, remove threaded channel reference. */ | ||||
|                 janet_abstract_decref(u.ptr); | ||||
|             } | ||||
|  | ||||
|             janet_v_push(st->lookup, *out); | ||||
|             return data; | ||||
|         } | ||||
| #endif | ||||
|         default: { | ||||
|             janet_panicf("unknown byte %x at index %d", | ||||
|                          *data, | ||||
|   | ||||
| @@ -161,6 +161,7 @@ struct JanetVM { | ||||
|     size_t listener_cap; | ||||
|     size_t extra_listeners; | ||||
|     JanetTable channel_map; /* threaded channel lookup table, no  gc */ | ||||
|     JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ | ||||
| #ifdef JANET_WINDOWS | ||||
|     void **iocp; | ||||
| #elif defined(JANET_EV_EPOLL) | ||||
|   | ||||
| @@ -839,7 +839,10 @@ JANET_API JanetAbstract janet_checkabstract(Janet x, const JanetAbstractType *at | ||||
|  * list of blocks, which is naive but works. */ | ||||
| struct JanetGCObject { | ||||
|     int32_t flags; | ||||
|     JanetGCObject *next; | ||||
|     union { | ||||
|         JanetGCObject *next; | ||||
|         int32_t refcount; /* For threaded abstract types */ | ||||
|     }; | ||||
| }; | ||||
|  | ||||
| /* A lightweight green thread in janet. Does not correspond to | ||||
| @@ -1344,6 +1347,13 @@ JANET_API void janet_addtimeout(double sec); | ||||
| JANET_API void janet_ev_inc_refcount(void); | ||||
| JANET_API void janet_ev_dec_refcount(void); | ||||
|  | ||||
| /* Thread aware abstract types and helpers */ | ||||
| JANET_API void *janet_abstract_begin_threaded(const JanetAbstractType *atype, size_t size); | ||||
| JANET_API void *janet_abstract_end_threaded(void *x); | ||||
| JANET_API void *janet_abstract_threaded(const JanetAbstractType *atype, size_t size); | ||||
| JANET_API int32_t janet_abstract_incref(void *abst); | ||||
| JANET_API int32_t janet_abstract_decref(void *abst); | ||||
|  | ||||
| /* Get last error from an IO operation */ | ||||
| JANET_API Janet janet_ev_lasterr(void); | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Calvin Rose
					Calvin Rose