diff --git a/examples/threaded-channels.janet b/examples/threaded-channels.janet new file mode 100644 index 00000000..c4e1bc1d --- /dev/null +++ b/examples/threaded-channels.janet @@ -0,0 +1,22 @@ +(def chan (ev/thread-chan 10)) + +(ev/spawn + (ev/sleep 0) + (print "started fiber!") + (ev/give chan (math/random)) + (ev/give chan (math/random)) + (ev/give chan (math/random)) + (ev/sleep 0.5) + (for i 0 10 + (print "giving to channel...") + (ev/give chan (math/random)) + (ev/sleep 1)) + (print "finished fiber!") + (:close chan)) + +(ev/do-thread + (print "started thread!") + (ev/sleep 1) + (while (def x (do (print "taking from channel...") (ev/take chan))) + (print "got " x " from thread!")) + (print "finished thread!")) diff --git a/src/core/ev.c b/src/core/ev.c index 902b47f8..698a0b78 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -818,6 +818,7 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { int mode = msg.tag; JanetChannel *channel = (JanetChannel *) msg.argp; Janet x = msg.argj; + janet_ev_dec_refcount(); if (fiber->sched_id == sched_id) { if (mode == JANET_CP_MODE_CHOICE_READ) { janet_assert(!janet_chan_unpack(channel, &x), "packing error"); @@ -911,6 +912,7 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { janet_q_push(&channel->write_pending, &pending, sizeof(pending)); janet_chan_unlock(channel); if (is_threaded) { + janet_ev_inc_refcount(); janet_gcroot(janet_wrap_fiber(pending.fiber)); } return 1; @@ -960,6 +962,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) janet_q_push(&channel->read_pending, &pending, sizeof(pending)); janet_chan_unlock(channel); if (is_threaded) { + janet_ev_inc_refcount(); janet_gcroot(janet_wrap_fiber(pending.fiber)); } return 0; @@ -1805,10 +1808,10 @@ void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage ms status = write(fd, &event, sizeof(event)); } while (status == -1 && errno == EINTR); if (status > 0) break; - sleep(1); + sleep(0); tries--; } - janet_assert(tries > 0, "failed to write event to self-pipe"); + //janet_assert(tries > 0, "failed to write event to self-pipe"); #endif } @@ -1934,6 +1937,7 @@ void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) { JANET_NO_RETURN void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { JanetEVGenericMessage arguments; + memset(&arguments, 0, sizeof(arguments)); arguments.tag = tag; arguments.argi = argi; arguments.argp = argp; @@ -2640,6 +2644,7 @@ JANET_CORE_FN(cfun_ev_thread, if (flags & 0x1) { /* Return immediately */ JanetEVGenericMessage arguments; + memset(&arguments, 0, sizeof(arguments)); arguments.tag = (uint32_t) flags; arguments.argi = argc; arguments.argp = buffer; diff --git a/src/core/os.c b/src/core/os.c index a712b413..0128a568 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -2147,13 +2147,13 @@ void janet_lib_os(JanetTable *env) { * in the thread tree sets up the critical section. */ static volatile long env_lock_initializing = 0; static volatile long env_lock_initialized = 0; - if(!InterlockedExchange(&env_lock_initializing, 1)){ - InitializeCriticalSection(&env_lock); - InterlockedOr(&env_lock_initialized, 1); + if (!InterlockedExchange(&env_lock_initializing, 1)) { + InitializeCriticalSection(&env_lock); + InterlockedOr(&env_lock_initialized, 1); } else { - while (!InterlockedOr(&env_lock_initialized, 0)) { - Sleep(0); - } + while (!InterlockedOr(&env_lock_initialized, 0)) { + Sleep(0); + } } #endif