1
0
mirror of https://github.com/janet-lang/janet synced 2025-09-08 05:46:06 +00:00

Add some flags to creating threads for more control.

Allow lightweight/heavyweight threads, and make default lightweight.
This means multithreaded programs can save lots of memory by default.
This commit is contained in:
Calvin Rose
2020-06-13 09:42:16 -05:00
parent 86e12369b6
commit c87a0910d0

View File

@@ -66,9 +66,15 @@ struct JanetMailbox {
JanetBuffer messages[]; JanetBuffer messages[];
}; };
#define JANET_THREAD_HEAVYWEIGHT 0x1
#define JANET_THREAD_ABSTRACTS 0x2
#define JANET_THREAD_CFUNCTIONS 0x4
static const char janet_thread_flags[] = "hac";
typedef struct { typedef struct {
JanetMailbox *original; JanetMailbox *original;
JanetMailbox *newbox; JanetMailbox *newbox;
int flags;
} JanetMailboxPair; } JanetMailboxPair;
static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL; static JANET_THREAD_LOCAL JanetMailbox *janet_vm_mailbox = NULL;
@@ -175,7 +181,7 @@ static int thread_mark(void *p, size_t size) {
return 0; return 0;
} }
static JanetMailboxPair *make_mailbox_pair(JanetMailbox *original) { static JanetMailboxPair *make_mailbox_pair(JanetMailbox *original, int flags) {
JanetMailboxPair *pair = malloc(sizeof(JanetMailboxPair)); JanetMailboxPair *pair = malloc(sizeof(JanetMailboxPair));
if (NULL == pair) { if (NULL == pair) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
@@ -183,6 +189,7 @@ static JanetMailboxPair *make_mailbox_pair(JanetMailbox *original) {
pair->original = original; pair->original = original;
janet_mailbox_ref(original, 1); janet_mailbox_ref(original, 1);
pair->newbox = janet_mailbox_create(1, 16); pair->newbox = janet_mailbox_create(1, 16);
pair->flags = flags;
return pair; return pair;
} }
@@ -442,16 +449,44 @@ static int thread_worker(JanetMailboxPair *pair) {
janet_init(); janet_init();
/* Get dictionaries for default encode/decode */ /* Get dictionaries for default encode/decode */
JanetTable *encode = janet_get_core_table("make-image-dict"); JanetTable *encode;
if (pair->flags & JANET_THREAD_HEAVYWEIGHT) {
encode = janet_get_core_table("make-image-dict");
} else {
encode = NULL;
janet_vm_thread_decode = janet_table(0);
janet_gcroot(janet_wrap_table(janet_vm_thread_decode));
}
/* Create parent thread */ /* Create parent thread */
JanetThread *parent = janet_make_thread(pair->original, encode); JanetThread *parent = janet_make_thread(pair->original, encode);
Janet parentv = janet_wrap_abstract(parent); Janet parentv = janet_wrap_abstract(parent);
/* Unmarshal the abstract registry */
if (pair->flags & JANET_THREAD_ABSTRACTS) {
Janet reg;
int status = janet_thread_receive(&reg, INFINITY);
if (status) goto error;
if (!janet_checktype(reg, JANET_TABLE)) goto error;
janet_gcunroot(janet_wrap_table(janet_vm_abstract_registry));
janet_vm_abstract_registry = janet_unwrap_table(reg);
janet_gcroot(janet_wrap_table(janet_vm_abstract_registry));
}
/* Unmarshal the normal registry */
if (pair->flags & JANET_THREAD_CFUNCTIONS) {
Janet reg;
int status = janet_thread_receive(&reg, INFINITY);
if (status) goto error;
if (!janet_checktype(reg, JANET_TABLE)) goto error;
janet_gcunroot(janet_wrap_table(janet_vm_registry));
janet_vm_registry = janet_unwrap_table(reg);
janet_gcroot(janet_wrap_table(janet_vm_registry));
}
/* Unmarshal the function */ /* Unmarshal the function */
Janet funcv; Janet funcv;
int status = janet_thread_receive(&funcv, INFINITY); int status = janet_thread_receive(&funcv, INFINITY);
if (status) goto error; if (status) goto error;
if (!janet_checktype(funcv, JANET_FUNCTION)) goto error; if (!janet_checktype(funcv, JANET_FUNCTION)) goto error;
JanetFunction *func = janet_unwrap_function(funcv); JanetFunction *func = janet_unwrap_function(funcv);
@@ -558,22 +593,40 @@ static Janet cfun_thread_current(int32_t argc, Janet *argv) {
} }
static Janet cfun_thread_new(int32_t argc, Janet *argv) { static Janet cfun_thread_new(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2); janet_arity(argc, 1, 3);
/* Just type checking */ /* Just type checking */
janet_getfunction(argv, 0); janet_getfunction(argv, 0);
int32_t cap = janet_optinteger(argv, argc, 1, 10); int32_t cap = janet_optinteger(argv, argc, 1, 10);
if (cap < 1 || cap > UINT16_MAX) { if (cap < 1 || cap > UINT16_MAX) {
janet_panicf("bad slot #1, expected integer in range [1, 65535], got %d", cap); janet_panicf("bad slot #1, expected integer in range [1, 65535], got %d", cap);
} }
JanetTable *encode = janet_get_core_table("make-image-dict"); int flags = argc >= 3 ? janet_getflags(argv, 2, janet_thread_flags) : JANET_THREAD_ABSTRACTS;
JanetTable *encode;
if (flags & JANET_THREAD_HEAVYWEIGHT) {
encode = janet_get_core_table("make-image-dict");
} else {
encode = NULL;
}
JanetMailboxPair *pair = make_mailbox_pair(janet_vm_mailbox); JanetMailboxPair *pair = make_mailbox_pair(janet_vm_mailbox, flags);
JanetThread *thread = janet_make_thread(pair->newbox, encode); JanetThread *thread = janet_make_thread(pair->newbox, encode);
if (janet_thread_start_child(pair)) { if (janet_thread_start_child(pair)) {
destroy_mailbox_pair(pair); destroy_mailbox_pair(pair);
janet_panic("could not start thread"); janet_panic("could not start thread");
} }
if (flags & JANET_THREAD_ABSTRACTS) {
if (janet_thread_send(thread, janet_wrap_table(janet_vm_abstract_registry), INFINITY)) {
janet_panic("could not send abstract registry to thread");
}
}
if (flags & JANET_THREAD_CFUNCTIONS) {
if (janet_thread_send(thread, janet_wrap_table(janet_vm_registry), INFINITY)) {
janet_panic("could not send registry to thread");
}
}
/* If thread started, send the worker function. */ /* If thread started, send the worker function. */
if (janet_thread_send(thread, argv[0], INFINITY)) { if (janet_thread_send(thread, argv[0], INFINITY)) {
janet_panicf("could not send worker function %v to thread", argv[0]); janet_panicf("could not send worker function %v to thread", argv[0]);
@@ -638,10 +691,14 @@ static const JanetReg threadlib_cfuns[] = {
}, },
{ {
"thread/new", cfun_thread_new, "thread/new", cfun_thread_new,
JDOC("(thread/new func &opt capacity)\n\n" JDOC("(thread/new func &opt capacity flags)\n\n"
"Start a new thread that will start immediately. " "Start a new thread that will start immediately. "
"If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. " "If capacity is provided, that is how many messages can be stored in the thread's mailbox before blocking senders. "
"The capacity must be between 1 and 65535 inclusive, and defaults to 10. " "The capacity must be between 1 and 65535 inclusive, and defaults to 10. "
"Can optionally provide flags to the new thread - supported flags are:\n"
"\t:h - Start a heavyweight thread. This loads the core environment by default, so may use more memory initially. Messages may compress better, though.\n"
"\t:a - Allow sending over registered abstract types to the new thread\n"
"\t:c - Send over cfunction information to the new thread.\n"
"Returns a handle to the new thread.") "Returns a handle to the new thread.")
}, },
{ {