From 97e5117a3fe09a0386c8389b18e22f6b081ef99b Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 15 Aug 2021 13:14:33 -0500 Subject: [PATCH] Fix some issues and improve channel closing. Still not fully working, seems to be deadlock/channel issue when sending events between threads. --- src/boot/boot.janet | 5 +++ src/core/ev.c | 105 +++++++++++++++++++++++++++----------------- src/core/vm.c | 2 +- src/include/janet.h | 1 + 4 files changed, 71 insertions(+), 42 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index 3d008f8e..b86ee961 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -3369,6 +3369,11 @@ [& body] ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t))) + (defmacro ev/spawn-thread + ``Run some code in a new thread. Returns a fiber that can be `` + [& body] + ~(,ev/thread (fiber/new (fn _thread [&] ,;body) :t) :n)) + (defmacro ev/with-deadline `Run a body of code with a deadline, such that if the code does not complete before the deadline is up, it will be canceled.` diff --git a/src/core/ev.c b/src/core/ev.c index 83a117c6..902b47f8 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -53,7 +53,8 @@ typedef struct { JANET_CP_MODE_READ, JANET_CP_MODE_WRITE, JANET_CP_MODE_CHOICE_READ, - JANET_CP_MODE_CHOICE_WRITE + JANET_CP_MODE_CHOICE_WRITE, + JANET_CP_MODE_CLOSE } mode; } JanetChannelPending; @@ -658,28 +659,6 @@ static void janet_chan_unlock(JanetChannel *chan) { * Janet Channel abstract type */ -static int janet_chanat_mark(void *p, size_t s); -static int janet_chanat_gc(void *p, size_t s); -static Janet janet_chanat_next(void *p, Janet key); -static int janet_chanat_get(void *p, Janet key, Janet *out); -static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx); -static void *janet_chanat_unmarshal(JanetMarshalContext *ctx); - -static const JanetAbstractType ChannelAT = { - "core/channel", - janet_chanat_gc, - janet_chanat_mark, - janet_chanat_get, - NULL, /* put */ - janet_chanat_marshal, - janet_chanat_unmarshal, - NULL, /* tostring */ - NULL, /* compare */ - NULL, /* hash */ - janet_chanat_next, - JANET_ATEND_NEXT -}; - static Janet janet_wrap_channel(JanetChannel *channel) { if (janet_chan_is_threaded(channel)) { return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel)); @@ -848,10 +827,12 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { } else if (mode == JANET_CP_MODE_READ) { janet_assert(!janet_chan_unpack(channel, &x), "packing error"); janet_schedule(fiber, x); - } else { /* MODE_WRITE */ + } else if (mode == JANET_CP_MODE_WRITE) { janet_schedule(fiber, janet_wrap_channel(channel)); + } else { /* (mode == JANET_CP_MODE_CLOSE) */ + janet_schedule(fiber, janet_wrap_nil()); } - } else { + } else if (mode != JANET_CP_MODE_CLOSE) { /* Fiber has already been cancelled or resumed. */ /* Resend event to another waiting thread, depending on mode */ int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); @@ -902,9 +883,14 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { janet_panic("cannot write to closed channel"); } int is_threaded = janet_chan_is_threaded(channel); - do { + if (is_threaded) { + /* don't dereference fiber from another thread */ is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader)); - } while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); + } else { + do { + is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader)); + } while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); + } if (is_empty) { /* No pending reader */ if (janet_q_push(&channel->items, &x, sizeof(Janet))) { @@ -963,7 +949,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) *item = janet_wrap_nil(); return 1; } - int is_threaded = channel->ref_count >= 0; + int is_threaded = janet_chan_is_threaded(channel); if (janet_q_pop(&channel->items, item, sizeof(Janet))) { /* Queue empty */ JanetChannelPending pending; @@ -1002,8 +988,8 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) return 1; } -static JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { - void *p = janet_getabstract(argv, n, &ChannelAT); +JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { + void *p = janet_getabstract(argv, n, &janet_channel_type); /* Rely on Janet's abstract type size tracking and the fact that a channel will * surely be bigger than a pointer to a channel */ if (janet_abstract_size(p) == sizeof(JanetChannel *)) { @@ -1152,7 +1138,7 @@ JANET_CORE_FN(cfun_channel_new, "blocking writers, defaults to 0 if not provided. Returns a new channel.") { janet_arity(argc, 0, 1); int32_t limit = janet_optnat(argv, argc, 0, 0); - JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel)); + JanetChannel *channel = janet_abstract(&janet_channel_type, sizeof(JanetChannel)); janet_chan_init(channel, limit, 0); return janet_wrap_abstract(channel); } @@ -1165,7 +1151,7 @@ JANET_CORE_FN(cfun_channel_new_threaded, int32_t limit = janet_optnat(argv, argc, 0, 0); JanetChannel *tchan = janet_malloc(sizeof(JanetChannel)); janet_chan_init(tchan, limit, 1); - JanetChannel **wrap = janet_abstract(&ChannelAT, sizeof(JanetChannel *)); + JanetChannel **wrap = janet_abstract(&janet_channel_type, sizeof(JanetChannel *)); *wrap = tchan; Janet ret = janet_wrap_abstract(wrap); janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret); @@ -1184,7 +1170,14 @@ JANET_CORE_FN(cfun_channel_close, JanetChannelPending writer; while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { if (writer.thread != &janet_vm) { - /* TODO - post message */ + JanetVM *vm = writer.thread; + JanetEVGenericMessage msg; + msg.fiber = writer.fiber; + msg.argp = channel; + msg.tag = JANET_CP_MODE_CLOSE; + msg.argi = (int32_t) writer.sched_id; + msg.argj = janet_wrap_nil(); + janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { janet_schedule(writer.fiber, janet_wrap_nil()); @@ -1196,7 +1189,14 @@ JANET_CORE_FN(cfun_channel_close, JanetChannelPending reader; while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { if (reader.thread != &janet_vm) { - /* TODO - post message */ + JanetVM *vm = reader.thread; + JanetEVGenericMessage msg; + msg.fiber = reader.fiber; + msg.argp = channel; + msg.tag = JANET_CP_MODE_CLOSE; + msg.argi = (int32_t) reader.sched_id; + msg.argj = janet_wrap_nil(); + janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { if (reader.mode == JANET_CP_MODE_CHOICE_READ) { janet_schedule(reader.fiber, janet_wrap_nil()); @@ -1233,6 +1233,21 @@ static Janet janet_chanat_next(void *p, Janet key) { return janet_nextmethod(ev_chanat_methods, key); } +const JanetAbstractType janet_channel_type = { + "core/channel", + janet_chanat_gc, + janet_chanat_mark, + janet_chanat_get, + NULL, /* put */ + janet_chanat_marshal, + janet_chanat_unmarshal, + NULL, /* tostring */ + NULL, /* compare */ + NULL, /* hash */ + janet_chanat_next, + JANET_ATEND_NEXT +}; + /* Main event loop */ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); @@ -1778,6 +1793,7 @@ void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage ms "failed to post completion event"); #else JanetSelfPipeEvent event; + memset(&event, 0, sizeof(event)); event.msg = msg; event.cb = cb; int fd = vm->selfpipe[1]; @@ -1826,6 +1842,7 @@ static void *janet_thread_body(void *ptr) { int fd = init->write_pipe; janet_free(init); JanetSelfPipeEvent response; + memset(&response, 0, sizeof(response)); response.msg = subr(msg); response.cb = cb; /* handle a bit of back pressure before giving up. */ @@ -2514,7 +2531,7 @@ JANET_CORE_FN(cfun_ev_go, janet_arity(argc, 1, 3); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); - JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &ChannelAT, + JanetChannel *supervisor_channel = janet_optabstract(argv, argc, 2, &janet_channel_type, janet_vm.root_fiber->supervisor_channel); fiber->supervisor_channel = supervisor_channel; janet_schedule(fiber, value); @@ -2534,7 +2551,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { if (!signal) { /* Set abstract registry */ - if (flags & 0x2) { + if (!(flags & 0x2)) { Janet aregv = janet_unmarshal(nextbytes, endbytes - nextbytes, JANET_MARSHAL_UNSAFE, NULL, &nextbytes); if (!janet_checktype(aregv, JANET_TABLE)) janet_panic("expected table for abstract registry"); @@ -2542,7 +2559,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { } /* Set cfunction registry */ - if (flags & 0x4) { + if (!(flags & 0x4)) { uint32_t count1; memcpy(&count1, nextbytes, sizeof(count1)); size_t count = (size_t) count1; @@ -2556,14 +2573,16 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { JANET_OUT_OF_MEMORY; } janet_vm.registry_dirty = 1; + nextbytes += sizeof(uint32_t); memcpy(janet_vm.registry, nextbytes, count * sizeof(JanetCFunRegistry)); + nextbytes += count * sizeof(JanetCFunRegistry); } Janet fiberv = janet_unmarshal(nextbytes, endbytes - nextbytes, JANET_MARSHAL_UNSAFE, NULL, &nextbytes); Janet value = janet_unmarshal(nextbytes, endbytes - nextbytes, JANET_MARSHAL_UNSAFE, NULL, &nextbytes); - if (!janet_checktype(fiberv, JANET_FIBER)) janet_panic("expected fiber"); + if (!janet_checktype(fiberv, JANET_FIBER)) janet_panicf("expected fiber, got %v", fiberv); JanetFiber *fiber = janet_unwrap_fiber(fiberv); janet_schedule(fiber, value); janet_loop(); @@ -2578,6 +2597,7 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { } } janet_buffer_deinit(buffer); + janet_free(buffer); janet_restore(&tstate); janet_deinit(); return args; @@ -2606,8 +2626,10 @@ JANET_CORE_FN(cfun_ev_thread, JANET_OUT_OF_MEMORY; } janet_buffer_init(buffer, 0); - if (flags & 0x2) janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE); - if (flags & 0x4) { + if (!(flags & 0x2)) { + janet_marshal(buffer, janet_wrap_table(janet_vm.abstract_registry), NULL, JANET_MARSHAL_UNSAFE); + } + if (!(flags & 0x4)) { janet_assert(janet_vm.registry_count <= UINT32_MAX, "assert failed size check"); uint32_t temp = (uint32_t) janet_vm.registry_count; janet_buffer_push_bytes(buffer, (uint8_t *) &temp, sizeof(temp)); @@ -2618,7 +2640,7 @@ JANET_CORE_FN(cfun_ev_thread, if (flags & 0x1) { /* Return immediately */ JanetEVGenericMessage arguments; - arguments.tag = (uint32_t) flags;; + arguments.tag = (uint32_t) flags; arguments.argi = argc; arguments.argp = buffer; arguments.fiber = NULL; @@ -2788,6 +2810,7 @@ void janet_lib_ev(JanetTable *env) { janet_core_cfuns_ext(env, NULL, ev_cfuns_ext); janet_register_abstract_type(&janet_stream_type); + janet_register_abstract_type(&janet_channel_type); } #endif diff --git a/src/core/vm.c b/src/core/vm.c index c2c6cb35..27e200c1 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1576,7 +1576,6 @@ void janet_deinit(void) { janet_vm.roots = NULL; janet_vm.root_count = 0; janet_vm.root_capacity = 0; - janet_vm.registry = NULL; janet_vm.abstract_registry = NULL; janet_vm.core_env = NULL; janet_vm.top_dyns = NULL; @@ -1584,6 +1583,7 @@ void janet_deinit(void) { janet_vm.fiber = NULL; janet_vm.root_fiber = NULL; janet_free(janet_vm.registry); + janet_vm.registry = NULL; #ifdef JANET_THREADS janet_threads_deinit(); #endif diff --git a/src/include/janet.h b/src/include/janet.h index bf460484..d4e904da 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1287,6 +1287,7 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; #ifdef JANET_EV extern JANET_API const JanetAbstractType janet_stream_type; +extern JANET_API const JanetAbstractType janet_channel_type; /* Run the event loop */ JANET_API void janet_loop(void);