From bca039273801fbf9ae69c3d44a7bd077edd0d0db Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 26 Nov 2019 23:13:53 -0600 Subject: [PATCH] First work on threading. Posix only, needs to be disabled on windows. Also the Makefile needs to be configurable, and meson.build needs to take pthreads into account. --- Makefile | 3 +- meson.build | 1 + src/boot/boot.janet | 41 +++++++++-- src/core/corelib.c | 100 ++++++++++++++++---------- src/core/thread.c | 163 ++++++++++++++++++++++++++++++++++++++++++ src/core/util.h | 3 + src/include/janet.h | 24 +++++++ src/mainclient/line.c | 26 +++---- 8 files changed, 303 insertions(+), 58 deletions(-) create mode 100644 src/core/thread.c diff --git a/Makefile b/Makefile index 5935b63e..1c00e317 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ MANPATH?=$(PREFIX)/share/man/man1/ PKG_CONFIG_PATH?=$(LIBDIR)/pkgconfig DEBUGGER=gdb -CFLAGS=-std=c99 -Wall -Wextra -Isrc/include -Isrc/conf -fPIC -O2 -fvisibility=hidden \ +CFLAGS=-std=c99 -Wall -Wextra -Isrc/include -Isrc/conf -fPIC -O2 -fvisibility=hidden -pthread \ -DJANET_BUILD=$(JANET_BUILD) LDFLAGS=-rdynamic @@ -106,6 +106,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \ src/core/struct.c \ src/core/symcache.c \ src/core/table.c \ + src/core/thread.c \ src/core/tuple.c \ src/core/typedarray.c \ src/core/util.c \ diff --git a/meson.build b/meson.build index 9e482cab..7bf00e87 100644 --- a/meson.build +++ b/meson.build @@ -128,6 +128,7 @@ core_src = [ 'src/core/struct.c', 'src/core/symcache.c', 'src/core/table.c', + 'src/core/thread.c', 'src/core/tuple.c', 'src/core/typedarray.c', 'src/core/util.c', diff --git a/src/boot/boot.janet b/src/boot/boot.janet index ec98a691..840b3305 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -1842,16 +1842,27 @@ (res) (error (res :error)))) + +(def make-image-dict + "A table used in combination with marshal to marshal code (images), such that + (make-image x) is the same as (marshal x make-image-dict)." + @{}) + +(def load-image-dict + "A table used in combination with unmarshal to unmarshal byte sequences created + by make-image, such that (load-image bytes) is the same as (unmarshal bytes load-images-dict)." + @{}) + (defn make-image "Create an image from an environment returned by require. Returns the image source as a string." [env] - (marshal env (invert (env-lookup _env)))) + (marshal env make-image-dict)) (defn load-image "The inverse operation to make-image. Returns an environment." [image] - (unmarshal image (env-lookup _env))) + (unmarshal image load-image-dict)) (def- nati (if (= :windows (os/which)) ".dll" ".so")) (defn- check-. [x] (if (string/has-prefix? "." x) x)) @@ -2107,9 +2118,16 @@ :on-status (or onsignal (make-onsignal env 1)) :source "repl"})) -# Clean up some extra defs -(put _env 'boot/opts nil) -(put _env '_env nil) +### +### +### Thread Extras +### +### + +(defn thread/new + "Create a new thread from a closure." + [f] + (thread/from-image (make-image f))) ### ### @@ -2231,6 +2249,19 @@ (setdyn :err-color (if *colorize* true)) (repl getchunk onsig))) + +### +### +### Clean up +### +### + +(do + (put _env 'boot/opts nil) + (put _env '_env nil) + (merge-into load-image-dict (env-lookup _env)) + (merge-into make-image-dict (invert load-image-dict))) + ### ### ### Bootstrap diff --git a/src/core/corelib.c b/src/core/corelib.c index b6be3b5f..cc0ace11 100644 --- a/src/core/corelib.c +++ b/src/core/corelib.c @@ -987,13 +987,54 @@ static const uint32_t propagate_asm[] = { JOP_PROPAGATE | (1 << 24), JOP_RETURN }; -#endif /* ifndef JANET_NO_BOOTSTRAP */ +#endif /* ifdef JANET_BOOTSTRAP */ + +/* + * Setup Environment + */ + +static void janet_load_libs(JanetTable *env) { + janet_core_cfuns(env, NULL, corelib_cfuns); + janet_lib_io(env); + janet_lib_math(env); + janet_lib_array(env); + janet_lib_tuple(env); + janet_lib_buffer(env); + janet_lib_table(env); + janet_lib_fiber(env); + janet_lib_os(env); + janet_lib_parse(env); + janet_lib_compile(env); + janet_lib_debug(env); + janet_lib_string(env); + janet_lib_marsh(env); +#ifdef JANET_PEG + janet_lib_peg(env); +#endif +#ifdef JANET_ASSEMBLER + janet_lib_asm(env); +#endif +#ifdef JANET_TYPED_ARRAY + janet_lib_typed_array(env); +#endif +#ifdef JANET_INT_TYPES + janet_lib_inttypes(env); +#endif +#ifdef JANET_THREADS + janet_lib_thread(env); +#endif +} + +#ifdef JANET_BOOTSTRAP + +JanetTable *janet_core_dictionary(JanetTable *replacements) { + (void) replacements; + janet_panic("not defined in bootstrap"); + return NULL; +} JanetTable *janet_core_env(JanetTable *replacements) { JanetTable *env = (NULL != replacements) ? replacements : janet_table(0); - janet_core_cfuns(env, NULL, corelib_cfuns); - -#ifdef JANET_BOOTSTRAP janet_quick_asm(env, JANET_FUN_PROP, "propagate", 2, 2, 2, 2, propagate_asm, sizeof(propagate_asm), JDOC("(propagate x fiber)\n\n" @@ -1145,48 +1186,29 @@ JanetTable *janet_core_env(JanetTable *replacements) { /* Allow references to the environment */ janet_def(env, "_env", janet_wrap_table(env), JDOC("The environment table for the current scope.")); - /* Set as gc root */ + janet_load_libs(env); janet_gcroot(janet_wrap_table(env)); -#endif + return env; +} - /* Load auxiliary envs */ - janet_lib_io(env); - janet_lib_math(env); - janet_lib_array(env); - janet_lib_tuple(env); - janet_lib_buffer(env); - janet_lib_table(env); - janet_lib_fiber(env); - janet_lib_os(env); - janet_lib_parse(env); - janet_lib_compile(env); - janet_lib_debug(env); - janet_lib_string(env); - janet_lib_marsh(env); -#ifdef JANET_PEG - janet_lib_peg(env); -#endif -#ifdef JANET_ASSEMBLER - janet_lib_asm(env); -#endif -#ifdef JANET_TYPED_ARRAY - janet_lib_typed_array(env); -#endif -#ifdef JANET_INT_TYPES - janet_lib_inttypes(env); -#endif +#else -#ifndef JANET_BOOTSTRAP - /* Unmarshal from core image */ +JanetTable *janet_core_dictionary(JanetTable *replacements) { + JanetTable *dict = (NULL != replacements) ? replacements : janet_table(0); + janet_load_libs(dict); + return dict; +} + +JanetTable *janet_core_env(JanetTable *replacements) { + JanetTable *dict = janet_core_dictionary(replacements); Janet marsh_out = janet_unmarshal( janet_core_image, janet_core_image_size, 0, - env, + dict, NULL); janet_gcroot(marsh_out); - env = janet_unwrap_table(marsh_out); -#endif - - return env; + return janet_unwrap_table(marsh_out); } + +#endif diff --git a/src/core/thread.c b/src/core/thread.c new file mode 100644 index 00000000..a58a0bfd --- /dev/null +++ b/src/core/thread.c @@ -0,0 +1,163 @@ +/* +* Copyright (c) 2019 Calvin Rose +* +* Permission is hereby granted, free of charge, to any person obtaining a copy +* of this software and associated documentation files (the "Software"), to +* deal in the Software without restriction, including without limitation the +* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +* sell copies of the Software, and to permit persons to whom the Software is +* furnished to do so, subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in +* all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +* IN THE SOFTWARE. +*/ + +#ifndef JANET_AMALG +#include +#include "gc.h" +#include "util.h" +#endif + +#ifdef JANET_THREADS + +#include + +static void shared_cleanup(JanetThreadShared *shared) { + pthread_mutex_destroy(&shared->refCountLock); + pthread_mutex_destroy(&shared->memoryLock); + free(shared->memory); + free(shared); +} + +static int thread_gc(void *p, size_t size) { + JanetThread *thread = (JanetThread *)p; + JanetThreadShared *shared = thread->shared; + if (NULL == shared) return 0; + (void) size; + pthread_mutex_lock(&shared->refCountLock); + int refcount = --shared->refCount; + if (refcount == 0) { + shared_cleanup(shared); + } else { + pthread_mutex_unlock(&shared->refCountLock); + } + return 0; +} + +static JanetAbstractType Thread_AT = { + "core/thread", + thread_gc, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL +}; + +JanetThread *janet_getthread(Janet *argv, int32_t n) { + return (JanetThread *) janet_getabstract(argv, n, &Thread_AT); +} + +/* Runs in new thread */ +static int thread_worker(JanetThreadShared *shared) { + /* Init VM */ + janet_init(); + + JanetTable *dict = janet_core_dictionary(NULL); + const uint8_t *next = NULL; + + /* Unmarshal the function */ + Janet funcv = janet_unmarshal(shared->memory, shared->memorySize, 0, dict, &next); + if (next == shared->memory) goto error; + if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; + JanetFunction *func = janet_unwrap_function(funcv); + + /* Create self thread */ + JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); + thread->shared = shared; + thread->handle = pthread_self(); + + /* Clean up thread when done, do not wait for a join. For + * communicating with other threads, we will rely on the + * JanetThreadShared structure. */ + pthread_detach(thread->handle); + + /* Call function */ + JanetFiber *fiber = janet_fiber(func, 64, 0, NULL); + fiber->env = janet_table(0); + janet_table_put(fiber->env, janet_ckeywordv("worker"), janet_wrap_abstract(thread)); + Janet out; + janet_continue(fiber, janet_wrap_nil(), &out); + + /* TODO - marshal 'out' into sharedMemory */ + + /* Success */ + janet_deinit(); + return 0; + + /* Fail */ +error: + janet_deinit(); + return 1; +} + +void *janet_pthread_wrapper(void *param) { + thread_worker((JanetThreadShared *)param); + return NULL; +} + +static Janet cfun_from_image(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetByteView bytes = janet_getbytes(argv, 0); + + /* Create Shared memory chunk of thread object */ + JanetThreadShared *shared = malloc(sizeof(JanetThreadShared)); + uint8_t *mem = malloc(bytes.len); + if (NULL == shared || NULL == mem) { + janet_panicf("could not allocate memory for thread"); + } + shared->memory = mem; + shared->memorySize = bytes.len; + memcpy(mem, bytes.bytes, bytes.len); + shared->refCount = 2; + pthread_mutex_init(&shared->refCountLock, NULL); + pthread_mutex_init(&shared->memoryLock, NULL); + + /* Create thread abstract */ + JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); + thread->shared = shared; + + /* Run thread */ + int error = pthread_create(&thread->handle, NULL, janet_pthread_wrapper, shared); + if (error) { + thread->shared = NULL; /* Prevent GC from trying to mess with shared memory here */ + shared_cleanup(shared); + } + + return janet_wrap_abstract(thread); +} + +static const JanetReg it_cfuns[] = { + { + "thread/from-image", cfun_from_image, + JDOC("(thread/from-image image)\n\n" + "Start a new thread. image is a byte sequence, containing a marshalled function.") + }, + {NULL, NULL, NULL} +}; + +/* Module entry point */ +void janet_lib_thread(JanetTable *env) { + janet_core_cfuns(env, NULL, it_cfuns); +} + +#endif diff --git a/src/core/util.h b/src/core/util.h index 4c7a0b0c..adc4b396 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -120,5 +120,8 @@ void janet_lib_typed_array(JanetTable *env); #ifdef JANET_INT_TYPES void janet_lib_inttypes(JanetTable *env); #endif +#ifdef JANET_THREADS +void janet_lib_thread(JanetTable *env); +#endif #endif diff --git a/src/include/janet.h b/src/include/janet.h index a8a04d04..2cb75db8 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -143,6 +143,11 @@ extern "C" { #define JANET_INT_TYPES #endif +/* Enable or disable threads */ +#ifndef JANET_NO_THREADS +#define JANET_THREADS +#endif + /* How to export symbols */ #ifndef JANET_API #ifdef JANET_WINDOWS @@ -319,6 +324,24 @@ typedef struct JanetRange JanetRange; typedef struct JanetRNG JanetRNG; typedef Janet(*JanetCFunction)(int32_t argc, Janet *argv); +/* Thread types */ +#ifdef JANET_THREADS +#include +typedef struct JanetThread JanetThread; +typedef struct JanetThreadShared JanetThreadShared; +struct JanetThreadShared { + pthread_mutex_t memoryLock; + pthread_mutex_t refCountLock; + uint8_t *memory; + size_t memorySize; + int refCount; +}; +struct JanetThread { + pthread_t handle; + JanetThreadShared *shared; +}; +#endif + /* Basic types for all Janet Values */ typedef enum JanetType { JANET_NUMBER, @@ -1092,6 +1115,7 @@ struct JanetCompileResult { JANET_API JanetCompileResult janet_compile(Janet source, JanetTable *env, const uint8_t *where); /* Get the default environment for janet */ +JANET_API JanetTable *janet_core_dictionary(JanetTable *replacements); /* Used for unmarshaling images */ JANET_API JanetTable *janet_core_env(JanetTable *replacements); JANET_API int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char *sourcePath, Janet *out); diff --git a/src/mainclient/line.c b/src/mainclient/line.c index 5578c7b0..e6b27816 100644 --- a/src/mainclient/line.c +++ b/src/mainclient/line.c @@ -89,18 +89,18 @@ https://github.com/antirez/linenoise/blob/master/linenoise.c /* static state */ #define JANET_LINE_MAX 1024 #define JANET_HISTORY_MAX 100 -static int gbl_israwmode = 0; -static const char *gbl_prompt = "> "; -static int gbl_plen = 2; -static char gbl_buf[JANET_LINE_MAX]; -static int gbl_len = 0; -static int gbl_pos = 0; -static int gbl_cols = 80; -static char *gbl_history[JANET_HISTORY_MAX]; -static int gbl_history_count = 0; -static int gbl_historyi = 0; -static int gbl_sigint_flag = 0; -static struct termios gbl_termios_start; +static JANET_THREAD_LOCAL int gbl_israwmode = 0; +static JANET_THREAD_LOCAL const char *gbl_prompt = "> "; +static JANET_THREAD_LOCAL int gbl_plen = 2; +static JANET_THREAD_LOCAL char gbl_buf[JANET_LINE_MAX]; +static JANET_THREAD_LOCAL int gbl_len = 0; +static JANET_THREAD_LOCAL int gbl_pos = 0; +static JANET_THREAD_LOCAL int gbl_cols = 80; +static JANET_THREAD_LOCAL char *gbl_history[JANET_HISTORY_MAX]; +static JANET_THREAD_LOCAL int gbl_history_count = 0; +static JANET_THREAD_LOCAL int gbl_historyi = 0; +static JANET_THREAD_LOCAL int gbl_sigint_flag = 0; +static JANET_THREAD_LOCAL struct termios gbl_termios_start; /* Unsupported terminal list from linenoise */ static const char *badterms[] = { @@ -126,7 +126,6 @@ static int rawmode() { if (tcgetattr(STDIN_FILENO, &gbl_termios_start) == -1) goto fatal; t = gbl_termios_start; t.c_iflag &= ~(BRKINT | ICRNL | INPCK | ISTRIP | IXON); - t.c_oflag &= ~(OPOST); t.c_cflag |= (CS8); t.c_lflag &= ~(ECHO | ICANON | IEXTEN | ISIG); t.c_cc[VMIN] = 1; @@ -490,6 +489,7 @@ void janet_line_get(const char *p, JanetBuffer *buffer) { } return; } + fflush(stdin); norawmode(); fputc('\n', out); janet_buffer_ensure(buffer, gbl_len + 1, 2);