mirror of https://github.com/janet-lang/janet
Add ev/select and ev/rselect initial implementation.
Getting closer to a CSP implmententation. Probably useful to move scheduling fields outside of fibers and into an external table.
This commit is contained in:
parent
78ffb63429
commit
fb26c9b2c4
|
@ -0,0 +1,22 @@
|
||||||
|
(def channels
|
||||||
|
(seq [:repeat 5] (ev/chan 4)))
|
||||||
|
|
||||||
|
(defn writer [c]
|
||||||
|
(for i 0 3
|
||||||
|
(ev/sleep 0.1)
|
||||||
|
(print "writer giving item " i " to " c "...")
|
||||||
|
(ev/give c (string "item " i)))
|
||||||
|
(print "Done!"))
|
||||||
|
|
||||||
|
(defn reader [name]
|
||||||
|
(forever
|
||||||
|
(def c (ev/select ;channels))
|
||||||
|
(print "reader " name " got " (ev/take c) " from " c)))
|
||||||
|
|
||||||
|
# Readers
|
||||||
|
(each letter [:a :b :c :d :e :f :g]
|
||||||
|
(ev/call reader letter))
|
||||||
|
|
||||||
|
# Writers
|
||||||
|
(each c channels
|
||||||
|
(ev/call writer c))
|
122
src/core/ev.c
122
src/core/ev.c
|
@ -86,21 +86,25 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
|
||||||
if (q->head > q->tail) {
|
if (q->head > q->tail) {
|
||||||
/* Two segments, fix 2nd seg. */
|
/* Two segments, fix 2nd seg. */
|
||||||
int32_t newhead = q->head + (newcap - q->capacity);
|
int32_t newhead = q->head + (newcap - q->capacity);
|
||||||
int32_t seg1 = q->capacity - q->head;
|
size_t seg1 = (size_t)(q->capacity - q->head);
|
||||||
memmove(q->data + newhead, q->data + q->head, seg1 * itemsize);
|
if (seg1 > 0) {
|
||||||
|
memmove(q->data + (newhead * itemsize),
|
||||||
|
q->data + (q->head * itemsize),
|
||||||
|
seg1 * itemsize);
|
||||||
|
}
|
||||||
q->head = newhead;
|
q->head = newhead;
|
||||||
}
|
}
|
||||||
q->capacity = newcap;
|
q->capacity = newcap;
|
||||||
}
|
}
|
||||||
memcpy(q->data + itemsize * q->tail++, item, itemsize);
|
memcpy(q->data + itemsize * q->tail, item, itemsize);
|
||||||
if (q->tail >= q->capacity) q->tail = 0;
|
q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
|
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
|
||||||
if (q->head == q->tail) return 1;
|
if (q->head == q->tail) return 1;
|
||||||
memcpy(out, q->data + itemsize * q->head++, itemsize);
|
memcpy(out, q->data + itemsize * q->head, itemsize);
|
||||||
if (q->head >= q->capacity) q->head = 0;
|
q->head = q->head + 1 < q->capacity ? q->head + 1 : 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,6 +132,7 @@ JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0;
|
||||||
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
|
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
|
||||||
JANET_THREAD_LOCAL JanetQueue janet_vm_spawn;
|
JANET_THREAD_LOCAL JanetQueue janet_vm_spawn;
|
||||||
JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL;
|
JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL;
|
||||||
|
JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng;
|
||||||
|
|
||||||
/* Get current timestamp (millisecond precision) */
|
/* Get current timestamp (millisecond precision) */
|
||||||
static JanetTimestamp ts_now(void);
|
static JanetTimestamp ts_now(void);
|
||||||
|
@ -308,6 +313,7 @@ void janet_cancel(JanetFiber *fiber) {
|
||||||
void janet_schedule(JanetFiber *fiber, Janet value) {
|
void janet_schedule(JanetFiber *fiber, Janet value) {
|
||||||
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
|
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
|
||||||
fiber->flags |= JANET_FIBER_FLAG_SCHEDULED;
|
fiber->flags |= JANET_FIBER_FLAG_SCHEDULED;
|
||||||
|
fiber->sched_id++;
|
||||||
JanetTask t = { fiber, value };
|
JanetTask t = { fiber, value };
|
||||||
janet_q_push(&janet_vm_spawn, &t, sizeof(t));
|
janet_q_push(&janet_vm_spawn, &t, sizeof(t));
|
||||||
}
|
}
|
||||||
|
@ -352,6 +358,7 @@ void janet_ev_init_common(void) {
|
||||||
janet_vm_tq = NULL;
|
janet_vm_tq = NULL;
|
||||||
janet_vm_tq_count = 0;
|
janet_vm_tq_count = 0;
|
||||||
janet_vm_tq_capacity = 0;
|
janet_vm_tq_capacity = 0;
|
||||||
|
janet_rng_seed(&janet_vm_ev_rng, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Common deinit code */
|
/* Common deinit code */
|
||||||
|
@ -375,6 +382,16 @@ void janet_addtimeout(double sec) {
|
||||||
|
|
||||||
/* Channels */
|
/* Channels */
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
JanetFiber *fiber;
|
||||||
|
uint32_t sched_id;
|
||||||
|
enum {
|
||||||
|
JANET_CP_MODE_NONE,
|
||||||
|
JANET_CP_MODE_ITEM,
|
||||||
|
JANET_CP_MODE_SELECT
|
||||||
|
} mode;
|
||||||
|
} JanetChannelPending;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
JanetQueue items;
|
JanetQueue items;
|
||||||
JanetQueue read_pending;
|
JanetQueue read_pending;
|
||||||
|
@ -421,15 +438,15 @@ static int janet_chanat_gc(void *p, size_t s) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_chanat_mark_fq(JanetQueue *fq) {
|
static void janet_chanat_mark_fq(JanetQueue *fq) {
|
||||||
JanetFiber **fibers = fq->data;
|
JanetChannelPending *pending = fq->data;
|
||||||
if (fq->head <= fq->tail) {
|
if (fq->head <= fq->tail) {
|
||||||
for (int32_t i = fq->head; i < fq->tail; i++)
|
for (int32_t i = fq->head; i < fq->tail; i++)
|
||||||
janet_mark(janet_wrap_fiber(fibers[i]));
|
janet_mark(janet_wrap_fiber(pending[i].fiber));
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = fq->head; i < fq->capacity; i++)
|
for (int32_t i = fq->head; i < fq->capacity; i++)
|
||||||
janet_mark(janet_wrap_fiber(fibers[i]));
|
janet_mark(janet_wrap_fiber(pending[i].fiber));
|
||||||
for (int32_t i = 0; i < fq->tail; i++)
|
for (int32_t i = 0; i < fq->tail; i++)
|
||||||
janet_mark(janet_wrap_fiber(fibers[i]));
|
janet_mark(janet_wrap_fiber(pending[i].fiber));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,18 +474,30 @@ static int janet_chanat_mark(void *p, size_t s) {
|
||||||
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
||||||
janet_fixarity(argc, 2);
|
janet_fixarity(argc, 2);
|
||||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||||
JanetFiber *reader;
|
JanetChannelPending reader;
|
||||||
if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) {
|
int is_empty;
|
||||||
/* Pending reader */
|
do {
|
||||||
janet_schedule(reader, argv[1]);
|
is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader));
|
||||||
} else {
|
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
|
||||||
|
if (is_empty) {
|
||||||
/* No pending reader */
|
/* No pending reader */
|
||||||
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
|
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
|
||||||
janet_panicf("channel overflow: %v", argv[1]);
|
janet_panicf("channel overflow: %v", argv[1]);
|
||||||
} else if (janet_q_count(&channel->items) > channel->limit) {
|
} else if (janet_q_count(&channel->items) > channel->limit) {
|
||||||
/* Pushed successfully, but should block. */
|
/* Pushed successfully, but should block. */
|
||||||
janet_q_push(&channel->write_pending, &janet_vm_root_fiber, sizeof(JanetFiber *));
|
JanetChannelPending pending;
|
||||||
janet_await();
|
pending.fiber = janet_vm_root_fiber,
|
||||||
|
pending.sched_id = janet_vm_root_fiber->sched_id,
|
||||||
|
pending.mode = JANET_CP_MODE_ITEM;
|
||||||
|
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* Pending reader */
|
||||||
|
if (reader.mode == JANET_CP_MODE_SELECT) {
|
||||||
|
janet_q_push(&channel->items, argv + 1, sizeof(Janet));
|
||||||
|
janet_schedule(reader.fiber, argv[0]);
|
||||||
|
} else {
|
||||||
|
janet_schedule(reader.fiber, argv[1]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return argv[0];
|
return argv[0];
|
||||||
|
@ -478,17 +507,41 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
|
||||||
janet_fixarity(argc, 1);
|
janet_fixarity(argc, 1);
|
||||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||||
Janet item = janet_wrap_nil();
|
Janet item = janet_wrap_nil();
|
||||||
JanetFiber *writer;
|
JanetChannelPending writer;
|
||||||
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
|
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
|
||||||
/* Queue empty */
|
/* Queue empty */
|
||||||
janet_q_push(&channel->read_pending, &janet_vm_root_fiber, sizeof(JanetFiber *));
|
JanetChannelPending pending;
|
||||||
|
pending.fiber = janet_vm_root_fiber,
|
||||||
|
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||||
|
pending.mode = JANET_CP_MODE_ITEM;
|
||||||
|
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
|
||||||
janet_await();
|
janet_await();
|
||||||
} else if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
}
|
||||||
|
janet_schedule(janet_vm_root_fiber, item);
|
||||||
|
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
||||||
/* Got item, and there are pending writers. This means we should
|
/* Got item, and there are pending writers. This means we should
|
||||||
* schedule one. */
|
* schedule one. */
|
||||||
janet_schedule(writer, argv[0]);
|
janet_schedule(writer.fiber, argv[0]);
|
||||||
}
|
}
|
||||||
return item;
|
janet_await();
|
||||||
|
}
|
||||||
|
|
||||||
|
static Janet cfun_channel_select(int32_t argc, Janet *argv) {
|
||||||
|
janet_arity(argc, 1, -1);
|
||||||
|
for (int32_t i = 0; i < argc; i++) {
|
||||||
|
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||||
|
if (chan->items.head != chan->items.tail) return argv[i];
|
||||||
|
}
|
||||||
|
/* None of the channels have data, so we wait on all of them. */
|
||||||
|
for (int32_t i = 0; i < argc; i++) {
|
||||||
|
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||||
|
JanetChannelPending pending;
|
||||||
|
pending.fiber = janet_vm_root_fiber,
|
||||||
|
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||||
|
pending.mode = JANET_CP_MODE_SELECT;
|
||||||
|
janet_q_push(&chan->read_pending, &pending, sizeof(pending));
|
||||||
|
}
|
||||||
|
janet_await();
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_full(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_full(int32_t argc, Janet *argv) {
|
||||||
|
@ -509,6 +562,17 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) {
|
||||||
return janet_wrap_integer(janet_q_count(&channel->items));
|
return janet_wrap_integer(janet_q_count(&channel->items));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Janet cfun_channel_rselect(int32_t argc, Janet *argv) {
|
||||||
|
/* Fisher yates shuffle of arguments to get fairness */
|
||||||
|
for (int32_t i = argc; i > 1; i--) {
|
||||||
|
int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i;
|
||||||
|
Janet temp = argv[swap_index];
|
||||||
|
argv[swap_index] = argv[i - 1];
|
||||||
|
argv[i - 1] = temp;
|
||||||
|
}
|
||||||
|
return cfun_channel_select(argc, argv);
|
||||||
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
||||||
janet_arity(argc, 0, 1);
|
janet_arity(argc, 0, 1);
|
||||||
int32_t limit = janet_optnat(argv, argc, 0, 0);
|
int32_t limit = janet_optnat(argv, argc, 0, 0);
|
||||||
|
@ -773,6 +837,20 @@ static const JanetReg ev_cfuns[] = {
|
||||||
JDOC("(ev/count channel)\n\n"
|
JDOC("(ev/count channel)\n\n"
|
||||||
"Get the number of items currently waiting in a channel.")
|
"Get the number of items currently waiting in a channel.")
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ev/select", cfun_channel_select,
|
||||||
|
JDOC("(ev/select & channels)\n\n"
|
||||||
|
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
||||||
|
"is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). "
|
||||||
|
"Returns a non-empty channel.")
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ev/rselect", cfun_channel_rselect,
|
||||||
|
JDOC("(ev/rselect & channels)\n\n"
|
||||||
|
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
||||||
|
"is not empty. Will prefer channels in a random order (random choice). "
|
||||||
|
"Returns a non-empty channel.")
|
||||||
|
},
|
||||||
{NULL, NULL, NULL}
|
{NULL, NULL, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ static void fiber_reset(JanetFiber *fiber) {
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
fiber->waiting = NULL;
|
fiber->waiting = NULL;
|
||||||
fiber->timeout_index = -1;
|
fiber->timeout_index = -1;
|
||||||
|
fiber->sched_id = 0;
|
||||||
#endif
|
#endif
|
||||||
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
|
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
|
||||||
}
|
}
|
||||||
|
|
|
@ -806,6 +806,7 @@ struct JanetFiber {
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
JanetListenerState *waiting;
|
JanetListenerState *waiting;
|
||||||
int32_t timeout_index;
|
int32_t timeout_index;
|
||||||
|
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue