1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-28 02:59:54 +00:00

First work on threading.

Posix only, needs to be disabled on windows. Also
the Makefile needs to be configurable, and meson.build
needs to take pthreads into account.
This commit is contained in:
Calvin Rose 2019-11-26 23:13:53 -06:00
parent 74d51ab08b
commit bca0392738
8 changed files with 303 additions and 58 deletions

View File

@ -37,7 +37,7 @@ MANPATH?=$(PREFIX)/share/man/man1/
PKG_CONFIG_PATH?=$(LIBDIR)/pkgconfig PKG_CONFIG_PATH?=$(LIBDIR)/pkgconfig
DEBUGGER=gdb DEBUGGER=gdb
CFLAGS=-std=c99 -Wall -Wextra -Isrc/include -Isrc/conf -fPIC -O2 -fvisibility=hidden \ CFLAGS=-std=c99 -Wall -Wextra -Isrc/include -Isrc/conf -fPIC -O2 -fvisibility=hidden -pthread \
-DJANET_BUILD=$(JANET_BUILD) -DJANET_BUILD=$(JANET_BUILD)
LDFLAGS=-rdynamic LDFLAGS=-rdynamic
@ -106,6 +106,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \
src/core/struct.c \ src/core/struct.c \
src/core/symcache.c \ src/core/symcache.c \
src/core/table.c \ src/core/table.c \
src/core/thread.c \
src/core/tuple.c \ src/core/tuple.c \
src/core/typedarray.c \ src/core/typedarray.c \
src/core/util.c \ src/core/util.c \

View File

