Add preliminary channel implementation.

This commit is contained in:
Calvin Rose 2020-08-01 13:13:58 -05:00
parent 2eb2dddb59
commit 297de01d95
4 changed files with 258 additions and 24 deletions

2
.gitignore vendored
View File

@ -46,6 +46,7 @@ janet.wasm
# Generate test files
*.out
.orig
# Tools
xxd
@ -53,6 +54,7 @@ xxd.exe
# VSCode
.vs
.clangd
# Swap files
*.swp

View File

@ -301,6 +301,10 @@ grammar: build/janet.tmLanguage
build/janet.tmLanguage: tools/tm_lang_gen.janet $(JANET_TARGET)
$(JANET_TARGET) $< > $@
compile-commands:
# Requires pip install copmiledb
compiledb make
clean:
-rm -rf build vgcore.* callgrind.*
-rm -rf test/install/build test/install/modpath
@ -342,4 +346,4 @@ help:
@echo
.PHONY: clean install repl debug valgrind test \
valtest dist uninstall docs grammar format help
valtest dist uninstall docs grammar format help compile-commands

15
examples/channel.janet Normal file
View File

@ -0,0 +1,15 @@
(def c (ev/chan 4))
(defn writer []
(for i 0 10
(ev/sleep 0.1)
(print "writer giving item " i "...")
(ev/give c (string "item " i))))
(defn reader [name]
(forever
(print "reader " name " got " (ev/take c))))
(ev/call writer)
(each letter [:a :b :c :d :e :f :g]
(ev/call reader letter))

View File

