diff --git a/CHANGELOG.md b/CHANGELOG.md index 88384461..773ff35f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ All notable changes to this project will be documented in this file. ## 1.17.0 - Unreleased +- Add support for threaded abstract types. Threaded abstract types can easily be shared between threads. - Deprecate the `thread` library. Use threaded channels and ev instead. - Channels can now be marshalled. - Add the ability to close channels with `ev/chan-close` (or `:close`). diff --git a/src/core/abstract.c b/src/core/abstract.c index df829ead..e5857daf 100644 --- a/src/core/abstract.c +++ b/src/core/abstract.c @@ -81,7 +81,7 @@ void *janet_abstract_threaded(const JanetAbstractType *atype, size_t size) { return janet_abstract_end_threaded(janet_abstract_begin_threaded(atype, size)); } -/* Refcounting primitives */ +/* Refcounting primitives and sync primitives */ #ifdef JANET_WINDOWS @@ -93,6 +93,22 @@ static int32_t janet_decref(JanetAbstractHead *ab) { return InterlockedDecrement(&ab->gc.refcount); } +void janet_os_mutex_init(JanetOSMutex *mutex) { + InitializeCriticalSection(mutex); +} + +void janet_os_mutex_deinit(JanetOSMutex *mutex) { + DeleteCriticalSection(mutex); +} + +void janet_os_mutex_lock(JanetOSMutex *mutex) { + EnterCriticalSection(mutex); +} + +void janet_os_mutex_unlock(JanetOSMutex *mutex) { + LeaveCriticalSection(mutex); +} + #else static int32_t janet_incref(JanetAbstractHead *ab) { @@ -103,6 +119,22 @@ static int32_t janet_decref(JanetAbstractHead *ab) { return __atomic_add_fetch(&ab->gc.refcount, -1, __ATOMIC_RELAXED); } +void janet_os_mutex_init(JanetOSMutex *mutex) { + pthread_mutex_init(mutex, NULL); +} + +void janet_os_mutex_deinit(JanetOSMutex *mutex) { + pthread_mutex_destroy(mutex); +} + +void janet_os_mutex_lock(JanetOSMutex *mutex) { + pthread_mutex_lock(mutex); +} + +void janet_os_mutex_unlock(JanetOSMutex *mutex) { + pthread_mutex_unlock(mutex); +} + #endif int32_t janet_abstract_incref(void *abst) { diff --git a/src/core/ev.c b/src/core/ev.c index 25fb221e..0ca0f647 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -65,11 +65,7 @@ typedef struct { int32_t limit; int closed; int is_threaded; -#ifdef JANET_WINDOWS - CRITICAL_SECTION lock; -#else - pthread_mutex_t lock; -#endif + JanetOSMutex lock; } JanetChannel; typedef struct { @@ -627,11 +623,7 @@ static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) { janet_q_init(&chan->items); janet_q_init(&chan->read_pending); janet_q_init(&chan->write_pending); -#ifdef JANET_WINDOWS - InitializeCriticalSection(&chan->lock); -#else - pthread_mutex_init(&chan->lock, NULL); -#endif + janet_os_mutex_init(&chan->lock); } static void janet_chan_deinit(JanetChannel *chan) { @@ -644,29 +636,17 @@ static void janet_chan_deinit(JanetChannel *chan) { } } janet_q_deinit(&chan->items); -#ifdef JANET_WINDOWS - DeleteCriticalSection(&chan->lock); -#else - pthread_mutex_destroy(&chan->lock); -#endif + janet_os_mutex_deinit(&chan->lock); } static void janet_chan_lock(JanetChannel *chan) { if (!janet_chan_is_threaded(chan)) return; -#ifdef JANET_WINDOWS - EnterCriticalSection(&chan->lock); -#else - pthread_mutex_lock(&chan->lock); -#endif + janet_os_mutex_lock(&chan->lock); } static void janet_chan_unlock(JanetChannel *chan) { if (!janet_chan_is_threaded(chan)) return; -#ifdef JANET_WINDOWS - LeaveCriticalSection(&chan->lock); -#else - pthread_mutex_unlock(&chan->lock); -#endif + janet_os_mutex_unlock(&chan->lock); } /* diff --git a/src/include/janet.h b/src/include/janet.h index 4027b672..efce9ede 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -317,6 +317,17 @@ typedef struct { #include #include +/* Some extra includes if EV is enabled */ +#ifdef JANET_EV +#ifdef JANET_WINDOWS +#include +typedef CRTICAL_SECTION JanetOSMutex; +#else +#include +typedef pthread_mutex_t JanetOSMutex; +#endif +#endif + #ifdef JANET_BSD int _setjmp(jmp_buf); JANET_NO_RETURN void _longjmp(jmp_buf, int); @@ -1354,6 +1365,12 @@ JANET_API void *janet_abstract_threaded(const JanetAbstractType *atype, size_t s JANET_API int32_t janet_abstract_incref(void *abst); JANET_API int32_t janet_abstract_decref(void *abst); +/* Expose some OS sync primitives to make portable abstract types easier to implement */ +JANET_API void janet_os_mutex_init(JanetOSMutex *mutex); +JANET_API void janet_os_mutex_deinit(JanetOSMutex *mutex); +JANET_API void janet_os_mutex_lock(JanetOSMutex *mutex); +JANET_API void janet_os_mutex_unlock(JanetOSMutex *mutex); + /* Get last error from an IO operation */ JANET_API Janet janet_ev_lasterr(void);