diff --git a/src/core/ev.c b/src/core/ev.c index 0c4d8212..b39e4a97 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1,14 +1,4 @@ -/* -* Copyright (c) 2021 Calvin Rose and contributors. -* -* Permission is hereby granted, free of charge, to any person obtaining a copy -* of this software and associated documentation files (the "Software"), to -* deal in the Software without restriction, including without limitation the -* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or -* sell copies of the Software, and to permit persons to whom the Software is -* furnished to do so, subject to the following conditions: -* -* The above copyright notice and this permission notice shall be included in +/* The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR @@ -70,6 +60,7 @@ typedef struct { JanetQueue read_pending; JanetQueue write_pending; int32_t limit; + int closed; } JanetChannel; typedef struct { @@ -571,6 +562,7 @@ void janet_ev_dec_refcount(void) { static void janet_chan_init(JanetChannel *chan, int32_t limit) { chan->limit = limit; + chan->closed = 0; janet_q_init(&chan->items); janet_q_init(&chan->read_pending); janet_q_init(&chan->write_pending); @@ -660,6 +652,13 @@ static Janet make_read_result(JanetChannel *channel, Janet x) { return janet_wrap_tuple(janet_tuple_end(tup)); } +static Janet make_close_result(JanetChannel *channel) { + Janet *tup = janet_tuple_begin(2); + tup[0] = janet_ckeywordv("close"); + tup[1] = janet_wrap_abstract(channel); + return janet_wrap_tuple(janet_tuple_end(tup)); +} + /* Push a value to a channel, and return 1 if channel should block, zero otherwise. * If the push would block, will add to the write_pending queue in the channel. */ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { @@ -723,9 +722,13 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) JANET_CORE_FN(cfun_channel_push, "(ev/give channel value)", - "Write a value to a channel, suspending the current fiber if the channel is full.") { + "Write a value to a channel, suspending the current fiber if the channel is full. " + "Returns the channel if the write succeeded, nil otherwise.") { janet_fixarity(argc, 2); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + if (channel->closed) { + janet_panic("cannot write to closed channel"); + } if (janet_channel_push(channel, argv[1], 0)) { janet_await(); } @@ -738,6 +741,7 @@ JANET_CORE_FN(cfun_channel_pop, janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); Janet item; + if (channel->closed) return janet_wrap_nil(); if (janet_channel_pop(channel, &item, 0)) { janet_schedule(janet_vm.root_fiber, item); } @@ -746,10 +750,10 @@ JANET_CORE_FN(cfun_channel_pop, JANET_CORE_FN(cfun_channel_choice, "(ev/select & clauses)", - "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where " + "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where " "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for " "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first " - "clauses will take precedence over later clauses.") { + "clauses will take precedence over later clauses. Both and give and take operation can return a [:close chan] tuple, which indicates that the specified channel was closed.") { janet_arity(argc, 1, -1); int32_t len; const Janet *data; @@ -759,6 +763,7 @@ JANET_CORE_FN(cfun_channel_choice, if (janet_indexed_view(argv[i], &data, &len) && len == 2) { /* Write */ JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); + if (chan->closed) continue; if (janet_q_count(&chan->items) < chan->limit) { janet_channel_push(chan, data[1], 1); return make_write_result(chan); @@ -766,6 +771,7 @@ JANET_CORE_FN(cfun_channel_choice, } else { /* Read */ JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + if (chan->closed) continue; if (chan->items.head != chan->items.tail) { Janet item; janet_channel_pop(chan, &item, 1); @@ -779,11 +785,13 @@ JANET_CORE_FN(cfun_channel_choice, if (janet_indexed_view(argv[i], &data, &len) && len == 2) { /* Write */ JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); + if (chan->closed) continue; janet_channel_push(chan, data[1], 1); } else { /* Read */ Janet item; JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + if (chan->closed) continue; janet_channel_pop(chan, &item, 1); } } @@ -843,6 +851,34 @@ JANET_CORE_FN(cfun_channel_new, return janet_wrap_abstract(channel); } +JANET_CORE_FN(cfun_channel_close, + "(ev/chan-close chan)", + "Close a channel. A closed channel will cause all pending reads and writes to return nil. " + "Returns the channel.") { + janet_fixarity(argc, 1); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + if (!channel->closed) { + channel->closed = 1; + JanetChannelPending writer; + while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { + if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { + janet_schedule(writer.fiber, janet_wrap_nil()); + } else { + janet_schedule(writer.fiber, make_close_result(channel)); + } + } + JanetChannelPending reader; + while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { + if (reader.mode == JANET_CP_MODE_CHOICE_READ) { + janet_schedule(reader.fiber, janet_wrap_nil()); + } else { + janet_schedule(reader.fiber, make_close_result(channel)); + } + } + } + return janet_wrap_abstract(channel); +} + static const JanetMethod ev_chanat_methods[] = { {"select", cfun_channel_choice}, {"rselect", cfun_channel_rchoice}, @@ -851,6 +887,7 @@ static const JanetMethod ev_chanat_methods[] = { {"give", cfun_channel_push}, {"capacity", cfun_channel_capacity}, {"full", cfun_channel_full}, + {"close", cfun_channel_close}, {NULL, NULL} }; @@ -2413,6 +2450,7 @@ void janet_lib_ev(JanetTable *env) { JANET_CORE_REG("ev/read", janet_cfun_stream_read), JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk), JANET_CORE_REG("ev/write", janet_cfun_stream_write), + JANET_CORE_REG("ev/chan-close", cfun_channel_close), JANET_REG_END }; diff --git a/test/suite0009.janet b/test/suite0009.janet index fdbe1b81..b57c05e4 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -116,7 +116,6 @@ (assert (= "123\n456\n" (string (slurp "unique.txt"))) "File writing 4.2") (os/rm "unique.txt")) - # ev/gather (assert (deep= @[1 2 3] (ev/gather 1 2 3)) "ev/gather 1") @@ -180,4 +179,46 @@ (assert (os/execute [janet "-e" `(+ 1 2 3)`] :xp) "os/execute self") +# Test some channel + +(def c1 (ev/chan)) +(def c2 (ev/chan)) +(def arr @[]) +(ev/spawn + (while (def x (ev/take c1)) + (array/push arr x)) + (ev/chan-close c2)) +(for i 0 1000 + (ev/give c1 i)) +(ev/chan-close c1) +(ev/take c2) +(assert (= (slice arr) (slice (range 1000))) "ev/chan-close 1") + +(def c1 (ev/chan)) +(def c2 (ev/chan)) +(def arr @[]) +(ev/spawn + (while (def x (ev/take c1)) + (array/push arr x)) + (ev/sleep 0.1) + (ev/chan-close c2)) +(for i 0 100 + (ev/give c1 i)) +(ev/chan-close c1) +(ev/select c2) +(assert (= (slice arr) (slice (range 100))) "ev/chan-close 2") + +(def c1 (ev/chan)) +(def c2 (ev/chan)) +(def arr @[]) +(ev/spawn + (while (def x (ev/take c1)) + (array/push arr x)) + (ev/chan-close c2)) +(for i 0 100 + (ev/give c1 i)) +(ev/chan-close c1) +(ev/rselect c2) +(assert (= (slice arr) (slice (range 100))) "ev/chan-close 3") + (end-suite)