1
0
mirror of https://github.com/janet-lang/janet synced 2025-10-26 13:17:40 +00:00

Have separate encode and decode dicts for threads

This is more correct and mirrors the way marshal -> unmarshal works.
This commit is contained in:
Calvin Rose
2019-12-01 21:53:39 -06:00
parent 5b1e59b535
commit 212479188a
4 changed files with 46 additions and 24 deletions

View File

@@ -8,7 +8,7 @@
(defn make-worker (defn make-worker
[name] [name]
(-> (thread/new make-image-dict) (thread/send worker-main) (thread/send name))) (-> (thread/new) (thread/send worker-main) (thread/send name)))
(def bob (make-worker "bob")) (def bob (make-worker "bob"))
(os/sleep 0.5) (os/sleep 0.5)

View File

@@ -1851,7 +1851,7 @@
(def load-image-dict (def load-image-dict
"A table used in combination with unmarshal to unmarshal byte sequences created "A table used in combination with unmarshal to unmarshal byte sequences created
by make-image, such that (load-image bytes) is the same as (unmarshal bytes load-images-dict)." by make-image, such that (load-image bytes) is the same as (unmarshal bytes load-image-dict)."
@{}) @{})
(defn make-image (defn make-image
@@ -2068,6 +2068,17 @@
[& modules] [& modules]
~(do ,;(map (fn [x] ~(,import* ,(string x) :prefix "")) modules))) ~(do ,;(map (fn [x] ~(,import* ,(string x) :prefix "")) modules)))
###
###
### Thread Extras
###
###
(defn thread/new
"Create a new thread. Same as (thread/new-ext make-image-dict load-image-dict)."
[]
(thread/new-ext make-image-dict load-image-dict))
### ###
### ###
### REPL ### REPL

View File

