From 9c9f9d4fa670273006b4a2102fe50af6edb9782b Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 5 Jun 2022 15:24:34 -0500 Subject: [PATCH] Add some thread coordination primitives. Due to the nature of event loops, it is a bit difficult to integrate lock and other primitives such that they don't block fibers on the same thread. --- src/core/abstract.c | 9 ++++++-- src/core/ev.c | 52 +++++++++++++++++++++++++++++++++++++++++++++ src/include/janet.h | 21 ------------------ 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/core/abstract.c b/src/core/abstract.c index b568fb20..da48ca1d 100644 --- a/src/core/abstract.c +++ b/src/core/abstract.c @@ -106,6 +106,7 @@ void janet_os_mutex_lock(JanetOSMutex *mutex) { } void janet_os_mutex_unlock(JanetOSMutex *mutex) { + /* error handling? May want to keep counter */ LeaveCriticalSection((CRITICAL_SECTION *) mutex); } @@ -120,7 +121,10 @@ static int32_t janet_decref(JanetAbstractHead *ab) { } void janet_os_mutex_init(JanetOSMutex *mutex) { - pthread_mutex_init(mutex, NULL); + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init(mutex, &attr); } void janet_os_mutex_deinit(JanetOSMutex *mutex) { @@ -132,7 +136,8 @@ void janet_os_mutex_lock(JanetOSMutex *mutex) { } void janet_os_mutex_unlock(JanetOSMutex *mutex) { - pthread_mutex_unlock(mutex); + int ret = pthread_mutex_unlock(mutex); + if (ret) janet_panic("cannot release lock"); } #endif diff --git a/src/core/ev.c b/src/core/ev.c index 70860826..665e651f 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -3013,6 +3013,54 @@ JANET_CORE_FN(janet_cfun_stream_write, janet_await(); } +typedef struct { + JanetOSMutex mutex; + int destroyed; +} JanetAbstractMutex; + +static int mutexgc(void *p, size_t size) { + JanetAbstractMutex *mutex = (JanetAbstractMutex *) p; + (void) size; + janet_os_mutex_deinit(&mutex->mutex); + return 0; +} + +const JanetAbstractType janet_mutex_type = { + "core/lock", + mutexgc, + JANET_ATEND_GC +}; + +JANET_CORE_FN(janet_cfun_mutex, + "(ev/lock)", + "Create a new lock to coordinate threads.") { + janet_fixarity(argc, 0); + (void) argv; + JanetAbstractMutex *mutex = janet_abstract_threaded(&janet_mutex_type, sizeof(JanetAbstractMutex)); + janet_os_mutex_init(&mutex->mutex); + return janet_wrap_abstract(mutex); +} + +JANET_CORE_FN(janet_cfun_mutex_acquire, + "(ev/acquire-lock lock)", + "Acquire a lock such that this operating system thread is the only thread with access to this resource." + " This will block this entire thread until the lock becomes available, and will not yield to other fibers " + "on this system thread.") { + janet_fixarity(argc, 1); + JanetAbstractMutex *mutex = janet_getabstract(argv, 0, &janet_mutex_type); + janet_os_mutex_lock(&mutex->mutex); + return argv[0]; +} + +JANET_CORE_FN(janet_cfun_mutex_release, + "(ev/release-lock lock)", + "Release a lock such that other threads may acquire it.") { + janet_fixarity(argc, 1); + JanetAbstractMutex *mutex = janet_getabstract(argv, 0, &janet_mutex_type); + janet_os_mutex_unlock(&mutex->mutex); + return argv[0]; +} + void janet_lib_ev(JanetTable *env) { JanetRegExt ev_cfuns_ext[] = { JANET_CORE_REG("ev/give", cfun_channel_push), @@ -3035,12 +3083,16 @@ void janet_lib_ev(JanetTable *env) { JANET_CORE_REG("ev/read", janet_cfun_stream_read), JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk), JANET_CORE_REG("ev/write", janet_cfun_stream_write), + JANET_CORE_REG("ev/lock", janet_cfun_mutex), + JANET_CORE_REG("ev/acquire-lock", janet_cfun_mutex_acquire), + JANET_CORE_REG("ev/release-lock", janet_cfun_mutex_release), JANET_REG_END }; janet_core_cfuns_ext(env, NULL, ev_cfuns_ext); janet_register_abstract_type(&janet_stream_type); janet_register_abstract_type(&janet_channel_type); + janet_register_abstract_type(&janet_mutex_type); } #endif diff --git a/src/include/janet.h b/src/include/janet.h index 5952ed1b..9b90a506 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1180,17 +1180,6 @@ typedef struct { Janet payload; } JanetTryState; -/* Thread types */ -#ifdef JANET_THREADS -typedef struct JanetThread JanetThread; -typedef struct JanetMailbox JanetMailbox; -struct JanetThread { - JanetMailbox *mailbox; - JanetTable *encode; -}; -#endif - - /***** END SECTION TYPES *****/ /***** START SECTION OPCODES *****/ @@ -2078,16 +2067,6 @@ JANET_API int janet_scan_uint64(const uint8_t *str, int32_t len, uint64_t *out); #endif -#ifdef JANET_THREADS - -extern JANET_API const JanetAbstractType janet_thread_type; - -JANET_API int janet_thread_receive(Janet *msg_out, double timeout); -JANET_API int janet_thread_send(JanetThread *thread, Janet msg, double timeout); -JANET_API JanetThread *janet_thread_current(void); - -#endif - /* Custom allocator support */ JANET_API void *(janet_malloc)(size_t); JANET_API void *(janet_realloc)(void *, size_t);