@ -128,6 +128,7 @@ core_src = [
'src/core/struct.c', 'src/core/struct.c',
'src/core/symcache.c', 'src/core/symcache.c',
'src/core/table.c', 'src/core/table.c',
'src/core/thread.c',
'src/core/tuple.c', 'src/core/tuple.c',
'src/core/typedarray.c', 'src/core/typedarray.c',
'src/core/util.c', 'src/core/util.c',

View File

@ -1842,16 +1842,27 @@
(res) (res)
(error (res :error)))) (error (res :error))))
(def make-image-dict
"A table used in combination with marshal to marshal code (images), such that
(make-image x) is the same as (marshal x make-image-dict)."
@{})
(def load-image-dict
"A table used in combination with unmarshal to unmarshal byte sequences created
by make-image, such that (load-image bytes) is the same as (unmarshal bytes load-images-dict)."
@{})
(defn make-image (defn make-image
"Create an image from an environment returned by require. "Create an image from an environment returned by require.
Returns the image source as a string." Returns the image source as a string."
[env] [env]
(marshal env (invert (env-lookup _env)))) (marshal env make-image-dict))
(defn load-image (defn load-image
"The inverse operation to make-image. Returns an environment." "The inverse operation to make-image. Returns an environment."
[image] [image]
(unmarshal image (env-lookup _env))) (unmarshal image load-image-dict))
(def- nati (if (= :windows (os/which)) ".dll" ".so")) (def- nati (if (= :windows (os/which)) ".dll" ".so"))
(defn- check-. [x] (if (string/has-prefix? "." x) x)) (defn- check-. [x] (if (string/has-prefix? "." x) x))
@ -2107,9 +2118,16 @@
:on-status (or onsignal (make-onsignal env 1)) :on-status (or onsignal (make-onsignal env 1))
:source "repl"})) :source "repl"}))
# Clean up some extra defs ###
(put _env 'boot/opts nil) ###
(put _env '_env nil) ### Thread Extras
###
###
(defn thread/new
"Create a new thread from a closure."
[f]
(thread/from-image (make-image f)))
### ###
### ###
@ -2231,6 +2249,19 @@
(setdyn :err-color (if *colorize* true)) (setdyn :err-color (if *colorize* true))
(repl getchunk onsig))) (repl getchunk onsig)))
###
###
### Clean up
###
###
(do
(put _env 'boot/opts nil)
(put _env '_env nil)
(merge-into load-image-dict (env-lookup _env))
(merge-into make-image-dict (invert load-image-dict)))
### ###
### ###
### Bootstrap ### Bootstrap

View File

@ -987,13 +987,54 @@ static const uint32_t propagate_asm[] = {
JOP_PROPAGATE | (1 << 24), JOP_PROPAGATE | (1 << 24),
JOP_RETURN JOP_RETURN
}; };
#endif /* ifndef JANET_NO_BOOTSTRAP */ #endif /* ifdef JANET_BOOTSTRAP */
/*
* Setup Environment
*/
static void janet_load_libs(JanetTable *env) {
janet_core_cfuns(env, NULL, corelib_cfuns);
janet_lib_io(env);
janet_lib_math(env);
janet_lib_array(env);
janet_lib_tuple(env);
janet_lib_buffer(env);
janet_lib_table(env);
janet_lib_fiber(env);
janet_lib_os(env);
janet_lib_parse(env);
janet_lib_compile(env);
janet_lib_debug(env);
janet_lib_string(env);
janet_lib_marsh(env);
#ifdef JANET_PEG
janet_lib_peg(env);
#endif
#ifdef JANET_ASSEMBLER
janet_lib_asm(env);
#endif
#ifdef JANET_TYPED_ARRAY
janet_lib_typed_array(env);
#endif
#ifdef JANET_INT_TYPES
janet_lib_inttypes(env);
#endif
#ifdef JANET_THREADS
janet_lib_thread(env);
#endif
}
#ifdef JANET_BOOTSTRAP
JanetTable *janet_core_dictionary(JanetTable *replacements) {
(void) replacements;
janet_panic("not defined in bootstrap");
return NULL;
}
JanetTable *janet_core_env(JanetTable *replacements) { JanetTable *janet_core_env(JanetTable *replacements) {
JanetTable *env = (NULL != replacements) ? replacements : janet_table(0); JanetTable *env = (NULL != replacements) ? replacements : janet_table(0);
janet_core_cfuns(env, NULL, corelib_cfuns);
#ifdef JANET_BOOTSTRAP
janet_quick_asm(env, JANET_FUN_PROP, janet_quick_asm(env, JANET_FUN_PROP,
"propagate", 2, 2, 2, 2, propagate_asm, sizeof(propagate_asm), "propagate", 2, 2, 2, 2, propagate_asm, sizeof(propagate_asm),
JDOC("(propagate x fiber)\n\n" JDOC("(propagate x fiber)\n\n"
@ -1145,48 +1186,29 @@ JanetTable *janet_core_env(JanetTable *replacements) {
/* Allow references to the environment */ /* Allow references to the environment */
janet_def(env, "_env", janet_wrap_table(env), JDOC("The environment table for the current scope.")); janet_def(env, "_env", janet_wrap_table(env), JDOC("The environment table for the current scope."));
/* Set as gc root */ janet_load_libs(env);
janet_gcroot(janet_wrap_table(env)); janet_gcroot(janet_wrap_table(env));
#endif return env;
}
/* Load auxiliary envs */ #else
janet_lib_io(env);
janet_lib_math(env);
janet_lib_array(env);
janet_lib_tuple(env);
janet_lib_buffer(env);
janet_lib_table(env);
janet_lib_fiber(env);
janet_lib_os(env);
janet_lib_parse(env);
janet_lib_compile(env);
janet_lib_debug(env);
janet_lib_string(env);
janet_lib_marsh(env);
#ifdef JANET_PEG
janet_lib_peg(env);
#endif
#ifdef JANET_ASSEMBLER
janet_lib_asm(env);
#endif
#ifdef JANET_TYPED_ARRAY
janet_lib_typed_array(env);
#endif
#ifdef JANET_INT_TYPES
janet_lib_inttypes(env);
#endif
#ifndef JANET_BOOTSTRAP JanetTable *janet_core_dictionary(JanetTable *replacements) {
/* Unmarshal from core image */ JanetTable *dict = (NULL != replacements) ? replacements : janet_table(0);
janet_load_libs(dict);
return dict;
}
JanetTable *janet_core_env(JanetTable *replacements) {
JanetTable *dict = janet_core_dictionary(replacements);
Janet marsh_out = janet_unmarshal( Janet marsh_out = janet_unmarshal(
janet_core_image, janet_core_image,
janet_core_image_size, janet_core_image_size,
0, 0,
env, dict,
NULL); NULL);
janet_gcroot(marsh_out); janet_gcroot(marsh_out);
env = janet_unwrap_table(marsh_out); return janet_unwrap_table(marsh_out);
#endif
return env;
} }
#endif

163
src/core/thread.c Normal file
View File

@ -0,0 +1,163 @@
/*
* Copyright (c) 2019 Calvin Rose
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#ifndef JANET_AMALG
#include <janet.h>
#include "gc.h"
#include "util.h"
#endif
#ifdef JANET_THREADS
#include <pthread.h>
static void shared_cleanup(JanetThreadShared *shared) {
pthread_mutex_destroy(&shared->refCountLock);
pthread_mutex_destroy(&shared->memoryLock);
free(shared->memory);
free(shared);
}
static int thread_gc(void *p, size_t size) {
JanetThread *thread = (JanetThread *)p;
JanetThreadShared *shared = thread->shared;
if (NULL == shared) return 0;
(void) size;
pthread_mutex_lock(&shared->refCountLock);
int refcount = --shared->refCount;
if (refcount == 0) {
shared_cleanup(shared);
} else {
pthread_mutex_unlock(&shared->refCountLock);
}
return 0;
}
static JanetAbstractType Thread_AT = {
"core/thread",
thread_gc,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
};
JanetThread *janet_getthread(Janet *argv, int32_t n) {
return (JanetThread *) janet_getabstract(argv, n, &Thread_AT);
}
/* Runs in new thread */
static int thread_worker(JanetThreadShared *shared) {
/* Init VM */
janet_init();
JanetTable *dict = janet_core_dictionary(NULL);
const uint8_t *next = NULL;
/* Unmarshal the function */
Janet funcv = janet_unmarshal(shared->memory, shared->memorySize, 0, dict, &next);
if (next == shared->memory) goto error;
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
JanetFunction *func = janet_unwrap_function(funcv);
/* Create self thread */
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
thread->shared = shared;
thread->handle = pthread_self();
/* Clean up thread when done, do not wait for a join. For
* communicating with other threads, we will rely on the
* JanetThreadShared structure. */
pthread_detach(thread->handle);
/* Call function */
JanetFiber *fiber = janet_fiber(func, 64, 0, NULL);
fiber->env = janet_table(0);
janet_table_put(fiber->env, janet_ckeywordv("worker"), janet_wrap_abstract(thread));
Janet out;
janet_continue(fiber, janet_wrap_nil(), &out);
/* TODO - marshal 'out' into sharedMemory */
/* Success */
janet_deinit();
return 0;
/* Fail */
error:
janet_deinit();
return 1;
}
void *janet_pthread_wrapper(void *param) {
thread_worker((JanetThreadShared *)param);
return NULL;
}
static Janet cfun_from_image(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetByteView bytes = janet_getbytes(argv, 0);
/* Create Shared memory chunk of thread object */
JanetThreadShared *shared = malloc(sizeof(JanetThreadShared));
uint8_t *mem = malloc(bytes.len);
if (NULL == shared || NULL == mem) {
janet_panicf("could not allocate memory for thread");
}
shared->memory = mem;
shared->memorySize = bytes.len;
memcpy(mem, bytes.bytes, bytes.len);
shared->refCount = 2;
pthread_mutex_init(&shared->refCountLock, NULL);
pthread_mutex_init(&shared->memoryLock, NULL);
/* Create thread abstract */
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
thread->shared = shared;
/* Run thread */
int error = pthread_create(&thread->handle, NULL, janet_pthread_wrapper, shared);
if (error) {
thread->shared = NULL; /* Prevent GC from trying to mess with shared memory here */
shared_cleanup(shared);
}
return janet_wrap_abstract(thread);
}
static const JanetReg it_cfuns[] = {
{
"thread/from-image", cfun_from_image,
JDOC("(thread/from-image image)\n\n"
"Start a new thread. image is a byte sequence, containing a marshalled function.")
},
{NULL, NULL, NULL}
};
/* Module entry point */
void janet_lib_thread(JanetTable *env) {
janet_core_cfuns(env, NULL, it_cfuns);
}
#endif

View File

@ -120,5 +120,8 @@ void janet_lib_typed_array(JanetTable *env);
#ifdef JANET_INT_TYPES #ifdef JANET_INT_TYPES
void janet_lib_inttypes(JanetTable *env); void janet_lib_inttypes(JanetTable *env);
#endif #endif
#ifdef JANET_THREADS
void janet_lib_thread(JanetTable *env);
#endif
#endif #endif

View File

@ -143,6 +143,11 @@ extern "C" {
#define JANET_INT_TYPES #define JANET_INT_TYPES
#endif #endif
/* Enable or disable threads */
#ifndef JANET_NO_THREADS
#define JANET_THREADS
#endif
/* How to export symbols */ /* How to export symbols */
#ifndef JANET_API #ifndef JANET_API
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@ -319,6 +324,24 @@ typedef struct JanetRange JanetRange;
typedef struct JanetRNG JanetRNG; typedef struct JanetRNG JanetRNG;
typedef Janet(*JanetCFunction)(int32_t argc, Janet *argv); typedef Janet(*JanetCFunction)(int32_t argc, Janet *argv);
/* Thread types */
#ifdef JANET_THREADS
#include <pthread.h>
typedef struct JanetThread JanetThread;
typedef struct JanetThreadShared JanetThreadShared;
struct JanetThreadShared {
pthread_mutex_t memoryLock;
pthread_mutex_t refCountLock;
uint8_t *memory;
size_t memorySize;
int refCount;
};
struct JanetThread {
pthread_t handle;
JanetThreadShared *shared;
};
#endif
/* Basic types for all Janet Values */ /* Basic types for all Janet Values */
typedef enum JanetType { typedef enum JanetType {
JANET_NUMBER, JANET_NUMBER,
@ -1092,6 +1115,7 @@ struct JanetCompileResult {
JANET_API JanetCompileResult janet_compile(Janet source, JanetTable *env, const uint8_t *where); JANET_API JanetCompileResult janet_compile(Janet source, JanetTable *env, const uint8_t *where);
/* Get the default environment for janet */ /* Get the default environment for janet */
JANET_API JanetTable *janet_core_dictionary(JanetTable *replacements); /* Used for unmarshaling images */
JANET_API JanetTable *janet_core_env(JanetTable *replacements); JANET_API JanetTable *janet_core_env(JanetTable *replacements);
JANET_API int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char *sourcePath, Janet *out); JANET_API int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char *sourcePath, Janet *out);

View File

@ -89,18 +89,18 @@ https://github.com/antirez/linenoise/blob/master/linenoise.c
/* static state */ /* static state */
#define JANET_LINE_MAX 1024 #define JANET_LINE_MAX 1024
#define JANET_HISTORY_MAX 100 #define JANET_HISTORY_MAX 100
static int gbl_israwmode = 0; static JANET_THREAD_LOCAL int gbl_israwmode = 0;
static const char *gbl_prompt = "> "; static JANET_THREAD_LOCAL const char *gbl_prompt = "> ";
static int gbl_plen = 2; static JANET_THREAD_LOCAL int gbl_plen = 2;
static char gbl_buf[JANET_LINE_MAX]; static JANET_THREAD_LOCAL char gbl_buf[JANET_LINE_MAX];
static int gbl_len = 0; static JANET_THREAD_LOCAL int gbl_len = 0;
static int gbl_pos = 0; static JANET_THREAD_LOCAL int gbl_pos = 0;
static int gbl_cols = 80; static JANET_THREAD_LOCAL int gbl_cols = 80;
static char *gbl_history[JANET_HISTORY_MAX]; static JANET_THREAD_LOCAL char *gbl_history[JANET_HISTORY_MAX];
static int gbl_history_count = 0; static JANET_THREAD_LOCAL int gbl_history_count = 0;
static int gbl_historyi = 0; static JANET_THREAD_LOCAL int gbl_historyi = 0;
static int gbl_sigint_flag = 0; static JANET_THREAD_LOCAL int gbl_sigint_flag = 0;
static struct termios gbl_termios_start; static JANET_THREAD_LOCAL struct termios gbl_termios_start;
/* Unsupported terminal list from linenoise */ /* Unsupported terminal list from linenoise */
static const char *badterms[] = { static const char *badterms[] = {
@ -126,7 +126,6 @@ static int rawmode() {
if (tcgetattr(STDIN_FILENO, &gbl_termios_start) == -1) goto fatal; if (tcgetattr(STDIN_FILENO, &gbl_termios_start) == -1) goto fatal;
t = gbl_termios_start; t = gbl_termios_start;
t.c_iflag &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON); t.c_iflag &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON);
t.c_oflag &= ~(OPOST);
t.c_cflag |= (CS8); t.c_cflag |= (CS8);
t.c_lflag &= ~(ECHO | ICANON | IEXTEN | ISIG); t.c_lflag &= ~(ECHO | ICANON | IEXTEN | ISIG);
t.c_cc[VMIN] = 1; t.c_cc[VMIN] = 1;
@ -490,6 +489,7 @@ void janet_line_get(const char *p, JanetBuffer *buffer) {
} }
return; return;
} }
fflush(stdin);
norawmode(); norawmode();
fputc('\n', out); fputc('\n', out);
janet_buffer_ensure(buffer, gbl_len + 1, 2); janet_buffer_ensure(buffer, gbl_len + 1, 2);