Add some thread coordination primitives.

Due to the nature of event loops, it is a bit difficult to integrate
lock and other primitives such that they don't block fibers on the same
thread.
This commit is contained in:
Calvin Rose 2022-06-05 15:24:34 -05:00
parent 2f64a6b0cb
commit 9c9f9d4fa6
3 changed files with 59 additions and 23 deletions

View File

@ -106,6 +106,7 @@ void janet_os_mutex_lock(JanetOSMutex *mutex) {
}
void janet_os_mutex_unlock(JanetOSMutex *mutex) {
/* error handling? May want to keep counter */
LeaveCriticalSection((CRITICAL_SECTION *) mutex);
}
@ -120,7 +121,10 @@ static int32_t janet_decref(JanetAbstractHead *ab) {
}
void janet_os_mutex_init(JanetOSMutex *mutex) {
pthread_mutex_init(mutex, NULL);
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(mutex, &attr);
}
void janet_os_mutex_deinit(JanetOSMutex *mutex) {
@ -132,7 +136,8 @@ void janet_os_mutex_lock(JanetOSMutex *mutex) {
}
void janet_os_mutex_unlock(JanetOSMutex *mutex) {
pthread_mutex_unlock(mutex);
int ret = pthread_mutex_unlock(mutex);
if (ret) janet_panic("cannot release lock");
}
#endif

View File

@ -3013,6 +3013,54 @@ JANET_CORE_FN(janet_cfun_stream_write,
janet_await();
}
typedef struct {
JanetOSMutex mutex;
int destroyed;
} JanetAbstractMutex;
static int mutexgc(void *p, size_t size) {
JanetAbstractMutex *mutex = (JanetAbstractMutex *) p;
(void) size;
janet_os_mutex_deinit(&mutex->mutex);
return 0;
}
const JanetAbstractType janet_mutex_type = {
"core/lock",
mutexgc,
JANET_ATEND_GC
};
JANET_CORE_FN(janet_cfun_mutex,
"(ev/lock)",
"Create a new lock to coordinate threads.") {
janet_fixarity(argc, 0);
(void) argv;
JanetAbstractMutex *mutex = janet_abstract_threaded(&janet_mutex_type, sizeof(JanetAbstractMutex));
janet_os_mutex_init(&mutex->mutex);
return janet_wrap_abstract(mutex);
}
JANET_CORE_FN(janet_cfun_mutex_acquire,
"(ev/acquire-lock lock)",
"Acquire a lock such that this operating system thread is the only thread with access to this resource."
" This will block this entire thread until the lock becomes available, and will not yield to other fibers "
"on this system thread.") {
janet_fixarity(argc, 1);
JanetAbstractMutex *mutex = janet_getabstract(argv, 0, &janet_mutex_type);
janet_os_mutex_lock(&mutex->mutex);
return argv[0];
}
JANET_CORE_FN(janet_cfun_mutex_release,
"(ev/release-lock lock)",
"Release a lock such that other threads may acquire it.") {
janet_fixarity(argc, 1);
JanetAbstractMutex *mutex = janet_getabstract(argv, 0, &janet_mutex_type);
janet_os_mutex_unlock(&mutex->mutex);
return argv[0];
}
void janet_lib_ev(JanetTable *env) {
JanetRegExt ev_cfuns_ext[] = {
JANET_CORE_REG("ev/give", cfun_channel_push),
@ -3035,12 +3083,16 @@ void janet_lib_ev(JanetTable *env) {
JANET_CORE_REG("ev/read", janet_cfun_stream_read),
JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk),
JANET_CORE_REG("ev/write", janet_cfun_stream_write),
JANET_CORE_REG("ev/lock", janet_cfun_mutex),
JANET_CORE_REG("ev/acquire-lock", janet_cfun_mutex_acquire),
JANET_CORE_REG("ev/release-lock", janet_cfun_mutex_release),
JANET_REG_END
};
janet_core_cfuns_ext(env, NULL, ev_cfuns_ext);
janet_register_abstract_type(&janet_stream_type);
janet_register_abstract_type(&janet_channel_type);
janet_register_abstract_type(&janet_mutex_type);
}
#endif

View File

@ -1180,17 +1180,6 @@ typedef struct {
Janet payload;
} JanetTryState;
/* Thread types */
#ifdef JANET_THREADS
typedef struct JanetThread JanetThread;
typedef struct JanetMailbox JanetMailbox;
struct JanetThread {
JanetMailbox *mailbox;
JanetTable *encode;
};
#endif
/***** END SECTION TYPES *****/
/***** START SECTION OPCODES *****/
@ -2078,16 +2067,6 @@ JANET_API int janet_scan_uint64(const uint8_t *str, int32_t len, uint64_t *out);
#endif
#ifdef JANET_THREADS
extern JANET_API const JanetAbstractType janet_thread_type;
JANET_API int janet_thread_receive(Janet *msg_out, double timeout);
JANET_API int janet_thread_send(JanetThread *thread, Janet msg, double timeout);
JANET_API JanetThread *janet_thread_current(void);
#endif
/* Custom allocator support */
JANET_API void *(janet_malloc)(size_t);
JANET_API void *(janet_realloc)(void *, size_t);