diff --git a/examples/evlocks.janet b/examples/evlocks.janet new file mode 100644 index 00000000..08b497f7 --- /dev/null +++ b/examples/evlocks.janet @@ -0,0 +1,45 @@ +(defn sleep + "Sleep the entire thread, not just a single fiber." + [n] + (os/sleep (* 0.1 n))) + +(defn work [lock n] + (ev/acquire-lock lock) + (print "working " n "...") + (sleep n) + (print "done working...") + (ev/release-lock lock)) + +(defn reader + [rwlock n] + (ev/acquire-rlock rwlock) + (print "reading " n "...") + (sleep n) + (print "done reading " n "...") + (ev/release-rlock rwlock)) + +(defn writer + [rwlock n] + (ev/acquire-wlock rwlock) + (print "writing " n "...") + (sleep n) + (print "done writing...") + (ev/release-wlock rwlock)) + +(defn test-lock + [] + (def lock (ev/lock)) + (for i 3 7 + (ev/spawn-thread + (work lock i)))) + +(defn test-rwlock + [] + (def rwlock (ev/rwlock)) + (for i 0 20 + (if (> 0.1 (math/random)) + (ev/spawn-thread (writer rwlock i)) + (ev/spawn-thread (reader rwlock i))))) + +(test-rwlock) +(test-lock) diff --git a/src/core/abstract.c b/src/core/abstract.c index da48ca1d..ba2e0cf5 100644 --- a/src/core/abstract.c +++ b/src/core/abstract.c @@ -110,6 +110,31 @@ void janet_os_mutex_unlock(JanetOSMutex *mutex) { LeaveCriticalSection((CRITICAL_SECTION *) mutex); } +void janet_os_rwlock_init(JanetOSRWLock *rwlock) { + InitializeSRWLock((PSRWLOCK) rwlock); +} + +void janet_os_rwlock_deinit(JanetOSRWLock *rwlock) { + /* no op? */ + (void) rwlock; +} + +void janet_os_rwlock_rlock(JanetOSRWLock *rwlock) { + AcquireSRWLockShared((PSRWLOCK) rwlock); +} + +void janet_os_rwlock_wlock(JanetOSRWLock *rwlock) { + AcquireSRWLockExclusive((PSRWLOCK) rwlock); +} + +void janet_os_rwlock_runlock(JanetOSRWLock *rwlock) { + ReleaseSRWLockShared((PSRWLOCK) rwlock); +} + +void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock) { + ReleaseSRWLockExclusive((PSRWLOCK) rwlock); +} + #else static int32_t janet_incref(JanetAbstractHead *ab) { @@ -140,6 +165,30 @@ void janet_os_mutex_unlock(JanetOSMutex *mutex) { if (ret) janet_panic("cannot release lock"); } +void janet_os_rwlock_init(JanetOSRWLock *rwlock) { + pthread_rwlock_init(rwlock, NULL); +} + +void janet_os_rwlock_deinit(JanetOSRWLock *rwlock) { + pthread_rwlock_destroy(rwlock); +} + +void janet_os_rwlock_rlock(JanetOSRWLock *rwlock) { + pthread_rwlock_rdlock(rwlock); +} + +void janet_os_rwlock_wlock(JanetOSRWLock *rwlock) { + pthread_rwlock_wrlock(rwlock); +} + +void janet_os_rwlock_runlock(JanetOSRWLock *rwlock) { + pthread_rwlock_unlock(rwlock); +} + +void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock) { + pthread_rwlock_unlock(rwlock); +} + #endif int32_t janet_abstract_incref(void *abst) { diff --git a/src/core/ev.c b/src/core/ev.c index 665e651f..41931138 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -3015,7 +3015,6 @@ JANET_CORE_FN(janet_cfun_stream_write, typedef struct { JanetOSMutex mutex; - int destroyed; } JanetAbstractMutex; static int mutexgc(void *p, size_t size) { @@ -3061,6 +3060,69 @@ JANET_CORE_FN(janet_cfun_mutex_release, return argv[0]; } +typedef struct { + JanetOSRWLock rwlock; +} JanetAbstractRWLock; + +static int rwlockgc(void *p, size_t size) { + JanetAbstractRWLock *rwlock = (JanetAbstractRWLock *) p; + (void) size; + janet_os_rwlock_deinit(&rwlock->rwlock); + return 0; +} + +const JanetAbstractType janet_rwlock_type = { + "core/rwlock", + rwlockgc, + JANET_ATEND_GC +}; + +JANET_CORE_FN(janet_cfun_rwlock, + "(ev/rwlock)", + "Create a new read-write lock to coordinate threads.") { + janet_fixarity(argc, 0); + (void) argv; + JanetAbstractRWLock *rwlock = janet_abstract_threaded(&janet_rwlock_type, sizeof(JanetAbstractRWLock)); + janet_os_rwlock_init(&rwlock->rwlock); + return janet_wrap_abstract(rwlock); +} + +JANET_CORE_FN(janet_cfun_rwlock_read_lock, + "(ev/acquire-rlock rwlock)", + "Acquire a read lock an a read-write lock.") { + janet_fixarity(argc, 1); + JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type); + janet_os_rwlock_rlock(&rwlock->rwlock); + return argv[0]; +} + +JANET_CORE_FN(janet_cfun_rwlock_write_lock, + "(ev/acquire-wlock rwlock)", + "Acquire a write lock on a read-write lock.") { + janet_fixarity(argc, 1); + JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type); + janet_os_rwlock_wlock(&rwlock->rwlock); + return argv[0]; +} + +JANET_CORE_FN(janet_cfun_rwlock_read_release, + "(ev/release-rlock rwlock)", + "Release a read lock on a read-write lock") { + janet_fixarity(argc, 1); + JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type); + janet_os_rwlock_runlock(&rwlock->rwlock); + return argv[0]; +} + +JANET_CORE_FN(janet_cfun_rwlock_write_release, + "(ev/release-wlock rwlock)", + "Release a write lock on a read-write lock") { + janet_fixarity(argc, 1); + JanetAbstractRWLock *rwlock = janet_getabstract(argv, 0, &janet_rwlock_type); + janet_os_rwlock_wunlock(&rwlock->rwlock); + return argv[0]; +} + void janet_lib_ev(JanetTable *env) { JanetRegExt ev_cfuns_ext[] = { JANET_CORE_REG("ev/give", cfun_channel_push), @@ -3086,6 +3148,11 @@ void janet_lib_ev(JanetTable *env) { JANET_CORE_REG("ev/lock", janet_cfun_mutex), JANET_CORE_REG("ev/acquire-lock", janet_cfun_mutex_acquire), JANET_CORE_REG("ev/release-lock", janet_cfun_mutex_release), + JANET_CORE_REG("ev/rwlock", janet_cfun_rwlock), + JANET_CORE_REG("ev/acquire-rlock", janet_cfun_rwlock_read_lock), + JANET_CORE_REG("ev/acquire-wlock", janet_cfun_rwlock_write_lock), + JANET_CORE_REG("ev/release-rlock", janet_cfun_rwlock_read_release), + JANET_CORE_REG("ev/release-wlock", janet_cfun_rwlock_write_release), JANET_REG_END }; @@ -3093,6 +3160,7 @@ void janet_lib_ev(JanetTable *env) { janet_register_abstract_type(&janet_stream_type); janet_register_abstract_type(&janet_channel_type); janet_register_abstract_type(&janet_mutex_type); + janet_register_abstract_type(&janet_rwlock_type); } #endif diff --git a/src/core/features.h b/src/core/features.h index 40067414..ce5e3bf1 100644 --- a/src/core/features.h +++ b/src/core/features.h @@ -45,9 +45,13 @@ #define WIN32_LEAN_AND_MEAN #endif -/* Needed for realpath on linux */ -#if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__)) -#define _XOPEN_SOURCE 500 +/* Needed for realpath on linux, as well as pthread rwlocks. */ +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 +#endif +#if _XOPEN_SOURCE < 600 +#undef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 #endif /* Needed for timegm and other extensions when building with -std=c99. diff --git a/src/include/janet.h b/src/include/janet.h index 9b90a506..3a11a6f5 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -299,6 +299,18 @@ typedef struct { JANET_CURRENT_CONFIG_BITS }) #endif +/* Feature include for pthreads. Most feature detection code should go in + * features.h instead. */ +#ifndef JANET_WINDOWS +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 +#endif +#if _XOPEN_SOURCE < 600 +#undef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 +#endif +#endif + /* What to do when out of memory */ #ifndef JANET_OUT_OF_MEMORY #include @@ -335,9 +347,13 @@ typedef struct JanetDudCriticalSection { void *lock_semaphore; unsigned long spin_count; } JanetOSMutex; +typedef struct JanetDudRWLock { + void *ptr; +} JanetOSRWLock; #else #include typedef pthread_mutex_t JanetOSMutex; +typedef pthread_rwlock_t JanetOSRWLock; #endif #endif @@ -1368,11 +1384,17 @@ JANET_API void *janet_abstract_threaded(const JanetAbstractType *atype, size_t s JANET_API int32_t janet_abstract_incref(void *abst); JANET_API int32_t janet_abstract_decref(void *abst); -/* Expose some OS sync primitives to make portable abstract types easier to implement */ +/* Expose some OS sync primitives */ JANET_API void janet_os_mutex_init(JanetOSMutex *mutex); JANET_API void janet_os_mutex_deinit(JanetOSMutex *mutex); JANET_API void janet_os_mutex_lock(JanetOSMutex *mutex); JANET_API void janet_os_mutex_unlock(JanetOSMutex *mutex); +JANET_API void janet_os_rwlock_init(JanetOSRWLock *rwlock); +JANET_API void janet_os_rwlock_deinit(JanetOSRWLock *rwlock); +JANET_API void janet_os_rwlock_rlock(JanetOSRWLock *rwlock); +JANET_API void janet_os_rwlock_wlock(JanetOSRWLock *rwlock); +JANET_API void janet_os_rwlock_runlock(JanetOSRWLock *rwlock); +JANET_API void janet_os_rwlock_wunlock(JanetOSRWLock *rwlock); /* Get last error from an IO operation */ JANET_API Janet janet_ev_lasterr(void);