Use a common queue implementation.

Queues occur in three places, so we use a single
implementation rather than three separate ones. This also
has the result that janet_vm_spawn will not overflow in the case
of channel-heavy, IO-light operation.
This commit is contained in:
Calvin Rose 2020-08-01 14:20:58 -05:00
parent 297de01d95
commit 742c5bb639
1 changed files with 118 additions and 167 deletions

View File

@ -44,6 +44,67 @@
#include <fcntl.h>
#include <sys/timerfd.h>
/* General queue */
/* Ring buffer for storing a list of fibers */
typedef struct {
int32_t capacity;
int32_t head;
int32_t tail;
void *data;
} JanetQueue;
#define JANET_MAX_Q_CAPACITY 0x7FFFFFFF
static void janet_q_init(JanetQueue *q) {
q->data = NULL;
q->head = 0;
q->tail = 0;
q->capacity = 0;
}
static void janet_q_deinit(JanetQueue *q) {
free(q->data);
}
static int32_t janet_q_count(JanetQueue *q) {
return (q->head > q->tail)
? (q->tail + q->capacity - q->head)
: (q->tail - q->head);
}
static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
int32_t count = janet_q_count(q);
/* Resize if needed */
if (count + 1 >= q->capacity) {
if (count + 1 >= JANET_MAX_Q_CAPACITY) return 1;
int32_t newcap = (count + 2) * 2;
if (newcap > JANET_MAX_Q_CAPACITY) newcap = JANET_MAX_Q_CAPACITY;
q->data = realloc(q->data, itemsize * newcap);
if (NULL == q->data) {
JANET_OUT_OF_MEMORY;
}
if (q->head > q->tail) {
/* Two segments, fix 2nd seg. */
int32_t newhead = q->head + (newcap - q->capacity);
int32_t seg1 = q->capacity - q->head;
memmove(q->data + newhead, q->data + q->head, seg1 * itemsize);
q->head = newhead;
}
q->capacity = newcap;
}
memcpy(q->data + itemsize * q->tail++, item, itemsize);
if (q->tail >= q->capacity) q->tail = 0;
return 0;
}
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
if (q->head == q->tail) return 1;
memcpy(out, q->data + itemsize * q->head++, itemsize);
if (q->head >= q->capacity) q->head = 0;
return 0;
}
/* New fibers to spawn or resume */
typedef struct JanetTask JanetTask;
struct JanetTask {
@ -64,11 +125,9 @@ static void janet_unlisten(JanetListenerState *state);
/* Global data */
JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0;
JANET_THREAD_LOCAL size_t janet_vm_spawn_capacity = 0;
JANET_THREAD_LOCAL size_t janet_vm_spawn_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL;
JANET_THREAD_LOCAL JanetQueue janet_vm_spawn;
JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL;
/* Get current timestamp (millisecond precision) */
@ -259,27 +318,27 @@ void janet_cancel(JanetFiber *fiber) {
void janet_schedule(JanetFiber *fiber, Janet value) {
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
fiber->flags |= JANET_FIBER_FLAG_SCHEDULED;
size_t oldcount = janet_vm_spawn_count;
size_t newcount = oldcount + 1;
if (newcount > janet_vm_spawn_capacity) {
size_t newcap = 2 * newcount;
JanetTask *tasks = realloc(janet_vm_spawn, newcap * sizeof(JanetTask));
if (NULL == tasks) {
JANET_OUT_OF_MEMORY;
}
janet_vm_spawn = tasks;
janet_vm_spawn_capacity = newcap;
}
janet_vm_spawn_count = newcount;
janet_vm_spawn[oldcount].fiber = fiber;
janet_vm_spawn[oldcount].value = value;
JanetTask t = { fiber, value };
janet_q_push(&janet_vm_spawn, &t, sizeof(t));
}
/* Mark all pending tasks */
void janet_ev_mark(void) {
for (size_t i = 0; i < janet_vm_spawn_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber));
janet_mark(janet_vm_spawn[i].value);
JanetTask *tasks = janet_vm_spawn.data;
if (janet_vm_spawn.head <= janet_vm_spawn.tail) {
for (int32_t i = janet_vm_spawn.head; i < janet_vm_spawn.tail; i++) {
janet_mark(janet_wrap_fiber(tasks[i].fiber));
janet_mark(tasks[i].value);
}
} else {
for (int32_t i = janet_vm_spawn.head; i < janet_vm_spawn.capacity; i++) {
janet_mark(janet_wrap_fiber(tasks[i].fiber));
janet_mark(tasks[i].value);
}
for (int32_t i = 0; i < janet_vm_spawn.tail; i++) {
janet_mark(janet_wrap_fiber(tasks[i].fiber));
janet_mark(tasks[i].value);
}
}
for (size_t i = 0; i < janet_vm_tq_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber));
@ -298,9 +357,7 @@ static void run_one(JanetFiber *fiber, Janet value) {
/* Common init code */
void janet_ev_init_common(void) {
janet_vm_spawn_capacity = 0;
janet_vm_spawn_count = 0;
janet_vm_spawn = NULL;
janet_q_init(&janet_vm_spawn);
janet_vm_active_listeners = 0;
janet_vm_tq = NULL;
janet_vm_tq_count = 0;
@ -309,7 +366,7 @@ void janet_ev_init_common(void) {
/* Common deinit code */
void janet_ev_deinit_common(void) {
free(janet_vm_spawn);
janet_q_deinit(&janet_vm_spawn);
}
/* Short hand to yield to event loop */
@ -328,129 +385,26 @@ void janet_addtimeout(double sec) {
/* Channels */
/* Ring buffer for storing a list of fibers */
typedef struct {
int32_t capacity;
int32_t head;
int32_t tail;
JanetFiber **fibers;
} JanetFiberQueue;
#define JANET_MAX_FQ_CAPACITY 0xFFFFFF
static void janet_fq_init(JanetFiberQueue *fq) {
fq->fibers = NULL;
fq->head = 0;
fq->tail = 0;
fq->capacity = 0;
}
static void janet_fq_deinit(JanetFiberQueue *fq) {
free(fq->fibers);
}
static int32_t janet_fq_count(JanetFiberQueue *fq) {
return (fq->head > fq->tail)
? (fq->tail + fq->capacity - fq->head)
: (fq->tail - fq->head);
}
static int janet_fq_push(JanetFiberQueue *fq, JanetFiber *fiber) {
int32_t count = janet_fq_count(fq);
/* Resize if needed */
if (count + 1 >= fq->capacity) {
if (count + 1 >= JANET_MAX_FQ_CAPACITY) return 1;
int32_t newcap = (count + 2) * 2;
if (newcap > JANET_MAX_FQ_CAPACITY) newcap = JANET_MAX_FQ_CAPACITY;
fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap);
if (NULL == fq->fibers) {
JANET_OUT_OF_MEMORY;
}
if (fq->head > fq->tail) {
/* Two segments, fix 2nd seg. */
int32_t newhead = fq->head + (newcap - fq->capacity);
int32_t seg1 = fq->capacity - fq->head;
memmove(fq->fibers + newhead, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *));
fq->head = newhead;
}
fq->capacity = newcap;
}
fq->fibers[fq->tail++] = fiber;
if (fq->tail >= fq->capacity) fq->tail = 0;
return 0;
}
static int janet_fq_pop(JanetFiberQueue *fq, JanetFiber **out) {
if (fq->head == fq->tail) return 1;
*out = fq->fibers[fq->head++];
if (fq->head >= fq->capacity) fq->head = 0;
return 0;
}
typedef struct {
int32_t capacity;
int32_t head;
int32_t tail;
JanetQueue items;
JanetQueue read_pending;
JanetQueue write_pending;
int32_t limit;
Janet *data;
JanetFiberQueue read_pending;
JanetFiberQueue write_pending;
} JanetChannel;
#define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
static void janet_chan_init(JanetChannel *chan, int32_t limit) {
chan->head = 0;
chan->tail = 0;
chan->capacity = 0;
chan->limit = limit;
chan->data = NULL;
janet_fq_init(&chan->read_pending);
janet_fq_init(&chan->write_pending);
janet_q_init(&chan->items);
janet_q_init(&chan->read_pending);
janet_q_init(&chan->write_pending);
}
static void janet_chan_deinit(JanetChannel *chan) {
free(chan->data);
janet_fq_deinit(&chan->read_pending);
janet_fq_deinit(&chan->write_pending);
}
static int32_t janet_chan_count(JanetChannel *chan) {
return (chan->head > chan->tail)
? (chan->tail + chan->capacity - chan->head)
: (chan->tail - chan->head);
}
static int janet_chan_push(JanetChannel *chan, Janet x) {
int32_t count = janet_chan_count(chan);
/* Resize if needed */
if (count + 1 >= chan->capacity) {
if (count + 1 >= JANET_MAX_CHANNEL_CAPACITY) return 2;
int32_t newcap = (count + 2) * 2;
if (newcap > JANET_MAX_CHANNEL_CAPACITY) newcap = JANET_MAX_CHANNEL_CAPACITY;
chan->data = realloc(chan->data, sizeof(Janet) * newcap);
if (NULL == chan->data) {
JANET_OUT_OF_MEMORY;
}
if (chan->head > chan->tail) {
/* Two segments, fix second segment. */
int32_t newhead = chan->head + (newcap - chan->capacity);
int32_t seg1 = chan->capacity - chan->head;
memmove(chan->data + newhead, chan->data + chan->head, seg1 * sizeof(Janet));
chan->head = newhead;
}
chan->capacity = newcap;
}
chan->data[chan->tail++] = x;
if (chan->tail >= chan->capacity) chan->tail = 0;
return count >= chan->limit;
}
static int janet_chan_pop(JanetChannel *chan, Janet *out) {
if (chan->head == chan->tail) return 1;
*out = chan->data[chan->head++];
if (chan->head >= chan->capacity) chan->head = 0;
return 0;
janet_q_deinit(&chan->read_pending);
janet_q_deinit(&chan->write_pending);
janet_q_deinit(&chan->items);
}
/*
@ -476,15 +430,16 @@ static int janet_chanat_gc(void *p, size_t s) {
return 0;
}
static void janet_chanat_mark_fq(JanetFiberQueue *fq) {
static void janet_chanat_mark_fq(JanetQueue *fq) {
JanetFiber **fibers = fq->data;
if (fq->head <= fq->tail) {
for (int32_t i = fq->head; i < fq->tail; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
janet_mark(janet_wrap_fiber(fibers[i]));
} else {
for (int32_t i = fq->head; i < fq->capacity; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
janet_mark(janet_wrap_fiber(fibers[i]));
for (int32_t i = 0; i < fq->tail; i++)
janet_mark(janet_wrap_fiber(fq->fibers[i]));
janet_mark(janet_wrap_fiber(fibers[i]));
}
}
@ -493,14 +448,16 @@ static int janet_chanat_mark(void *p, size_t s) {
JanetChannel *chan = p;
janet_chanat_mark_fq(&chan->read_pending);
janet_chanat_mark_fq(&chan->write_pending);
if (chan->head <= chan->tail) {
for (int32_t i = chan->head; i < chan->tail; i++)
janet_mark(chan->data[i]);
JanetQueue *items = &chan->items;
Janet *data = chan->items.data;
if (items->head <= items->tail) {
for (int32_t i = items->head; i < items->tail; i++)
janet_mark(data[i]);
} else {
for (int32_t i = chan->head; i < chan->capacity; i++)
janet_mark(chan->data[i]);
for (int32_t i = 0; i < chan->tail; i++)
janet_mark(chan->data[i]);
for (int32_t i = items->head; i < items->capacity; i++)
janet_mark(data[i]);
for (int32_t i = 0; i < items->tail; i++)
janet_mark(data[i]);
}
return 0;
}
@ -510,21 +467,17 @@ static int janet_chanat_mark(void *p, size_t s) {
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
JanetFiber *reader = NULL;
if (!janet_fq_pop(&channel->read_pending, &reader)) {
JanetFiber *reader;
if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
/* Pending reader */
janet_schedule(reader, argv[1]);
} else {
/* No pending reader */
int status = janet_chan_push(channel, argv[1]);
if (status == 2) {
/* Unlikely, but could happen if millions of fibers try to write to a channel concurrently without a reader.
* Channel works a bit differently than some implementations, and blocked writers still push their payload to the
* queue. */
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
janet_panicf("channel overflow: %v", argv[1]);
} else if (status) {
} else if (janet_q_count(&channel->items) > channel->limit) {
/* Pushed successfully, but should block. */
janet_fq_push(&channel->write_pending, janet_vm_root_fiber);
janet_q_push(&channel->write_pending, &janet_vm_root_fiber, sizeof(JanetFiber *));
janet_await();
}
}
@ -536,11 +489,11 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
Janet item = janet_wrap_nil();
JanetFiber *writer;
if (janet_chan_pop(channel, &item)) {
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
/* Queue empty */
janet_fq_push(&channel->read_pending, janet_vm_root_fiber);
janet_q_push(&channel->read_pending, &janet_vm_root_fiber, sizeof(JanetFiber *));
janet_await();
} else if (!janet_fq_pop(&channel->write_pending, &writer)) {
} else if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
/* Got item, and there are pending writers. This means we should
* schedule one. */
janet_schedule(writer, argv[0]);
@ -551,7 +504,7 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
static Janet cfun_channel_full(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_boolean(janet_chan_count(channel) >= channel->limit);
return janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit);
}
static Janet cfun_channel_capacity(int32_t argc, Janet *argv) {
@ -563,7 +516,7 @@ static Janet cfun_channel_capacity(int32_t argc, Janet *argv) {
static Janet cfun_channel_count(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_integer(janet_chan_count(channel));
return janet_wrap_integer(janet_q_count(&channel->items));
}
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
@ -579,7 +532,7 @@ static Janet cfun_channel_new(int32_t argc, Janet *argv) {
void janet_loop1_impl(void);
void janet_loop(void) {
while (janet_vm_active_listeners || janet_vm_spawn_count || janet_vm_tq_count) {
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
/* Run expired timers */
JanetTimeout to;
while (peek_timeout(&to) && to.when <= ts_now()) {
@ -587,13 +540,11 @@ void janet_loop(void) {
janet_schedule(to.fiber, janet_wrap_nil());
}
/* Run scheduled fibers */
size_t index = 0;
while (index < janet_vm_spawn_count) {
JanetTask task = janet_vm_spawn[index];
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil()};
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
run_one(task.fiber, task.value);
index++;
}
janet_vm_spawn_count = 0;
/* Poll for events */
if (janet_vm_active_listeners || janet_vm_tq_count) {
janet_loop1_impl();