@ -328,6 +328,7 @@ void janet_addtimeout(double sec) {
/* Channels */
/* Ring buffer for storing a list of fibers */
typedef struct {
int32_t capacity;
int32_t head;
@ -344,39 +345,33 @@ static void janet_fq_init(JanetFiberQueue *fq) {
fq->capacity = 0;
}
static void janet_fq_deinit(JanetFiberQueue *fq) {
free(fq->fibers);
}
static int32_t janet_fq_count(JanetFiberQueue *fq) {
return (fq->head > fq->tail)
? (fq->tail + fq->capacity - fq->head)
: (fq->tail - fq->head);
? (fq->tail + fq->capacity - fq->head)
: (fq->tail - fq->head);
}
static int janet_fq_push(JanetFiberQueue *fq, JanetFiber *fiber) {
int32_t count = janet_fq_count(fq);
if (count + 1 == JANET_MAX_FQ_CAPACITY) return 1;
/* Resize if needed */
if (count + 1 > fq->capacity) {
int32_t newcap = (count + 1) * 2;
if (count + 1 >= fq->capacity) {
if (count + 1 >= JANET_MAX_FQ_CAPACITY) return 1;
int32_t newcap = (count + 2) * 2;
if (newcap > JANET_MAX_FQ_CAPACITY) newcap = JANET_MAX_FQ_CAPACITY;
fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap);
if (NULL == fq->fibers) {
JANET_OUT_OF_MEMORY;
}
if (fq->head > fq->tail) {
/* Two segments, we need to allocate and memcpy x2 */
JanetFiber **newfibers = malloc(sizeof(JanetFiber *) * newcap);
if (NULL == newfibers) {
JANET_OUT_OF_MEMORY;
}
/* Two segments, fix 2nd seg. */
int32_t newhead = fq->head + (newcap - fq->capacity);
int32_t seg1 = fq->capacity - fq->head;
int32_t seq2 = fq->tail;
memcpy(newfibers, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *));
memcpy(newfibers + seg1, fq->fibers, seq2 * sizeof(JanetFiber *));
free(fq->fibers);
fq->fibers = newfibers;
fq->head = 0;
fq->tail = count;
} else {
/* One segment, we can just realloc */
fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap);
if (NULL == fq->fibers) {
JANET_OUT_OF_MEMORY;
}
memmove(fq->fibers + newhead, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *));
fq->head = newhead;
}
fq->capacity = newcap;
}
@ -392,6 +387,193 @@ static int janet_fq_pop(JanetFiberQueue *fq, JanetFiber **out) {
return 0;
}
typedef struct {
int32_t capacity;
int32_t head;
int32_t tail;
int32_t limit;
Janet *data;
JanetFiberQueue read_pending;
JanetFiberQueue write_pending;
} JanetChannel;
#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
static void janet_chan_init(JanetChannel *chan, int32_t limit) {
chan->head = 0;
chan->tail = 0;
chan->capacity = 0;
chan->limit = limit;
chan->data = NULL;
janet_fq_init(&chan->read_pending);
janet_fq_init(&chan->write_pending);
}
static void janet_chan_deinit(JanetChannel *chan) {
free(chan->data);
janet_fq_deinit(&chan->read_pending);
janet_fq_deinit(&chan->write_pending);
}
static int32_t janet_chan_count(JanetChannel *chan) {
return (chan->head > chan->tail)
? (chan->tail + chan->capacity - chan->head)
: (chan->tail - chan->head);
}
static int janet_chan_push(JanetChannel *chan, Janet x) {
int32_t count = janet_chan_count(chan);
/* Resize if needed */
if (count + 1 >= chan->capacity) {
if (count + 1 >= JANET_MAX_CHANNEL_CAPACITY) return 2;
int32_t newcap = (count + 2) * 2;
if (newcap > JANET_MAX_CHANNEL_CAPACITY) newcap = JANET_MAX_CHANNEL_CAPACITY;
chan->data = realloc(chan->data, sizeof(Janet) * newcap);
if (NULL == chan->data) {
JANET_OUT_OF_MEMORY;
}
if (chan->head > chan->tail) {
/* Two segments, fix second segment. */
int32_t newhead = chan->head + (newcap - chan->capacity);
int32_t seg1 = chan->capacity - chan->head;
memmove(chan->data + newhead, chan->data + chan->head, seg1 * sizeof(Janet));
chan->head = newhead;
}
chan->capacity = newcap;
}
chan->data[chan->tail++] = x;
if (chan->tail >= chan->capacity) chan->tail = 0;
return count >= chan->limit;
}
static int janet_chan_pop(JanetChannel *chan, Janet *out) {
if (chan->head == chan->tail) return 1;
*out = chan->data[chan->head++];
if (chan->head >= chan->capacity) chan->head = 0;
return 0;
}
/*
* Janet Channel abstract type
*/
/*static int janet_chanat_get(void *p, Janet key, Janet *out);*/
static int janet_chanat_mark(void *p, size_t s);
static int janet_chanat_gc(void *p, size_t s);
static const JanetAbstractType ChannelAT = {
"core/channel",
janet_chanat_gc,
janet_chanat_mark,
NULL, /* janet_chanat_get */
JANET_ATEND_GET
};
static int janet_chanat_gc(void *p, size_t s) {
(void) s;
JanetChannel *channel = p;
janet_chan_deinit(channel);
return 0;
}
static void janet_chanat_mark_fq(JanetFiberQueue *fq) {
if (fq->head <= fq->tail) {
for (int32_t i = fq->head; i < fq->tail; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
} else {
for (int32_t i = fq->head; i < fq->capacity; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
for (int32_t i = 0; i < fq->tail; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
}
}
static int janet_chanat_mark(void *p, size_t s) {
(void) s;
JanetChannel *chan = p;
janet_chanat_mark_fq(&chan->read_pending);
janet_chanat_mark_fq(&chan->write_pending);
if (chan->head <= chan->tail) {
for (int32_t i = chan->head; i < chan->tail; i++)
janet_mark(chan->data[i]);
} else {
for (int32_t i = chan->head; i < chan->capacity; i++)
janet_mark(chan->data[i]);
for (int32_t i = 0; i < chan->tail; i++)
janet_mark(chan->data[i]);
}
return 0;
}
/* Channel Methods */
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
JanetFiber *reader = NULL;
if (!janet_fq_pop(&channel->read_pending, &reader)) {
/* Pending reader */
janet_schedule(reader, argv[1]);
} else {
/* No pending reader */
int status = janet_chan_push(channel, argv[1]);
if (status == 2) {
/* Unlikely, but could happen if millions of fibers try to write to a channel concurrently without a reader.
* Channel works a bit differently than some implementations, and blocked writers still push their payload to the
* queue. */
janet_panicf("channel overflow: %v", argv[1]);
} else if (status) {
/* Pushed successfully, but should block. */
janet_fq_push(&channel->write_pending, janet_vm_root_fiber);
janet_await();
}
}
return argv[0];
}
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
Janet item = janet_wrap_nil();
JanetFiber *writer;
if (janet_chan_pop(channel, &item)) {
/* Queue empty */
janet_fq_push(&channel->read_pending, janet_vm_root_fiber);
janet_await();
} else if (!janet_fq_pop(&channel->write_pending, &writer)) {
/* Got item, and there are pending writers. This means we should
* schedule one. */
janet_schedule(writer, argv[0]);
}
return item;
}
static Janet cfun_channel_full(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_boolean(janet_chan_count(channel) >= channel->limit);
}
static Janet cfun_channel_capacity(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_integer(channel->limit);
}
static Janet cfun_channel_count(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_integer(janet_chan_count(channel));
}
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
janet_arity(argc, 0, 1);
int32_t limit = janet_optnat(argv, argc, 0, 10);
JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel));
janet_chan_init(channel, limit);
return janet_wrap_abstract(channel);
}
/* Main event loop */
void janet_loop1_impl(void);
@ -619,6 +801,37 @@ static const JanetReg ev_cfuns[] = {
JDOC("(ev/sleep sec)\n\n"
"Suspend the current fiber for sec seconds without blocking the event loop.")
},
{
"ev/chan", cfun_channel_new,
JDOC("(ev/chan &opt capacity)\n\n"
"Create a new channel. capacity is the number of values to queue before "
"blocking writers, defaults to 10 if not provided. Returns a new channel.")
},
{
"ev/give", cfun_channel_push,
JDOC("(ev/give channel value)\n\n"
"Write a value to a channel, suspending the current fiber if the channel is full.")
},
{
"ev/take", cfun_channel_pop,
JDOC("(ev/take channel)\n\n"
"Read from a channel, suspending the current fiber if no value is available.")
},
{
"ev/full", cfun_channel_full,
JDOC("(ev/full channel)\n\n"
"Check if a channel is full or not.")
},
{
"ev/capacity", cfun_channel_capacity,
JDOC("(ev/capacity channel)\n\n"
"Get the number of items a channel will store before blocking writers.")
},
{
"ev/count", cfun_channel_count,
JDOC("(ev/count channel)\n\n"
"Get the number of items currently waiting in a channel.")
},
{NULL, NULL, NULL}
};