@@ -170,8 +170,11 @@ static int thread_gc(void *p, size_t size) {
static int thread_mark(void *p, size_t size) { static int thread_mark(void *p, size_t size) {
JanetThread *thread = (JanetThread *)p; JanetThread *thread = (JanetThread *)p;
(void) size; (void) size;
if (NULL != thread->dict) { if (NULL != thread->encode) {
janet_mark(janet_wrap_table(thread->dict)); janet_mark(janet_wrap_table(thread->encode));
}
if (NULL != thread->decode) {
janet_mark(janet_wrap_table(thread->decode));
} }
return 0; return 0;
} }
@@ -187,11 +190,12 @@ static JanetAbstractType Thread_AT = {
NULL NULL
}; };
static JanetThread *janet_make_thread(JanetThreadShared *shared, JanetTable *dict, int who) { static JanetThread *janet_make_thread(JanetThreadShared *shared, JanetTable *encode, JanetTable *decode, int who) {
JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread)); JanetThread *thread = janet_abstract(&Thread_AT, sizeof(JanetThread));
thread->shared = shared; thread->shared = shared;
thread->kind = who; thread->kind = who;
thread->dict = dict; thread->encode = encode;
thread->decode = decode;
return thread; return thread;
} }
@@ -207,16 +211,26 @@ static int thread_worker(JanetThreadShared *shared) {
/* Init VM */ /* Init VM */
janet_init(); janet_init();
/* Get dictionary */ /* Get dictionaries */
JanetTable *dict = janet_core_dictionary(NULL); JanetTable *decode = janet_core_dictionary(NULL);
JanetTable *encode = janet_table(decode->count);
for (int32_t i = 0; i < decode->capacity; i++) {
JanetKV *kv = decode->data + i;
if (!janet_checktype(kv->key, JANET_NIL)) {
janet_table_put(encode, kv->value, kv->key);
}
}
janet_gcroot(janet_wrap_table(encode));
/* Create self thread */ /* Create self thread */
JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_SELF); JanetThread *thread = janet_make_thread(shared, encode, decode, JANET_THREAD_SELF);
thread->encode = encode;
thread->decode = decode;
Janet threadv = janet_wrap_abstract(thread); Janet threadv = janet_wrap_abstract(thread);
/* Unmarshal the function */ /* Unmarshal the function */
Janet funcv; Janet funcv;
int status = janet_channel_receive(&shared->child, &funcv, dict); int status = janet_channel_receive(&shared->child, &funcv, decode);
if (status) goto error; if (status) goto error;
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
JanetFunction *func = janet_unwrap_function(funcv); JanetFunction *func = janet_unwrap_function(funcv);
@@ -261,16 +275,12 @@ static void janet_thread_start_child(JanetThread *thread) {
* Cfuns * Cfuns
*/ */
static Janet cfun_thread_new(int32_t argc, Janet *argv) { static Janet cfun_thread_new_ext(int32_t argc, Janet *argv) {
janet_arity(argc, 0, 1); janet_fixarity(argc, 2);
JanetTable *dict = NULL; JanetTable *encode = janet_gettable(argv, 0);
if (argc == 0 || janet_checktype(argv[0], JANET_NIL)) { JanetTable *decode = janet_gettable(argv, 1);
dict = janet_core_dictionary(NULL);
} else {
dict = janet_gettable(argv, 0);
}
JanetThreadShared *shared = janet_shared_create(0); JanetThreadShared *shared = janet_shared_create(0);
JanetThread *thread = janet_make_thread(shared, dict, JANET_THREAD_OTHER); JanetThread *thread = janet_make_thread(shared, encode, decode, JANET_THREAD_OTHER);
janet_thread_start_child(thread); janet_thread_start_child(thread);
return janet_wrap_abstract(thread); return janet_wrap_abstract(thread);
} }
@@ -282,7 +292,7 @@ static Janet cfun_thread_send(int32_t argc, Janet *argv) {
if (NULL == shared) janet_panic("channel has closed"); if (NULL == shared) janet_panic("channel has closed");
int status = janet_channel_send(thread->kind == JANET_THREAD_SELF ? &shared->parent : &shared->child, int status = janet_channel_send(thread->kind == JANET_THREAD_SELF ? &shared->parent : &shared->child,
argv[1], argv[1],
thread->dict); thread->encode);
if (status) { if (status) {
janet_panicf("failed to send message %v", argv[1]); janet_panicf("failed to send message %v", argv[1]);
} }
@@ -297,7 +307,7 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
Janet out = janet_wrap_nil(); Janet out = janet_wrap_nil();
int status = janet_channel_receive(thread->kind == JANET_THREAD_SELF ? &shared->child : &shared->parent, int status = janet_channel_receive(thread->kind == JANET_THREAD_SELF ? &shared->child : &shared->parent,
&out, &out,
thread->dict); thread->decode);
if (status) { if (status) {
janet_panic("failed to receive message"); janet_panic("failed to receive message");
} }
@@ -306,8 +316,8 @@ static Janet cfun_thread_receive(int32_t argc, Janet *argv) {
static const JanetReg threadlib_cfuns[] = { static const JanetReg threadlib_cfuns[] = {
{ {
"thread/new", cfun_thread_new, "thread/new-ext", cfun_thread_new_ext,
JDOC("(thread/new &opt dict)\n\n" JDOC("(thread/new-ext encode-book decode-book)\n\n"
"Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently " "Start a new thread. The thread will wait for a message containing the function used to start the thread, which should be subsequently "
"sent over after thread creation.") "sent over after thread creation.")
}, },

View File

@@ -957,7 +957,8 @@ struct JanetThreadShared {
}; };
struct JanetThread { struct JanetThread {
JanetThreadShared *shared; JanetThreadShared *shared;
JanetTable *dict; JanetTable *encode;
JanetTable *decode;
enum { enum {
JANET_THREAD_SELF, JANET_THREAD_SELF,
JANET_THREAD_OTHER JANET_THREAD_OTHER