1
0
mirror of https://github.com/janet-lang/janet synced 2024-12-26 08:20:27 +00:00

Fix some issues and improve channel closing.

Still not fully working, seems to be deadlock/channel issue when
sending events between threads.
This commit is contained in:
Calvin Rose 2021-08-15 13:14:33 -05:00
parent 037215f7c4
commit 97e5117a3f
4 changed files with 71 additions and 42 deletions

View File

@ -3369,6 +3369,11 @@
[& body] [& body]
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t))) ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t)))
(defmacro ev/spawn-thread
``Run some code in a new thread. Returns a fiber that can be ``
[& body]
~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) :n))
(defmacro ev/with-deadline (defmacro ev/with-deadline
`Run a body of code with a deadline, such that if the code does not complete before `Run a body of code with a deadline, such that if the code does not complete before
the deadline is up, it will be canceled.` the deadline is up, it will be canceled.`

View File

@ -53,7 +53,8 @@ typedef struct {
JANET_CP_MODE_READ, JANET_CP_MODE_READ,
JANET_CP_MODE_WRITE, JANET_CP_MODE_WRITE,
JANET_CP_MODE_CHOICE_READ, JANET_CP_MODE_CHOICE_READ,
JANET_CP_MODE_CHOICE_WRITE JANET_CP_MODE_CHOICE_WRITE,
JANET_CP_MODE_CLOSE
} mode; } mode;
} JanetChannelPending; } JanetChannelPending;
@ -658,28 +659,6 @@ static void janet_chan_unlock(JanetChannel *chan) {
* Janet Channel abstract type * Janet Channel abstract type
*/ */
static int janet_chanat_mark(void *p, size_t s);
static int janet_chanat_gc(void *p, size_t s);
static Janet janet_chanat_next(void *p, Janet key);
static int janet_chanat_get(void *p, Janet key, Janet *out);
static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx);
static void *janet_chanat_unmarshal(JanetMarshalContext *ctx);
static const JanetAbstractType ChannelAT = {
"core/channel",
janet_chanat_gc,
janet_chanat_mark,
janet_chanat_get,
NULL, /* put */
janet_chanat_marshal,
janet_chanat_unmarshal,
NULL, /* tostring */
NULL, /* compare */
NULL, /* hash */
janet_chanat_next,
JANET_ATEND_NEXT
};
static Janet janet_wrap_channel(JanetChannel *channel) { static Janet janet_wrap_channel(JanetChannel *channel) {
if (janet_chan_is_threaded(channel)) { if (janet_chan_is_threaded(channel)) {
return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel)); return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel));
@ -848,10 +827,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
} else if (mode == JANET_CP_MODE_READ) { } else if (mode == JANET_CP_MODE_READ) {
janet_assert(!janet_chan_unpack(channel, &x), "packing error"); janet_assert(!janet_chan_unpack(channel, &x), "packing error");
janet_schedule(fiber, x); janet_schedule(fiber, x);
} else { /* MODE_WRITE */ } else if (mode == JANET_CP_MODE_WRITE) {
janet_schedule(fiber, janet_wrap_channel(channel)); janet_schedule(fiber, janet_wrap_channel(channel));
} else { /* (mode == JANET_CP_MODE_CLOSE) */
janet_schedule(fiber, janet_wrap_nil());
} }
} else { } else if (mode != JANET_CP_MODE_CLOSE) {
/* Fiber has already been cancelled or resumed. */ /* Fiber has already been cancelled or resumed. */
/* Resend event to another waiting thread, depending on mode */ /* Resend event to another waiting thread, depending on mode */
int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
@ -902,9 +883,14 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
janet_panic("cannot write to closed channel"); janet_panic("cannot write to closed channel");
} }
int is_threaded = janet_chan_is_threaded(channel); int is_threaded = janet_chan_is_threaded(channel);
do { if (is_threaded) {
/* don't dereference fiber from another thread */
is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader)); is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader));
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); } else {
do {
is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader));
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
}
if (is_empty) { if (is_empty) {
/* No pending reader */ /* No pending reader */
if (janet_q_push(&channel->items, &x, sizeof(Janet))) { if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
@ -963,7 +949,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
*item = janet_wrap_nil(); *item = janet_wrap_nil();
return 1; return 1;
} }
int is_threaded = channel->ref_count >= 0; int is_threaded = janet_chan_is_threaded(channel);
if (janet_q_pop(&channel->items, item, sizeof(Janet))) { if (janet_q_pop(&channel->items, item, sizeof(Janet))) {
/* Queue empty */ /* Queue empty */
JanetChannelPending pending; JanetChannelPending pending;
@ -1002,8 +988,8 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
return 1; return 1;
} }
static JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { JanetChannel *janet_getchannel(const Janet *argv, int32_t n) {
void *p = janet_getabstract(argv, n, &ChannelAT); void *p = janet_getabstract(argv, n, &janet_channel_type);
/* Rely on Janet's abstract type size tracking and the fact that a channel will /* Rely on Janet's abstract type size tracking and the fact that a channel will
* surely be bigger than a pointer to a channel */ * surely be bigger than a pointer to a channel */
if (janet_abstract_size(p) == sizeof(JanetChannel *)) { if (janet_abstract_size(p) == sizeof(JanetChannel *)) {
@ -1152,7 +1138,7 @@ JANET_CORE_FN(cfun_channel_new,
"blocking writers, defaults to 0 if not provided. Returns a new channel.") { "blocking writers, defaults to 0 if not provided. Returns a new channel.") {
janet_arity(argc, 0, 1); janet_arity(argc, 0, 1);
int32_t limit = janet_optnat(argv, argc, 0, 0); int32_t limit = janet_optnat(argv, argc, 0, 0);
JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel)); JanetChannel *channel = janet_abstract(&janet_channel_type, sizeof(JanetChannel));
janet_chan_init(channel, limit, 0); janet_chan_init(channel, limit, 0);
return janet_wrap_abstract(channel); return janet_wrap_abstract(channel);
} }
@ -1165,7 +1151,7 @@ JANET_CORE_FN(cfun_channel_new_threaded,
int32_t limit = janet_optnat(argv, argc, 0, 0); int32_t limit = janet_optnat(argv, argc, 0, 0);
JanetChannel *tchan = janet_malloc(sizeof(JanetChannel)); JanetChannel *tchan = janet_malloc(sizeof(JanetChannel));
janet_chan_init(tchan, limit, 1); janet_chan_init(tchan, limit, 1);
JanetChannel **wrap = janet_abstract(&ChannelAT, sizeof(JanetChannel *)); JanetChannel **wrap = janet_abstract(&janet_channel_type, sizeof(JanetChannel *));
*wrap = tchan; *wrap = tchan;
Janet ret = janet_wrap_abstract(wrap); Janet ret = janet_wrap_abstract(wrap);
janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret); janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret);
@ -1184,7 +1170,14 @@ JANET_CORE_FN(cfun_channel_close,
JanetChannelPending writer; JanetChannelPending writer;
while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
if (writer.thread != &janet_vm) { if (writer.thread != &janet_vm) {
/* TODO - post message */ JanetVM *vm = writer.thread;
JanetEVGenericMessage msg;
msg.fiber = writer.fiber;
msg.argp = channel;
msg.tag = JANET_CP_MODE_CLOSE;
msg.argi = (int32_t) writer.sched_id;
msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else { } else {
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
janet_schedule(writer.fiber, janet_wrap_nil()); janet_schedule(writer.fiber, janet_wrap_nil());
@ -1196,7 +1189,14 @@ JANET_CORE_FN(cfun_channel_close,
JanetChannelPending reader; JanetChannelPending reader;
while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
if (reader.thread != &janet_vm) { if (reader.thread != &janet_vm) {
/* TODO - post message */ JanetVM *vm = reader.thread;
JanetEVGenericMessage msg;
msg.fiber = reader.fiber;
msg.argp = channel;
msg.tag = JANET_CP_MODE_CLOSE;
msg.argi = (int32_t) reader.sched_id;
msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} else { } else {
if (reader.mode == JANET_CP_MODE_CHOICE_READ) { if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
janet_schedule(reader.fiber, janet_wrap_nil()); janet_schedule(reader.fiber, janet_wrap_nil());
@ -1233,6 +1233,21 @@ static Janet janet_chanat_next(void *p, Janet key) {
return janet_nextmethod(ev_chanat_methods, key); return janet_nextmethod(ev_chanat_methods, key);
} }
const JanetAbstractType janet_channel_type = {
"core/channel",
janet_chanat_gc,
janet_chanat_mark,
janet_chanat_get,
NULL, /* put */
janet_chanat_marshal,
janet_chanat_unmarshal,
NULL, /* tostring */
NULL, /* compare */
NULL, /* hash */
janet_chanat_next,
JANET_ATEND_NEXT
};
/* Main event loop */ /* Main event loop */
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
@ -1778,6 +1793,7 @@ void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage ms
"failed to post completion event"); "failed to post completion event");
#else #else
JanetSelfPipeEvent event; JanetSelfPipeEvent event;
memset(&event, 0, sizeof(event));
event.msg = msg; event.msg = msg;
event.cb = cb; event.cb = cb;
int fd = vm->selfpipe[1]; int fd = vm->selfpipe[1];
@ -1826,6 +1842,7 @@ static void *janet_thread_body(void *ptr) {
int fd = init->write_pipe; int fd = init->write_pipe;
janet_free(init); janet_free(init);
JanetSelfPipeEvent response; JanetSelfPipeEvent response;
memset(&response, 0, sizeof(response));
response.msg = subr(msg); response.msg = subr(msg);
response.cb = cb; response.cb = cb;
/* handle a bit of back pressure before giving up. */ /* handle a bit of back pressure before giving up. */
@ -2514,7 +2531,7 @@ JANET_CORE_FN(cfun_ev_go,
janet_arity(argc, 1, 3); janet_arity(argc, 1, 3);
JanetFiber *fiber = janet_getfiber(argv, 0); JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &ChannelAT, JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &janet_channel_type,
janet_vm.root_fiber->supervisor_channel); janet_vm.root_fiber->supervisor_channel);
fiber->supervisor_channel = supervisor_channel; fiber->supervisor_channel = supervisor_channel;
janet_schedule(fiber, value); janet_schedule(fiber, value);
@ -2534,7 +2551,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
if (!signal) { if (!signal) {
/* Set abstract registry */ /* Set abstract registry */
if (flags & 0x2) { if (!(flags & 0x2)) {
Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes, Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes); JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry"); if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry");
@ -2542,7 +2559,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
} }
/* Set cfunction registry */ /* Set cfunction registry */
if (flags & 0x4) { if (!(flags & 0x4)) {
uint32_t count1; uint32_t count1;
memcpy(&count1, nextbytes, sizeof(count1)); memcpy(&count1, nextbytes, sizeof(count1));
size_t count = (size_t) count1; size_t count = (size_t) count1;
@ -2556,14 +2573,16 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
janet_vm.registry_dirty = 1; janet_vm.registry_dirty = 1;
nextbytes += sizeof(uint32_t);
memcpy(janet_vm.registry, nextbytes, count * sizeof(JanetCFunRegistry)); memcpy(janet_vm.registry, nextbytes, count * sizeof(JanetCFunRegistry));
nextbytes += count * sizeof(JanetCFunRegistry);
} }
Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes, Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes); JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes, Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes,
JANET_MARSHAL_UNSAFE, NULL, &nextbytes); JANET_MARSHAL_UNSAFE, NULL, &nextbytes);
if (!janet_checktype(fiberv, JANET_FIBER)) janet_panic("expected fiber"); if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv);
JanetFiber *fiber = janet_unwrap_fiber(fiberv); JanetFiber *fiber = janet_unwrap_fiber(fiberv);
janet_schedule(fiber, value); janet_schedule(fiber, value);
janet_loop(); janet_loop();
@ -2578,6 +2597,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
} }
} }
janet_buffer_deinit(buffer); janet_buffer_deinit(buffer);
janet_free(buffer);
janet_restore(&tstate); janet_restore(&tstate);
janet_deinit(); janet_deinit();
return args; return args;
@ -2606,8 +2626,10 @@ JANET_CORE_FN(cfun_ev_thread,
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
janet_buffer_init(buffer, 0); janet_buffer_init(buffer, 0);
if (flags & 0x2) janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE); if (!(flags & 0x2)) {
if (flags & 0x4) { janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE);
}
if (!(flags & 0x4)) {
janet_assert(janet_vm.registry_count <= UINT32_MAX, "assert failed size check"); janet_assert(janet_vm.registry_count <= UINT32_MAX, "assert failed size check");
uint32_t temp = (uint32_t) janet_vm.registry_count; uint32_t temp = (uint32_t) janet_vm.registry_count;
janet_buffer_push_bytes(buffer, (uint8_t *) &temp, sizeof(temp)); janet_buffer_push_bytes(buffer, (uint8_t *) &temp, sizeof(temp));
@ -2618,7 +2640,7 @@ JANET_CORE_FN(cfun_ev_thread,
if (flags & 0x1) { if (flags & 0x1) {
/* Return immediately */ /* Return immediately */
JanetEVGenericMessage arguments; JanetEVGenericMessage arguments;
arguments.tag = (uint32_t) flags;; arguments.tag = (uint32_t) flags;
arguments.argi = argc; arguments.argi = argc;
arguments.argp = buffer; arguments.argp = buffer;
arguments.fiber = NULL; arguments.fiber = NULL;
@ -2788,6 +2810,7 @@ void janet_lib_ev(JanetTable *env) {
janet_core_cfuns_ext(env, NULL, ev_cfuns_ext); janet_core_cfuns_ext(env, NULL, ev_cfuns_ext);
janet_register_abstract_type(&janet_stream_type); janet_register_abstract_type(&janet_stream_type);
janet_register_abstract_type(&janet_channel_type);
} }
#endif #endif

View File

@ -1576,7 +1576,6 @@ void janet_deinit(void) {
janet_vm.roots = NULL; janet_vm.roots = NULL;
janet_vm.root_count = 0; janet_vm.root_count = 0;
janet_vm.root_capacity = 0; janet_vm.root_capacity = 0;
janet_vm.registry = NULL;
janet_vm.abstract_registry = NULL; janet_vm.abstract_registry = NULL;
janet_vm.core_env = NULL; janet_vm.core_env = NULL;
janet_vm.top_dyns = NULL; janet_vm.top_dyns = NULL;
@ -1584,6 +1583,7 @@ void janet_deinit(void) {
janet_vm.fiber = NULL; janet_vm.fiber = NULL;
janet_vm.root_fiber = NULL; janet_vm.root_fiber = NULL;
janet_free(janet_vm.registry); janet_free(janet_vm.registry);
janet_vm.registry = NULL;
#ifdef JANET_THREADS #ifdef JANET_THREADS
janet_threads_deinit(); janet_threads_deinit();
#endif #endif

View File

@ -1287,6 +1287,7 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
#ifdef JANET_EV #ifdef JANET_EV
extern JANET_API const JanetAbstractType janet_stream_type; extern JANET_API const JanetAbstractType janet_stream_type;
extern JANET_API const JanetAbstractType janet_channel_type;
/* Run the event loop */ /* Run the event loop */
JANET_API void janet_loop(void); JANET_API void janet_loop(void);