1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-28 19:19:53 +00:00

Address #1125 - fix ev/select to only take and release locks once.

By take and releasing locks twice per channel in the case where nothing
is reading, there was an opportunity for ev/select to hang in the
multithreaded case. Also silence valgrind/helgrind errors.
This commit is contained in:
Calvin Rose 2023-05-07 11:52:11 -05:00
parent cabbaded68
commit 2360164e4f

View File

@ -672,19 +672,6 @@ static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) {
janet_os_mutex_init((JanetOSMutex *) &chan->lock); janet_os_mutex_init((JanetOSMutex *) &chan->lock);
} }
static void janet_chan_deinit(JanetChannel *chan) {
janet_q_deinit(&chan->read_pending);
janet_q_deinit(&chan->write_pending);
if (janet_chan_is_threaded(chan)) {
Janet item;
while (!janet_q_pop(&chan->items, &item, sizeof(item))) {
janet_chan_unpack(chan, &item, 1);
}
}
janet_q_deinit(&chan->items);
janet_os_mutex_deinit((JanetOSMutex *) &chan->lock);
}
static void janet_chan_lock(JanetChannel *chan) { static void janet_chan_lock(JanetChannel *chan) {
if (!janet_chan_is_threaded(chan)) return; if (!janet_chan_is_threaded(chan)) return;
janet_os_mutex_lock((JanetOSMutex *) &chan->lock); janet_os_mutex_lock((JanetOSMutex *) &chan->lock);
@ -695,6 +682,25 @@ static void janet_chan_unlock(JanetChannel *chan) {
janet_os_mutex_unlock((JanetOSMutex *) &chan->lock); janet_os_mutex_unlock((JanetOSMutex *) &chan->lock);
} }
static void janet_chan_deinit(JanetChannel *chan) {
if (janet_chan_is_threaded(chan)) {
Janet item;
janet_chan_lock(chan);
janet_q_deinit(&chan->read_pending);
janet_q_deinit(&chan->write_pending);
while (!janet_q_pop(&chan->items, &item, sizeof(item))) {
janet_chan_unpack(chan, &item, 1);
}
janet_q_deinit(&chan->items);
janet_chan_unlock(chan);
} else {
janet_q_deinit(&chan->read_pending);
janet_q_deinit(&chan->write_pending);
janet_q_deinit(&chan->items);
}
janet_os_mutex_deinit((JanetOSMutex *) &chan->lock);
}
/* /*
* Janet Channel abstract type * Janet Channel abstract type
*/ */
@ -771,6 +777,7 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
int mode = msg.tag; int mode = msg.tag;
JanetChannel *channel = (JanetChannel *) msg.argp; JanetChannel *channel = (JanetChannel *) msg.argp;
Janet x = msg.argj; Janet x = msg.argj;
janet_chan_lock(channel);
if (fiber->sched_id == sched_id) { if (fiber->sched_id == sched_id) {
if (mode == JANET_CP_MODE_CHOICE_READ) { if (mode == JANET_CP_MODE_CHOICE_READ) {
janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error"); janet_assert(!janet_chan_unpack(channel, &x, 0), "packing error");
@ -791,7 +798,6 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ);
if (is_read) { if (is_read) {
JanetChannelPending reader; JanetChannelPending reader;
janet_chan_lock(channel);
if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
JanetVM *vm = reader.thread; JanetVM *vm = reader.thread;
JanetEVGenericMessage msg; JanetEVGenericMessage msg;
@ -802,10 +808,8 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
msg.argj = x; msg.argj = x;
janet_ev_post_event(vm, janet_thread_chan_cb, msg); janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} }
janet_chan_unlock(channel);
} else { } else {
JanetChannelPending writer; JanetChannelPending writer;
janet_chan_lock(channel);
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
JanetVM *vm = writer.thread; JanetVM *vm = writer.thread;
JanetEVGenericMessage msg; JanetEVGenericMessage msg;
@ -816,21 +820,21 @@ static void janet_thread_chan_cb(JanetEVGenericMessage msg) {
msg.argj = janet_wrap_nil(); msg.argj = janet_wrap_nil();
janet_ev_post_event(vm, janet_thread_chan_cb, msg); janet_ev_post_event(vm, janet_thread_chan_cb, msg);
} }
janet_chan_unlock(channel);
} }
} }
janet_chan_unlock(channel);
} }
/* Push a value to a channel, and return 1 if channel should block, zero otherwise. /* Push a value to a channel, and return 1 if channel should block, zero otherwise.
* If the push would block, will add to the write_pending queue in the channel. * If the push would block, will add to the write_pending queue in the channel.
* Handles both threaded and unthreaded channels. */ * Handles both threaded and unthreaded channels. */
static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { static int janet_channel_push_with_lock(JanetChannel *channel, Janet x, int mode) {
JanetChannelPending reader; JanetChannelPending reader;
int is_empty; int is_empty;
if (janet_chan_pack(channel, &x)) { if (janet_chan_pack(channel, &x)) {
janet_chan_unlock(channel);
janet_panicf("failed to pack value for channel: %v", x); janet_panicf("failed to pack value for channel: %v", x);
} }
janet_chan_lock(channel);
if (channel->closed) { if (channel->closed) {
janet_chan_unlock(channel); janet_chan_unlock(channel);
janet_panic("cannot write to closed channel"); janet_panic("cannot write to closed channel");
@ -891,12 +895,16 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
return 0; return 0;
} }
static int janet_channel_push(JanetChannel *channel, Janet x, int mode) {
janet_chan_lock(channel);
return janet_channel_push_with_lock(channel, x, mode);
}
/* Pop from a channel - returns 1 if item was obtained, 0 otherwise. The item /* Pop from a channel - returns 1 if item was obtained, 0 otherwise. The item
* is returned by reference. If the pop would block, will add to the read_pending * is returned by reference. If the pop would block, will add to the read_pending
* queue in the channel. */ * queue in the channel. */
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) { static int janet_channel_pop_with_lock(JanetChannel *channel, Janet *item, int is_choice) {
JanetChannelPending writer; JanetChannelPending writer;
janet_chan_lock(channel);
if (channel->closed) { if (channel->closed) {
janet_chan_unlock(channel); janet_chan_unlock(channel);
*item = janet_wrap_nil(); *item = janet_wrap_nil();
@ -941,6 +949,11 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
return 1; return 1;
} }
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) {
janet_chan_lock(channel);
return janet_channel_pop_with_lock(channel, item, is_choice);
}
JanetChannel *janet_channel_unwrap(void *abstract) { JanetChannel *janet_channel_unwrap(void *abstract) {
return abstract; return abstract;
} }
@ -983,6 +996,20 @@ JANET_CORE_FN(cfun_channel_pop,
janet_await(); janet_await();
} }
static void chan_unlock_args(const Janet *argv, int32_t n) {
for (int32_t i = 0; i < n; i++) {
int32_t len;
const Janet *data;
JanetChannel *chan;
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
chan = janet_getchannel(data, 0);
} else {
chan = janet_getchannel(argv, i);
}
janet_chan_unlock(chan);
}
}
JANET_CORE_FN(cfun_channel_choice, JANET_CORE_FN(cfun_channel_choice,
"(ev/select & clauses)", "(ev/select & clauses)",
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where " "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where "
@ -1002,29 +1029,29 @@ JANET_CORE_FN(cfun_channel_choice,
janet_chan_lock(chan); janet_chan_lock(chan);
if (chan->closed) { if (chan->closed) {
janet_chan_unlock(chan); janet_chan_unlock(chan);
chan_unlock_args(argv, i);
return make_close_result(chan); return make_close_result(chan);
} }
if (janet_q_count(&chan->items) < chan->limit) { if (janet_q_count(&chan->items) < chan->limit) {
janet_chan_unlock(chan); janet_channel_push_with_lock(chan, data[1], 1);
janet_channel_push(chan, data[1], 1); chan_unlock_args(argv, i);
return make_write_result(chan); return make_write_result(chan);
} }
janet_chan_unlock(chan);
} else { } else {
/* Read */ /* Read */
JanetChannel *chan = janet_getchannel(argv, i); JanetChannel *chan = janet_getchannel(argv, i);
janet_chan_lock(chan); janet_chan_lock(chan);
if (chan->closed) { if (chan->closed) {
janet_chan_unlock(chan); janet_chan_unlock(chan);
chan_unlock_args(argv, i);
return make_close_result(chan); return make_close_result(chan);
} }
if (chan->items.head != chan->items.tail) { if (chan->items.head != chan->items.tail) {
Janet item; Janet item;
janet_chan_unlock(chan); janet_channel_pop_with_lock(chan, &item, 1);
janet_channel_pop(chan, &item, 1); chan_unlock_args(argv, i);
return make_read_result(chan, item); return make_read_result(chan, item);
} }
janet_chan_unlock(chan);
} }
} }
@ -1033,12 +1060,12 @@ JANET_CORE_FN(cfun_channel_choice,
if (janet_indexed_view(argv[i], &data, &len) && len == 2) { if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
/* Write */ /* Write */
JanetChannel *chan = janet_getchannel(data, 0); JanetChannel *chan = janet_getchannel(data, 0);
janet_channel_push(chan, data[1], 1); janet_channel_push_with_lock(chan, data[1], 1);
} else { } else {
/* Read */ /* Read */
Janet item; Janet item;
JanetChannel *chan = janet_getchannel(argv, i); JanetChannel *chan = janet_getchannel(argv, i);
janet_channel_pop(chan, &item, 1); janet_channel_pop_with_lock(chan, &item, 1);
} }
} }