Working example for threaded channels.

Still no marshalling more complex values.
This commit is contained in:
Calvin Rose 2021-08-15 15:25:07 -05:00
parent dea4906144
commit b75b3e3984
3 changed files with 35 additions and 8 deletions

View File

@ -0,0 +1,22 @@
(def chan (ev/thread-chan 10))
(ev/spawn
(ev/sleep 0)
(print "started fiber!")
(ev/give chan (math/random))
(ev/give chan (math/random))
(ev/give chan (math/random))
(ev/sleep 0.5)
(for i 0 10
(print "giving to channel...")
(ev/give chan (math/random))
(ev/sleep 1))
(print "finished fiber!")
(:close chan))
(ev/do-thread
(print "started thread!")
(ev/sleep 1)
(while (def x (do (print "taking from channel...") (ev/take chan)))
(print "got " x " from thread!"))
(print "finished thread!"))

View File

@ -818,6 +818,7 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
int mode = msg.tag; int mode = msg.tag;
JanetChannel *channel = (JanetChannel *) msg.argp; JanetChannel *channel = (JanetChannel *) msg.argp;
Janet x = msg.argj; Janet x = msg.argj;
janet_ev_dec_refcount();
if (fiber->sched_id == sched_id) { if (fiber->sched_id == sched_id) {
if (mode == JANET_CP_MODE_CHOICE_READ) { if (mode == JANET_CP_MODE_CHOICE_READ) {
janet_assert(!janet_chan_unpack(channel, &x), "packing error"); janet_assert(!janet_chan_unpack(channel, &x), "packing error");
@ -911,6 +912,7 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
janet_q_push(&channel->write_pending, &pending, sizeof(pending)); janet_q_push(&channel->write_pending, &pending, sizeof(pending));
janet_chan_unlock(channel); janet_chan_unlock(channel);
if (is_threaded) { if (is_threaded) {
janet_ev_inc_refcount();
janet_gcroot(janet_wrap_fiber(pending.fiber)); janet_gcroot(janet_wrap_fiber(pending.fiber));
} }
return 1; return 1;
@ -960,6 +962,7 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
janet_q_push(&channel->read_pending, &pending, sizeof(pending)); janet_q_push(&channel->read_pending, &pending, sizeof(pending));
janet_chan_unlock(channel); janet_chan_unlock(channel);
if (is_threaded) { if (is_threaded) {
janet_ev_inc_refcount();
janet_gcroot(janet_wrap_fiber(pending.fiber)); janet_gcroot(janet_wrap_fiber(pending.fiber));
} }
return 0; return 0;
@ -1805,10 +1808,10 @@ void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage ms
status = write(fd, &event, sizeof(event)); status = write(fd, &event, sizeof(event));
} while (status == -1 && errno == EINTR); } while (status == -1 && errno == EINTR);
if (status > 0) break; if (status > 0) break;
sleep(1); sleep(0);
tries--; tries--;
} }
janet_assert(tries > 0, "failed to write event to self-pipe"); //janet_assert(tries > 0, "failed to write event to self-pipe");
#endif #endif
} }
@ -1934,6 +1937,7 @@ void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
JANET_NO_RETURN JANET_NO_RETURN
void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) { void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
JanetEVGenericMessage arguments; JanetEVGenericMessage arguments;
memset(&arguments, 0, sizeof(arguments));
arguments.tag = tag; arguments.tag = tag;
arguments.argi = argi; arguments.argi = argi;
arguments.argp = argp; arguments.argp = argp;
@ -2640,6 +2644,7 @@ JANET_CORE_FN(cfun_ev_thread,
if (flags & 0x1) { if (flags & 0x1) {
/* Return immediately */ /* Return immediately */
JanetEVGenericMessage arguments; JanetEVGenericMessage arguments;
memset(&arguments, 0, sizeof(arguments));
arguments.tag = (uint32_t) flags; arguments.tag = (uint32_t) flags;
arguments.argi = argc; arguments.argi = argc;
arguments.argp = buffer; arguments.argp = buffer;

View File

@ -2147,13 +2147,13 @@ void janet_lib_os(JanetTable *env) {
* in the thread tree sets up the critical section. */ * in the thread tree sets up the critical section. */
static volatile long env_lock_initializing = 0; static volatile long env_lock_initializing = 0;
static volatile long env_lock_initialized = 0; static volatile long env_lock_initialized = 0;
if(!InterlockedExchange(&env_lock_initializing, 1)){ if (!InterlockedExchange(&env_lock_initializing, 1)) {
InitializeCriticalSection(&env_lock); InitializeCriticalSection(&env_lock);
InterlockedOr(&env_lock_initialized, 1); InterlockedOr(&env_lock_initialized, 1);
} else { } else {
while (!InterlockedOr(&env_lock_initialized, 0)) { while (!InterlockedOr(&env_lock_initialized, 0)) {
Sleep(0); Sleep(0);
} }
} }
#endif #endif