1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-28 19:19:53 +00:00

Port threads code to Windows API

Can run demo in examples/threads.janet
This commit is contained in:
Calvin Rose 2019-12-10 20:32:41 -05:00
parent 4187c972a3
commit 38f7e256d0
2 changed files with 169 additions and 69 deletions

View File

@ -433,8 +433,9 @@ static Janet os_time(int32_t argc, Janet *argv) {
/* Clock shims */
#ifdef JANET_WINDOWS
static int gettime(struct timespec *spec) {
int64_t wintime = 0LL;
GetSystemTimeAsFileTime((FILETIME *)&wintime);
FILETIME ftime;
GetSystemTimeAsFileTime(&ftime);
int64_t wintime = (int64_t)(ftime.dwLowDateTime) | ((int64_t)(ftime.dwHighDateTime) << 32);
/* Windows epoch is January 1, 1601 apparently */
wintime -= 116444736000000000LL;
spec->tv_sec = wintime / 10000000LL;

View File

@ -29,16 +29,25 @@
#ifdef JANET_THREADS
#ifdef JANET_WINDOWS
#include <windows.h>
#else
#include <setjmp.h>
#include <time.h>
#include <pthread.h>
#endif
/* typedefed in janet.h */
struct JanetMailbox {
/* Synchronization */
#ifdef JANET_WINDOWS
CRITICAL_SECTION lock;
CONDITION_VARIABLE cond;
#else
pthread_mutex_t lock;
pthread_cond_t cond;
#endif
/* Receiving messages - (only by owner thread) */
JanetTable *decode;
@ -69,8 +78,13 @@ static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount, ui
if (NULL == mailbox) {
JANET_OUT_OF_MEMORY;
}
#ifdef JANET_WINDOWS
InitializeCriticalSection(&mailbox->lock);
InitializeConditionVariable(&mailbox->cond);
#else
pthread_mutex_init(&mailbox->lock, NULL);
pthread_cond_init(&mailbox->cond, NULL);
#endif
mailbox->refCount = refCount;
mailbox->closed = 0;
mailbox->parent = parent;
@ -85,26 +99,47 @@ static JanetMailbox *janet_mailbox_create(JanetMailbox *parent, int refCount, ui
}
static void janet_mailbox_destroy(JanetMailbox *mailbox) {
#ifdef JANET_WINDOWS
DeleteCriticalSection(&mailbox->lock);
#else
pthread_mutex_destroy(&mailbox->lock);
pthread_cond_destroy(&mailbox->cond);
#endif
for (uint16_t i = 0; i < mailbox->messageCapacity; i++) {
janet_buffer_deinit(mailbox->messages + i);
}
free(mailbox);
}
static void janet_mailbox_lock(JanetMailbox *mailbox) {
#ifdef JANET_WINDOWS
EnterCriticalSection(&mailbox->lock);
#else
pthread_mutex_lock(&mailbox->lock);
#endif
}
static void janet_mailbox_unlock(JanetMailbox *mailbox) {
#ifdef JANET_WINDOWS
LeaveCriticalSection(&mailbox->lock);
#else
pthread_mutex_unlock(&mailbox->lock);
#endif
}
/* Assumes you have the mailbox lock already */
static void janet_mailbox_ref_with_lock(JanetMailbox *mailbox, int delta) {
mailbox->refCount += delta;
if (mailbox->refCount <= 0) {
janet_mailbox_unlock(mailbox);
janet_mailbox_destroy(mailbox);
} else {
pthread_mutex_unlock(&mailbox->lock);
janet_mailbox_unlock(mailbox);
}
}
static void janet_mailbox_ref(JanetMailbox *mailbox, int delta) {
pthread_mutex_lock(&mailbox->lock);
janet_mailbox_lock(mailbox);
janet_mailbox_ref_with_lock(mailbox, delta);
}
@ -131,8 +166,40 @@ static int thread_mark(void *p, size_t size) {
return 0;
}
/* Convert an interval from now in an absolute timespec */
static void janet_sec2ts(double sec, struct timespec *ts) {
/* Abstract waiting for timeout across windows/posix */
typedef struct {
int timedwait;
int nowait;
#ifdef JANET_WINDOWS
DWORD interval;
DWORD ticksLeft;
#else
struct timespec ts;
#endif
} JanetWaiter;
static void janet_waiter_init(JanetWaiter *waiter, double sec) {
waiter->timedwait = 0;
waiter->nowait = 0;
if (sec == 0.0 || isnan(sec)) {
waiter->nowait = 1;
return;
}
waiter->timedwait = sec > 0.0;
/* Set maximum wait time to 30 days */
if (sec > (60.0 * 60.0 * 24.0 * 30.0)) {
sec = 60.0 * 60.0 * 24.0 * 30.0;
}
#ifdef JANET_WINDOWS
if (waiter->timedwait) {
waiter->ticksLeft = waiter->interval = (DWORD) floor(1000.0 * sec);
}
#else
if (waiter->timedwait) {
/* N seconds -> timespec of (now + sec) */
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
time_t tvsec = (time_t) floor(sec);
@ -143,13 +210,47 @@ static void janet_sec2ts(double sec, struct timespec *ts) {
tvnsec -= 1000000000L;
tvsec += 1;
}
ts->tv_sec = tvsec;
ts->tv_nsec = tvnsec;
waiter->ts.tv_sec = tvsec;
waiter->ts.tv_nsec = tvnsec;
}
#endif
}
static int janet_waiter_wait(JanetWaiter *wait, JanetMailbox *mailbox) {
if (wait->nowait) return 1;
#ifdef JANET_WINDOWS
if (wait->timedwait) {
if (wait->ticksLeft == 0) return 1;
DWORD startTime = GetTickCount();
int status = !SleepConditionVariableCS(&mailbox->cond, &mailbox->lock, wait->ticksLeft);
DWORD dTick = GetTickCount() - startTime;
/* Be careful about underflow */
wait->ticksLeft = dTick > wait->ticksLeft ? 0 : dTick;
return status;
} else {
SleepConditionVariableCS(&mailbox->cond, &mailbox->lock, INFINITE);
return 0;
}
#else
if (wait->timedwait) {
return pthread_cond_timedwait(&mailbox->cond, &mailbox->lock, &wait->ts);
} else {
pthread_cond_wait(&mailbox->cond, &mailbox->lock);
return 0;
}
#endif
}
static void janet_mailbox_wakeup(JanetMailbox *mailbox) {
#ifdef JANET_WINDOWS
WakeConditionVariable(&mailbox->cond);
#else
pthread_cond_signal(&mailbox->cond);
#endif
}
static int mailbox_at_capacity(JanetMailbox *mailbox) {
return mailbox->messageCapacity > 0
&& mailbox->messageCount == mailbox->messageCapacity;
return mailbox->messageCount >= mailbox->messageCapacity;
}
/* Returns 1 if could not send (encode error or timeout), 2 for mailbox closed, and
@ -159,36 +260,33 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
/* Ensure mailbox is not closed. */
JanetMailbox *mailbox = thread->mailbox;
if (NULL == mailbox) return 2;
pthread_mutex_lock(&mailbox->lock);
janet_mailbox_lock(mailbox);
if (mailbox->closed) {
janet_mailbox_ref_with_lock(mailbox, -1);
thread->mailbox = NULL;
return 2;
}
int didWait = 0;
/* Back pressure */
if (mailbox_at_capacity(mailbox)) {
struct timespec timeout_ts;
int timedwait = timeout > 0.0;
int nowait = timeout == 0.0;
if (timedwait) janet_sec2ts(timeout, &timeout_ts);
if (nowait) {
janet_mailbox_ref_with_lock(mailbox, -1);
JanetWaiter wait;
janet_waiter_init(&wait, timeout);
if (wait.nowait) {
janet_mailbox_unlock(mailbox);
return 1;
}
/* Retry loop, as there can be multiple writers */
while (mailbox->messageCount > mailbox->messageCapacity) {
if (timedwait) {
int status = pthread_cond_timedwait(&mailbox->cond, &mailbox->lock, &timeout_ts);
if (status) {
/* Timedout */
pthread_mutex_unlock(&mailbox->lock);
while (mailbox_at_capacity(mailbox)) {
didWait = 1;
if (janet_waiter_wait(&wait, mailbox)) {
janet_mailbox_unlock(mailbox);
janet_mailbox_wakeup(mailbox);
return 1;
}
} else {
pthread_cond_wait(&mailbox->cond, &mailbox->lock);
}
}
}
@ -217,12 +315,10 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
/* Cleanup */
janet_vm_jmp_buf = old_buf;
pthread_mutex_unlock(&mailbox->lock);
janet_mailbox_unlock(mailbox);
/* Potentially wake up a blocked thread */
if (oldmcount == 0 && ret == 0) {
pthread_cond_signal(&mailbox->cond);
}
if (didWait || (oldmcount == 0 && ret == 0)) janet_mailbox_wakeup(mailbox);
return ret;
}
@ -230,13 +326,11 @@ int janet_thread_send(JanetThread *thread, Janet msg, double timeout) {
/* Returns 0 on successful message. Returns 1 if timedout */
int janet_thread_receive(Janet *msg_out, double timeout) {
JanetMailbox *mailbox = janet_vm_mailbox;
pthread_mutex_lock(&mailbox->lock);
janet_mailbox_lock(mailbox);
/* For timeouts */
struct timespec timeout_ts;
int timedwait = timeout > 0.0;
int nowait = timeout == 0.0;
if (timedwait) janet_sec2ts(timeout, &timeout_ts);
JanetWaiter wait;
janet_waiter_init(&wait, timeout);
for (;;) {
@ -271,39 +365,25 @@ int janet_thread_receive(Janet *msg_out, double timeout) {
/* Cleanup */
janet_vm_jmp_buf = old_buf;
pthread_mutex_unlock(&mailbox->lock);
janet_mailbox_unlock(mailbox);
/* Wake up pending writers */
if (wasAtCapacity) {
pthread_cond_signal(&mailbox->cond);
}
if (wasAtCapacity) janet_mailbox_wakeup(mailbox);
return 0;
}
}
if (nowait || mailbox->refCount <= 1) {
/* If there is only one ref, it is us. This means that if we
* start waiting now, we can never possibly get a message, as
* our reference will not propogate to other threads while we are blocked. */
pthread_mutex_unlock(&mailbox->lock);
if (wait.nowait || mailbox->refCount <= 1) {
janet_mailbox_unlock(mailbox);
return 1;
}
/* Wait for next message */
if (timedwait) {
if (pthread_cond_timedwait(
&mailbox->cond,
&mailbox->lock,
&timeout_ts)) {
pthread_mutex_unlock(&mailbox->lock);
if (janet_waiter_wait(&wait, mailbox)) {
janet_mailbox_unlock(mailbox);
return 1;
}
} else {
pthread_cond_wait(
&mailbox->cond,
&mailbox->lock);
}
}
}
@ -390,11 +470,28 @@ static int thread_worker(JanetMailbox *mailbox) {
/* Fail to set something up */
error:
janet_eprintf("thread failed to start\n");
janet_eprintf("\nthread failed to start\n");
janet_deinit();
return 1;
}
#ifdef JANET_WINDOWS
static DWORD janet_create_thread_wrapper(void *param) {
thread_worker((JanetMailbox *)param);
return 0;
}
static int janet_thread_start_child(JanetThread *thread) {
HANDLE handle = CreateThread(NULL, 0, janet_create_thread_wrapper, thread->mailbox, 0, NULL);
int ret = NULL == handle;
/* Does not kill thread, simply detatches */
if (!ret) CloseHandle(handle);
return ret;
}
#else
static void *janet_pthread_wrapper(void *param) {
thread_worker((JanetMailbox *)param);
return NULL;
@ -411,6 +508,8 @@ static int janet_thread_start_child(JanetThread *thread) {
}
}
#endif
/*
* Setup/Teardown
*/
@ -422,7 +521,7 @@ void janet_threads_init(void) {
}
void janet_threads_deinit(void) {
pthread_mutex_lock(&janet_vm_mailbox->lock);
janet_mailbox_lock(janet_vm_mailbox);
janet_vm_mailbox->closed = 1;
janet_mailbox_ref_with_lock(janet_vm_mailbox, -1);
janet_vm_mailbox = NULL;