diff --git a/src/core/ev.c b/src/core/ev.c index 2183bef9..3243d304 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -672,19 +672,6 @@ static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) { janet_os_mutex_init((JanetOSMutex *) &chan->lock); } -static void janet_chan_deinit(JanetChannel *chan) { - janet_q_deinit(&chan->read_pending); - janet_q_deinit(&chan->write_pending); - if (janet_chan_is_threaded(chan)) { - Janet item; - while (!janet_q_pop(&chan->items, &item, sizeof(item))) { - janet_chan_unpack(chan, &item, 1); - } - } - janet_q_deinit(&chan->items); - janet_os_mutex_deinit((JanetOSMutex *) &chan->lock); -} - static void janet_chan_lock(JanetChannel *chan) { if (!janet_chan_is_threaded(chan)) return; janet_os_mutex_lock((JanetOSMutex *) &chan->lock); @@ -695,6 +682,25 @@ static void janet_chan_unlock(JanetChannel *chan) { janet_os_mutex_unlock((JanetOSMutex *) &chan->lock); } +static void janet_chan_deinit(JanetChannel *chan) { + if (janet_chan_is_threaded(chan)) { + Janet item; + janet_chan_lock(chan); + janet_q_deinit(&chan->read_pending); + janet_q_deinit(&chan->write_pending); + while (!janet_q_pop(&chan->items, &item, sizeof(item))) { + janet_chan_unpack(chan, &item, 1); + } + janet_q_deinit(&chan->items); + janet_chan_unlock(chan); + } else { + janet_q_deinit(&chan->read_pending); + janet_q_deinit(&chan->write_pending); + janet_q_deinit(&chan->items); + } + janet_os_mutex_deinit((JanetOSMutex *) &chan->lock); +} + /* * Janet Channel abstract type */ @@ -771,6 +777,7 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { int mode = msg.tag; JanetChannel *channel = (JanetChannel *) msg.argp; Janet x = msg.argj; + janet_chan_lock(channel); if (fiber->sched_id == sched_id) { if (mode == JANET_CP_MODE_CHOICE_READ) { janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error"); @@ -791,7 +798,6 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); if (is_read) { JanetChannelPending reader; - janet_chan_lock(channel); if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { JanetVM *vm = reader.thread; JanetEVGenericMessage msg; @@ -802,10 +808,8 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { msg.argj = x; janet_ev_post_event(vm, janet_thread_chan_cb, msg); } - janet_chan_unlock(channel); } else { JanetChannelPending writer; - janet_chan_lock(channel); if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { JanetVM *vm = writer.thread; JanetEVGenericMessage msg; @@ -816,21 +820,21 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) { msg.argj = janet_wrap_nil(); janet_ev_post_event(vm, janet_thread_chan_cb, msg); } - janet_chan_unlock(channel); } } + janet_chan_unlock(channel); } /* Push a value to a channel, and return 1 if channel should block, zero otherwise. * If the push would block, will add to the write_pending queue in the channel. * Handles both threaded and unthreaded channels. */ -static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { +static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) { JanetChannelPending reader; int is_empty; if (janet_chan_pack(channel, &x)) { + janet_chan_unlock(channel); janet_panicf("failed to pack value for channel: %v", x); } - janet_chan_lock(channel); if (channel->closed) { janet_chan_unlock(channel); janet_panic("cannot write to closed channel"); @@ -891,12 +895,16 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { return 0; } +static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { + janet_chan_lock(channel); + return janet_channel_push_with_lock(channel, x, mode); +} + /* Pop from a channel - returns 1 if item was obtained, 0 otherwise. The item * is returned by reference. If the pop would block, will add to the read_pending * queue in the channel. */ -static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) { +static int janet_channel_pop_with_lock(JanetChannel *channel, Janet *item, int is_choice) { JanetChannelPending writer; - janet_chan_lock(channel); if (channel->closed) { janet_chan_unlock(channel); *item = janet_wrap_nil(); @@ -941,6 +949,11 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) return 1; } +static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) { + janet_chan_lock(channel); + return janet_channel_pop_with_lock(channel, item, is_choice); +} + JanetChannel *janet_channel_unwrap(void *abstract) { return abstract; } @@ -983,6 +996,20 @@ JANET_CORE_FN(cfun_channel_pop, janet_await(); } +static void chan_unlock_args(const Janet *argv, int32_t n) { + for (int32_t i = 0; i < n; i++) { + int32_t len; + const Janet *data; + JanetChannel *chan; + if (janet_indexed_view(argv[i], &data, &len) && len == 2) { + chan = janet_getchannel(data, 0); + } else { + chan = janet_getchannel(argv, i); + } + janet_chan_unlock(chan); + } +} + JANET_CORE_FN(cfun_channel_choice, "(ev/select & clauses)", "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where " @@ -1002,29 +1029,29 @@ JANET_CORE_FN(cfun_channel_choice, janet_chan_lock(chan); if (chan->closed) { janet_chan_unlock(chan); + chan_unlock_args(argv, i); return make_close_result(chan); } if (janet_q_count(&chan->items) < chan->limit) { - janet_chan_unlock(chan); - janet_channel_push(chan, data[1], 1); + janet_channel_push_with_lock(chan, data[1], 1); + chan_unlock_args(argv, i); return make_write_result(chan); } - janet_chan_unlock(chan); } else { /* Read */ JanetChannel *chan = janet_getchannel(argv, i); janet_chan_lock(chan); if (chan->closed) { janet_chan_unlock(chan); + chan_unlock_args(argv, i); return make_close_result(chan); } if (chan->items.head != chan->items.tail) { Janet item; - janet_chan_unlock(chan); - janet_channel_pop(chan, &item, 1); + janet_channel_pop_with_lock(chan, &item, 1); + chan_unlock_args(argv, i); return make_read_result(chan, item); } - janet_chan_unlock(chan); } } @@ -1033,12 +1060,12 @@ JANET_CORE_FN(cfun_channel_choice, if (janet_indexed_view(argv[i], &data, &len) && len == 2) { /* Write */ JanetChannel *chan = janet_getchannel(data, 0); - janet_channel_push(chan, data[1], 1); + janet_channel_push_with_lock(chan, data[1], 1); } else { /* Read */ Janet item; JanetChannel *chan = janet_getchannel(argv, i); - janet_channel_pop(chan, &item, 1); + janet_channel_pop_with_lock(chan, &item, 1); } }