diff --git a/.gitignore b/.gitignore index 1aa622b4..5b82c06e 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,7 @@ janet.wasm # Generate test files *.out +.orig # Tools xxd @@ -53,6 +54,7 @@ xxd.exe # VSCode .vs +.clangd # Swap files *.swp diff --git a/Makefile b/Makefile index 496818fb..169f653c 100644 --- a/Makefile +++ b/Makefile @@ -301,6 +301,10 @@ grammar: build/janet.tmLanguage build/janet.tmLanguage: tools/tm_lang_gen.janet $(JANET_TARGET) $(JANET_TARGET) $< > $@ +compile-commands: + # Requires pip install copmiledb + compiledb make + clean: -rm -rf build vgcore.* callgrind.* -rm -rf test/install/build test/install/modpath @@ -342,4 +346,4 @@ help: @echo .PHONY: clean install repl debug valgrind test \ - valtest dist uninstall docs grammar format help + valtest dist uninstall docs grammar format help compile-commands diff --git a/examples/channel.janet b/examples/channel.janet new file mode 100644 index 00000000..0fcf4d88 --- /dev/null +++ b/examples/channel.janet @@ -0,0 +1,15 @@ +(def c (ev/chan 4)) + +(defn writer [] + (for i 0 10 + (ev/sleep 0.1) + (print "writer giving item " i "...") + (ev/give c (string "item " i)))) + +(defn reader [name] + (forever + (print "reader " name " got " (ev/take c)))) + +(ev/call writer) +(each letter [:a :b :c :d :e :f :g] + (ev/call reader letter)) diff --git a/src/core/ev.c b/src/core/ev.c index 652a56d1..9f589d0a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -328,6 +328,7 @@ void janet_addtimeout(double sec) { /* Channels */ +/* Ring buffer for storing a list of fibers */ typedef struct { int32_t capacity; int32_t head; @@ -344,39 +345,33 @@ static void janet_fq_init(JanetFiberQueue *fq) { fq->capacity = 0; } +static void janet_fq_deinit(JanetFiberQueue *fq) { + free(fq->fibers); +} + static int32_t janet_fq_count(JanetFiberQueue *fq) { return (fq->head > fq->tail) - ? (fq->tail + fq->capacity - fq->head) - : (fq->tail - fq->head); + ? (fq->tail + fq->capacity - fq->head) + : (fq->tail - fq->head); } static int janet_fq_push(JanetFiberQueue *fq, JanetFiber *fiber) { int32_t count = janet_fq_count(fq); - if (count + 1 == JANET_MAX_FQ_CAPACITY) return 1; /* Resize if needed */ - if (count + 1 > fq->capacity) { - int32_t newcap = (count + 1) * 2; + if (count + 1 >= fq->capacity) { + if (count + 1 >= JANET_MAX_FQ_CAPACITY) return 1; + int32_t newcap = (count + 2) * 2; if (newcap > JANET_MAX_FQ_CAPACITY) newcap = JANET_MAX_FQ_CAPACITY; + fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap); + if (NULL == fq->fibers) { + JANET_OUT_OF_MEMORY; + } if (fq->head > fq->tail) { - /* Two segments, we need to allocate and memcpy x2 */ - JanetFiber **newfibers = malloc(sizeof(JanetFiber *) * newcap); - if (NULL == newfibers) { - JANET_OUT_OF_MEMORY; - } + /* Two segments, fix 2nd seg. */ + int32_t newhead = fq->head + (newcap - fq->capacity); int32_t seg1 = fq->capacity - fq->head; - int32_t seq2 = fq->tail; - memcpy(newfibers, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *)); - memcpy(newfibers + seg1, fq->fibers, seq2 * sizeof(JanetFiber *)); - free(fq->fibers); - fq->fibers = newfibers; - fq->head = 0; - fq->tail = count; - } else { - /* One segment, we can just realloc */ - fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap); - if (NULL == fq->fibers) { - JANET_OUT_OF_MEMORY; - } + memmove(fq->fibers + newhead, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *)); + fq->head = newhead; } fq->capacity = newcap; } @@ -392,6 +387,193 @@ static int janet_fq_pop(JanetFiberQueue *fq, JanetFiber **out) { return 0; } +typedef struct { + int32_t capacity; + int32_t head; + int32_t tail; + int32_t limit; + Janet *data; + JanetFiberQueue read_pending; + JanetFiberQueue write_pending; +} JanetChannel; + +#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF + +static void janet_chan_init(JanetChannel *chan, int32_t limit) { + chan->head = 0; + chan->tail = 0; + chan->capacity = 0; + chan->limit = limit; + chan->data = NULL; + janet_fq_init(&chan->read_pending); + janet_fq_init(&chan->write_pending); +} + +static void janet_chan_deinit(JanetChannel *chan) { + free(chan->data); + janet_fq_deinit(&chan->read_pending); + janet_fq_deinit(&chan->write_pending); +} + +static int32_t janet_chan_count(JanetChannel *chan) { + return (chan->head > chan->tail) + ? (chan->tail + chan->capacity - chan->head) + : (chan->tail - chan->head); +} + +static int janet_chan_push(JanetChannel *chan, Janet x) { + int32_t count = janet_chan_count(chan); + /* Resize if needed */ + if (count + 1 >= chan->capacity) { + if (count + 1 >= JANET_MAX_CHANNEL_CAPACITY) return 2; + int32_t newcap = (count + 2) * 2; + if (newcap > JANET_MAX_CHANNEL_CAPACITY) newcap = JANET_MAX_CHANNEL_CAPACITY; + chan->data = realloc(chan->data, sizeof(Janet) * newcap); + if (NULL == chan->data) { + JANET_OUT_OF_MEMORY; + } + if (chan->head > chan->tail) { + /* Two segments, fix second segment. */ + int32_t newhead = chan->head + (newcap - chan->capacity); + int32_t seg1 = chan->capacity - chan->head; + memmove(chan->data + newhead, chan->data + chan->head, seg1 * sizeof(Janet)); + chan->head = newhead; + } + chan->capacity = newcap; + } + chan->data[chan->tail++] = x; + if (chan->tail >= chan->capacity) chan->tail = 0; + return count >= chan->limit; +} + +static int janet_chan_pop(JanetChannel *chan, Janet *out) { + if (chan->head == chan->tail) return 1; + *out = chan->data[chan->head++]; + if (chan->head >= chan->capacity) chan->head = 0; + return 0; +} + +/* + * Janet Channel abstract type + */ + +/*static int janet_chanat_get(void *p, Janet key, Janet *out);*/ +static int janet_chanat_mark(void *p, size_t s); +static int janet_chanat_gc(void *p, size_t s); + +static const JanetAbstractType ChannelAT = { + "core/channel", + janet_chanat_gc, + janet_chanat_mark, + NULL, /* janet_chanat_get */ + JANET_ATEND_GET +}; + +static int janet_chanat_gc(void *p, size_t s) { + (void) s; + JanetChannel *channel = p; + janet_chan_deinit(channel); + return 0; +} + +static void janet_chanat_mark_fq(JanetFiberQueue *fq) { + if (fq->head <= fq->tail) { + for (int32_t i = fq->head; i < fq->tail; i++) + janet_mark(janet_wrap_fiber(fq->fibers[i])); + } else { + for (int32_t i = fq->head; i < fq->capacity; i++) + janet_mark(janet_wrap_fiber(fq->fibers[i])); + for (int32_t i = 0; i < fq->tail; i++) + janet_mark(janet_wrap_fiber(fq->fibers[i])); + } +} + +static int janet_chanat_mark(void *p, size_t s) { + (void) s; + JanetChannel *chan = p; + janet_chanat_mark_fq(&chan->read_pending); + janet_chanat_mark_fq(&chan->write_pending); + if (chan->head <= chan->tail) { + for (int32_t i = chan->head; i < chan->tail; i++) + janet_mark(chan->data[i]); + } else { + for (int32_t i = chan->head; i < chan->capacity; i++) + janet_mark(chan->data[i]); + for (int32_t i = 0; i < chan->tail; i++) + janet_mark(chan->data[i]); + } + return 0; +} + +/* Channel Methods */ + +static Janet cfun_channel_push(int32_t argc, Janet *argv) { + janet_fixarity(argc, 2); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + JanetFiber *reader = NULL; + if (!janet_fq_pop(&channel->read_pending, &reader)) { + /* Pending reader */ + janet_schedule(reader, argv[1]); + } else { + /* No pending reader */ + int status = janet_chan_push(channel, argv[1]); + if (status == 2) { + /* Unlikely, but could happen if millions of fibers try to write to a channel concurrently without a reader. + * Channel works a bit differently than some implementations, and blocked writers still push their payload to the + * queue. */ + janet_panicf("channel overflow: %v", argv[1]); + } else if (status) { + /* Pushed successfully, but should block. */ + janet_fq_push(&channel->write_pending, janet_vm_root_fiber); + janet_await(); + } + } + return argv[0]; +} + +static Janet cfun_channel_pop(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + Janet item = janet_wrap_nil(); + JanetFiber *writer; + if (janet_chan_pop(channel, &item)) { + /* Queue empty */ + janet_fq_push(&channel->read_pending, janet_vm_root_fiber); + janet_await(); + } else if (!janet_fq_pop(&channel->write_pending, &writer)) { + /* Got item, and there are pending writers. This means we should + * schedule one. */ + janet_schedule(writer, argv[0]); + } + return item; +} + +static Janet cfun_channel_full(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + return janet_wrap_boolean(janet_chan_count(channel) >= channel->limit); +} + +static Janet cfun_channel_capacity(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + return janet_wrap_integer(channel->limit); +} + +static Janet cfun_channel_count(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + return janet_wrap_integer(janet_chan_count(channel)); +} + +static Janet cfun_channel_new(int32_t argc, Janet *argv) { + janet_arity(argc, 0, 1); + int32_t limit = janet_optnat(argv, argc, 0, 10); + JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel)); + janet_chan_init(channel, limit); + return janet_wrap_abstract(channel); +} + /* Main event loop */ void janet_loop1_impl(void); @@ -619,6 +801,37 @@ static const JanetReg ev_cfuns[] = { JDOC("(ev/sleep sec)\n\n" "Suspend the current fiber for sec seconds without blocking the event loop.") }, + { + "ev/chan", cfun_channel_new, + JDOC("(ev/chan &opt capacity)\n\n" + "Create a new channel. capacity is the number of values to queue before " + "blocking writers, defaults to 10 if not provided. Returns a new channel.") + }, + { + "ev/give", cfun_channel_push, + JDOC("(ev/give channel value)\n\n" + "Write a value to a channel, suspending the current fiber if the channel is full.") + }, + { + "ev/take", cfun_channel_pop, + JDOC("(ev/take channel)\n\n" + "Read from a channel, suspending the current fiber if no value is available.") + }, + { + "ev/full", cfun_channel_full, + JDOC("(ev/full channel)\n\n" + "Check if a channel is full or not.") + }, + { + "ev/capacity", cfun_channel_capacity, + JDOC("(ev/capacity channel)\n\n" + "Get the number of items a channel will store before blocking writers.") + }, + { + "ev/count", cfun_channel_count, + JDOC("(ev/count channel)\n\n" + "Get the number of items currently waiting in a channel.") + }, {NULL, NULL, NULL} };