mirror of
https://github.com/janet-lang/janet
synced 2024-11-25 09:47:17 +00:00
Work on threads.
Add send and receive.
This commit is contained in:
parent
5cd6580c2d
commit
6a763aac95
@ -2119,17 +2119,6 @@
|
||||
:on-status (or onsignal (make-onsignal env 1))
|
||||
:source "repl"}))
|
||||
|
||||
###
|
||||
###
|
||||
### Thread Extras
|
||||
###
|
||||
###
|
||||
|
||||
(defn thread/new
|
||||
"Create a new thread from a closure."
|
||||
[f]
|
||||
(thread/from-image (make-image f)))
|
||||
|
||||
###
|
||||
###
|
||||
### CLI Tool Main
|
||||
|
@ -1194,8 +1194,27 @@ JanetTable *janet_core_env(JanetTable *replacements) {
|
||||
#else
|
||||
|
||||
JanetTable *janet_core_dictionary(JanetTable *replacements) {
|
||||
JanetTable *dict = (NULL != replacements) ? replacements : janet_table(0);
|
||||
janet_load_libs(dict);
|
||||
JanetTable *dict;
|
||||
if (NULL == janet_vm_core_dictionary) {
|
||||
dict = janet_table(0);
|
||||
janet_load_libs(dict);
|
||||
janet_vm_core_dictionary = dict;
|
||||
janet_gcroot(janet_wrap_table(dict));
|
||||
/* do replacements */
|
||||
if (NULL != replacements) {
|
||||
for (int32_t i = 0; i < replacements->capacity; i++) {
|
||||
if (!janet_checktype(replacements->data[i].key, JANET_NIL)) {
|
||||
const JanetKV *kv = replacements->data + i;
|
||||
janet_table_put(dict, kv->key, kv->value);
|
||||
if (janet_checktype(kv->value, JANET_CFUNCTION)) {
|
||||
janet_table_put(janet_vm_registry, kv->value, kv->key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dict = janet_vm_core_dictionary;
|
||||
}
|
||||
return dict;
|
||||
}
|
||||
|
||||
|
@ -32,6 +32,9 @@
|
||||
* be in it. However, thread local global variables for interpreter
|
||||
* state should allow easy multi-threading. */
|
||||
|
||||
/* The core dictionary is memoized */
|
||||
extern JANET_THREAD_LOCAL JanetTable *janet_vm_core_dictionary;
|
||||
|
||||
/* How many VM stacks have been entered */
|
||||
extern JANET_THREAD_LOCAL int janet_vm_stackn;
|
||||
|
||||
|
@ -24,19 +24,134 @@
|
||||
#include <janet.h>
|
||||
#include "gc.h"
|
||||
#include "util.h"
|
||||
#include "state.h"
|
||||
#endif
|
||||
|
||||
#ifdef JANET_THREADS
|
||||
|
||||
#include <pthread.h>
|
||||
#include <setjmp.h>
|
||||
|
||||
static void shared_cleanup(JanetThreadShared *shared) {
|
||||
static void janet_channel_init(JanetChannel *channel, size_t initialSize) {
|
||||
janet_buffer_init(&channel->buf, (int32_t) initialSize);
|
||||
pthread_mutex_init(&channel->lock, NULL);
|
||||
pthread_cond_init(&channel->cond, NULL);
|
||||
}
|
||||
|
||||
static void janet_channel_destroy(JanetChannel *channel) {
|
||||
janet_buffer_deinit(&channel->buf);
|
||||
pthread_mutex_destroy(&channel->lock);
|
||||
pthread_cond_destroy(&channel->cond);
|
||||
}
|
||||
|
||||
static JanetThreadShared *janet_shared_create(size_t initialSize) {
|
||||
const char *errmsg = "could not allocate memory for thread";
|
||||
JanetThreadShared *shared = malloc(sizeof(JanetThreadShared));
|
||||
if (NULL == shared) janet_panicf(errmsg);
|
||||
uint8_t *mem = malloc(initialSize);
|
||||
if (NULL == mem) janet_panic(errmsg);
|
||||
janet_channel_init(&shared->parent, 0);
|
||||
janet_channel_init(&shared->child, initialSize);
|
||||
shared->refCount = 2;
|
||||
pthread_mutex_init(&shared->refCountLock, NULL);
|
||||
return shared;
|
||||
}
|
||||
|
||||
static void janet_shared_destroy(JanetThreadShared *shared) {
|
||||
janet_channel_destroy(&shared->parent);
|
||||
janet_channel_destroy(&shared->child);
|
||||
pthread_mutex_destroy(&shared->refCountLock);
|
||||
pthread_mutex_destroy(&shared->memoryLock);
|
||||
free(shared->memory);
|
||||
free(shared);
|
||||
}
|
||||
|
||||
/* Returns 1 if could not send. Does not block or panic. Bytes should be a janet value that
|
||||
* has been marshalled. */
|
||||
static int janet_channel_send_any(JanetChannel *channel, Janet msg, JanetByteView bytes, int is_bytes, JanetTable *dict) {
|
||||
pthread_mutex_lock(&channel->lock);
|
||||
|
||||
/* Hack to capture all panics from marshalling. This works because
|
||||
* we know janet_marshal won't mess with other essential global state. */
|
||||
jmp_buf buf;
|
||||
jmp_buf *old_buf = janet_vm_jmp_buf;
|
||||
janet_vm_jmp_buf = &buf;
|
||||
int32_t oldcount = channel->buf.count;
|
||||
|
||||
int ret = 0;
|
||||
if (setjmp(buf)) {
|
||||
ret = 1;
|
||||
channel->buf.count = oldcount;
|
||||
} else {
|
||||
if (is_bytes) {
|
||||
janet_buffer_push_bytes(&channel->buf, bytes.bytes, bytes.len);
|
||||
} else {
|
||||
janet_marshal(&channel->buf, msg, dict, 0);
|
||||
}
|
||||
|
||||
/* Was empty, signal to cond */
|
||||
if (oldcount == 0) {
|
||||
pthread_cond_signal(&channel->cond);
|
||||
}
|
||||
}
|
||||
|
||||
/* Cleanup */
|
||||
janet_vm_jmp_buf = old_buf;
|
||||
pthread_mutex_unlock(&channel->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int janet_channel_send(JanetChannel *channel, Janet msg, JanetTable *dict) {
|
||||
JanetByteView dud = {0};
|
||||
return janet_channel_send_any(channel, msg, dud, 0, dict);
|
||||
}
|
||||
|
||||
/*
|
||||
static int janet_channel_send_image(JanetChannel *channel, JanetByteView bytes) {
|
||||
return janet_channel_send_any(channel, janet_wrap_nil(), bytes, 1, NULL);
|
||||
}
|
||||
*/
|
||||
|
||||
/* Returns 1 if nothing in queue or failed to get item. Does not block or panic. Uses dict to read bytes from
|
||||
* the channel and unmarshal them. */
|
||||
static int janet_channel_receive(JanetChannel *channel, Janet *msg_out, JanetTable *dict) {
|
||||
pthread_mutex_lock(&channel->lock);
|
||||
|
||||
/* If queue is empty, block for now. */
|
||||
while (channel->buf.count == 0) {
|
||||
pthread_cond_wait(&channel->cond, &channel->lock);
|
||||
}
|
||||
|
||||
/* Hack to capture all panics from marshalling. This works because
|
||||
* we know janet_marshal won't mess with other essential global state. */
|
||||
jmp_buf buf;
|
||||
jmp_buf *old_buf = janet_vm_jmp_buf;
|
||||
janet_vm_jmp_buf = &buf;
|
||||
|
||||
/* Handle errors */
|
||||
int ret = 0;
|
||||
if (setjmp(buf)) {
|
||||
/* Clear the channel on errors */
|
||||
channel->buf.count = 0;
|
||||
ret = 1;
|
||||
} else {
|
||||
/* Read from beginning of channel */
|
||||
const uint8_t *nextItem = NULL;
|
||||
Janet item = janet_unmarshal(channel->buf.data, channel->buf.count, 0, dict, &nextItem);
|
||||
|
||||
/* Update memory and put result into *msg_out */
|
||||
int32_t chunkCount = nextItem - channel->buf.data;
|
||||
memmove(channel->buf.data, nextItem, channel->buf.count - chunkCount);
|
||||
channel->buf.count -= chunkCount;
|
||||
*msg_out = item;
|
||||
}
|
||||
|
||||
/* Cleanup */
|
||||
janet_vm_jmp_buf = old_buf;
|
||||
pthread_mutex_unlock(&channel->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int thread_gc(void *p, size_t size) {
|
||||
JanetThread *thread = (JanetThread *)p;
|
||||
JanetThreadShared *shared = thread->shared;
|
||||
@ -45,17 +160,26 @@ static int thread_gc(void *p, size_t size) {
|
||||
pthread_mutex_lock(&shared->refCountLock);
|
||||
int refcount = --shared->refCount;
|
||||
if (refcount == 0) {
|
||||
shared_cleanup(shared);
|
||||
janet_shared_destroy(shared);
|
||||
} else {
|
||||
pthread_mutex_unlock(&shared->refCountLock);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int thread_mark(void *p, size_t size) {
|
||||
JanetThread *thread = (JanetThread *)p;
|
||||
(void) size;
|
||||
if (NULL != thread->dict) {
|
||||
janet_mark(janet_wrap_table(thread->dict));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static JanetAbstractType Thread_AT = {
|
||||
"core/thread",
|
||||
thread_gc,
|
||||
NULL,
|
||||
thread_mark,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
@ -63,43 +187,53 @@ static JanetAbstractType Thread_AT = {
|
||||
NULL
|
||||
};
|
||||
|
||||
static JanetThread *janet_make_thread(JanetThreadShared *shared, JanetTable *dict, int who) {
|
||||
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
|
||||
thread->shared = shared;
|
||||
thread->kind = who;
|
||||
thread->dict = dict;
|
||||
thread->handle = NULL;
|
||||
return thread;
|
||||
}
|
||||
|
||||
JanetThread *janet_getthread(Janet *argv, int32_t n) {
|
||||
return (JanetThread *) janet_getabstract(argv, n, &Thread_AT);
|
||||
}
|
||||
|
||||
/* Runs in new thread */
|
||||
static int thread_worker(JanetThreadShared *shared) {
|
||||
pthread_t handle = pthread_self();
|
||||
pthread_detach(handle);
|
||||
|
||||
/* Init VM */
|
||||
janet_init();
|
||||
|
||||
/* Get dictionary */
|
||||
JanetTable *dict = janet_core_dictionary(NULL);
|
||||
const uint8_t *next = NULL;
|
||||
|
||||
/* Create self thread */
|
||||
JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_SELF);
|
||||
thread->handle = handle;
|
||||
Janet threadv = janet_wrap_abstract(thread);
|
||||
|
||||
/* Unmarshal the function */
|
||||
Janet funcv = janet_unmarshal(shared->memory, shared->memorySize, 0, dict, &next);
|
||||
if (next == shared->memory) goto error;
|
||||
Janet funcv;
|
||||
int status = janet_channel_receive(&shared->child, &funcv, dict);
|
||||
if (status) goto error;
|
||||
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
|
||||
JanetFunction *func = janet_unwrap_function(funcv);
|
||||
|
||||
/* Create self thread */
|
||||
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
|
||||
thread->shared = shared;
|
||||
thread->handle = pthread_self();
|
||||
|
||||
/* Clean up thread when done, do not wait for a join. For
|
||||
* communicating with other threads, we will rely on the
|
||||
* JanetThreadShared structure. */
|
||||
pthread_detach(thread->handle);
|
||||
/* Arity check */
|
||||
if (func->def->min_arity > 1 || func->def->max_arity < 1) {
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Call function */
|
||||
JanetFiber *fiber = janet_fiber(func, 64, 0, NULL);
|
||||
fiber->env = janet_table(0);
|
||||
janet_table_put(fiber->env, janet_ckeywordv("worker"), janet_wrap_abstract(thread));
|
||||
Janet argv[1] = { threadv };
|
||||
JanetFiber *fiber = janet_fiber(func, 64, 1, argv);
|
||||
Janet out;
|
||||
janet_continue(fiber, janet_wrap_nil(), &out);
|
||||
|
||||
/* TODO - marshal 'out' into sharedMemory */
|
||||
|
||||
/* Success */
|
||||
janet_deinit();
|
||||
return 0;
|
||||
@ -110,47 +244,84 @@ error:
|
||||
return 1;
|
||||
}
|
||||
|
||||
void *janet_pthread_wrapper(void *param) {
|
||||
static void *janet_pthread_wrapper(void *param) {
|
||||
thread_worker((JanetThreadShared *)param);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static Janet cfun_from_image(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetByteView bytes = janet_getbytes(argv, 0);
|
||||
|
||||
/* Create Shared memory chunk of thread object */
|
||||
JanetThreadShared *shared = malloc(sizeof(JanetThreadShared));
|
||||
uint8_t *mem = malloc(bytes.len);
|
||||
if (NULL == shared || NULL == mem) {
|
||||
janet_panicf("could not allocate memory for thread");
|
||||
}
|
||||
shared->memory = mem;
|
||||
shared->memorySize = bytes.len;
|
||||
memcpy(mem, bytes.bytes, bytes.len);
|
||||
shared->refCount = 2;
|
||||
pthread_mutex_init(&shared->refCountLock, NULL);
|
||||
pthread_mutex_init(&shared->memoryLock, NULL);
|
||||
|
||||
/* Create thread abstract */
|
||||
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
|
||||
thread->shared = shared;
|
||||
|
||||
/* Run thread */
|
||||
static void janet_thread_start_child(JanetThread *thread) {
|
||||
JanetThreadShared *shared = thread->shared;
|
||||
int error = pthread_create(&thread->handle, NULL, janet_pthread_wrapper, shared);
|
||||
if (error) {
|
||||
thread->shared = NULL; /* Prevent GC from trying to mess with shared memory here */
|
||||
shared_cleanup(shared);
|
||||
janet_shared_destroy(shared);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Cfuns
|
||||
*/
|
||||
|
||||
static Janet cfun_thread_new(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 0, 1);
|
||||
JanetTable *dict = NULL;
|
||||
if (argc == 0 || janet_checktype(argv[0], JANET_NIL)) {
|
||||
dict = janet_core_dictionary(NULL);
|
||||
} else {
|
||||
dict = janet_gettable(argv, 0);
|
||||
}
|
||||
JanetThreadShared *shared = janet_shared_create(0);
|
||||
JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_OTHER);
|
||||
janet_thread_start_child(thread);
|
||||
return janet_wrap_abstract(thread);
|
||||
}
|
||||
|
||||
static Janet cfun_thread_send(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetThread *thread = janet_getthread(argv, 0);
|
||||
JanetThreadShared *shared = thread->shared;
|
||||
if (NULL == shared) janet_panic("channel has closed");
|
||||
int status = janet_channel_send(thread->kind == JANET_THREAD_SELF ? &shared->parent : &shared->child,
|
||||
argv[1],
|
||||
thread->dict);
|
||||
if (status) {
|
||||
janet_panicf("failed to send message %v", argv[1]);
|
||||
}
|
||||
return argv[0];
|
||||
}
|
||||
|
||||
static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetThread *thread = janet_getthread(argv, 0);
|
||||
JanetThreadShared *shared = thread->shared;
|
||||
if (NULL == shared) janet_panic("channel has closed");
|
||||
Janet out = janet_wrap_nil();
|
||||
int status = janet_channel_receive(thread->kind == JANET_THREAD_SELF ? &shared->child : &shared->parent,
|
||||
&out,
|
||||
thread->dict);
|
||||
if (status) {
|
||||
janet_panic("failed to receive message");
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
static const JanetReg threadlib_cfuns[] = {
|
||||
{
|
||||
"thread/from-image", cfun_from_image,
|
||||
JDOC("(thread/from-image image)\n\n"
|
||||
"Start a new thread. image is a byte sequence, containing a marshalled function.")
|
||||
"thread/new", cfun_thread_new,
|
||||
JDOC("(thread/new &opt dict)\n\n"
|
||||
"Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently "
|
||||
"sent over after thread creation.")
|
||||
},
|
||||
{
|
||||
"thread/send", cfun_thread_send,
|
||||
JDOC("(thread/send thread msg)\n\n"
|
||||
"Send a message to the thread. This will never block and returns thread immediately. "
|
||||
"Will throw an error if there is a problem sending the message.")
|
||||
},
|
||||
{
|
||||
"thread/receive", cfun_thread_receive,
|
||||
JDOC("(thread/receive thread)\n\n"
|
||||
"Get a value sent to thread. Will block if there is no value was sent to this thread yet. Returns the message sent to the thread.")
|
||||
},
|
||||
{NULL, NULL, NULL}
|
||||
};
|
||||
|
@ -331,18 +331,9 @@ const JanetAbstractType *janet_get_abstract_type(Janet key) {
|
||||
void janet_core_def(JanetTable *env, const char *name, Janet x, const void *p) {
|
||||
(void) p;
|
||||
Janet key = janet_csymbolv(name);
|
||||
Janet value;
|
||||
/* During init, allow replacing core library cfunctions with values from
|
||||
* the env. */
|
||||
Janet check = janet_table_get(env, key);
|
||||
if (janet_checktype(check, JANET_NIL)) {
|
||||
value = x;
|
||||
} else {
|
||||
value = check;
|
||||
}
|
||||
janet_table_put(env, key, value);
|
||||
if (janet_checktype(value, JANET_CFUNCTION)) {
|
||||
janet_table_put(janet_vm_registry, value, key);
|
||||
janet_table_put(env, key, x);
|
||||
if (janet_checktype(x, JANET_CFUNCTION)) {
|
||||
janet_table_put(janet_vm_registry, x, key);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@
|
||||
#endif
|
||||
|
||||
/* VM state */
|
||||
JANET_THREAD_LOCAL JanetTable *janet_vm_core_dictionary;
|
||||
JANET_THREAD_LOCAL JanetTable *janet_vm_registry;
|
||||
JANET_THREAD_LOCAL int janet_vm_stackn = 0;
|
||||
JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL;
|
||||
@ -1240,6 +1241,8 @@ int janet_init(void) {
|
||||
/* Initialize registry */
|
||||
janet_vm_registry = janet_table(0);
|
||||
janet_gcroot(janet_wrap_table(janet_vm_registry));
|
||||
/* Core env and dictionary */
|
||||
janet_vm_core_dictionary = NULL;
|
||||
/* Seed RNG */
|
||||
janet_rng_seed(janet_default_rng(), 0);
|
||||
return 0;
|
||||
@ -1254,4 +1257,5 @@ void janet_deinit(void) {
|
||||
janet_vm_root_count = 0;
|
||||
janet_vm_root_capacity = 0;
|
||||
janet_vm_registry = NULL;
|
||||
janet_vm_core_dictionary = NULL;
|
||||
}
|
||||
|
@ -324,24 +324,6 @@ typedef struct JanetRange JanetRange;
|
||||
typedef struct JanetRNG JanetRNG;
|
||||
typedef Janet(*JanetCFunction)(int32_t argc, Janet *argv);
|
||||
|
||||
/* Thread types */
|
||||
#ifdef JANET_THREADS
|
||||
#include <pthread.h>
|
||||
typedef struct JanetThread JanetThread;
|
||||
typedef struct JanetThreadShared JanetThreadShared;
|
||||
struct JanetThreadShared {
|
||||
pthread_mutex_t memoryLock;
|
||||
pthread_mutex_t refCountLock;
|
||||
uint8_t *memory;
|
||||
size_t memorySize;
|
||||
int refCount;
|
||||
};
|
||||
struct JanetThread {
|
||||
pthread_t handle;
|
||||
JanetThreadShared *shared;
|
||||
};
|
||||
#endif
|
||||
|
||||
/* Basic types for all Janet Values */
|
||||
typedef enum JanetType {
|
||||
JANET_NUMBER,
|
||||
@ -956,6 +938,35 @@ struct JanetRNG {
|
||||
uint32_t counter;
|
||||
};
|
||||
|
||||
/* Thread types */
|
||||
#ifdef JANET_THREADS
|
||||
#include <pthread.h>
|
||||
typedef struct JanetThread JanetThread;
|
||||
typedef struct JanetThreadShared JanetThreadShared;
|
||||
typedef struct JanetChannel JanetChannel;
|
||||
struct JanetChannel {
|
||||
pthread_mutex_t lock;
|
||||
pthread_cond_t cond;
|
||||
JanetBuffer buf;
|
||||
};
|
||||
struct JanetThreadShared {
|
||||
pthread_mutex_t refCountLock;
|
||||
int refCount;
|
||||
JanetChannel parent;
|
||||
JanetChannel child;
|
||||
};
|
||||
struct JanetThread {
|
||||
pthread_t handle;
|
||||
JanetThreadShared *shared;
|
||||
JanetTable *dict;
|
||||
enum {
|
||||
JANET_THREAD_SELF,
|
||||
JANET_THREAD_OTHER
|
||||
} kind;
|
||||
};
|
||||
#endif
|
||||
|
||||
|
||||
/***** END SECTION TYPES *****/
|
||||
|
||||
/***** START SECTION OPCODES *****/
|
||||
|
Loading…
Reference in New Issue
Block a user