mirror of
https://github.com/janet-lang/janet
synced 2024-12-26 16:30:26 +00:00
Overhaul of poll loop, redo ev/select.
This commit is contained in:
parent
ea45165db8
commit
cb4903fa86
@ -10,8 +10,8 @@
|
|||||||
|
|
||||||
(defn reader [name]
|
(defn reader [name]
|
||||||
(forever
|
(forever
|
||||||
(def c (ev/select ;channels))
|
(def [_ c x] (ev/select ;channels))
|
||||||
(print "reader " name " got " (ev/take c) " from " c)))
|
(print "reader " name " got " x " from " c)))
|
||||||
|
|
||||||
# Readers
|
# Readers
|
||||||
(each letter [:a :b :c :d :e :f :g]
|
(each letter [:a :b :c :d :e :f :g]
|
||||||
|
264
src/core/ev.c
264
src/core/ev.c
@ -129,6 +129,7 @@ typedef struct JanetTimeout JanetTimeout;
|
|||||||
struct JanetTimeout {
|
struct JanetTimeout {
|
||||||
JanetTimestamp when;
|
JanetTimestamp when;
|
||||||
JanetFiber *fiber;
|
JanetFiber *fiber;
|
||||||
|
uint32_t sched_id;
|
||||||
int is_error;
|
int is_error;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -163,9 +164,7 @@ static int peek_timeout(JanetTimeout *out) {
|
|||||||
/* Remove the next timeout from the priority queue */
|
/* Remove the next timeout from the priority queue */
|
||||||
static void pop_timeout(size_t index) {
|
static void pop_timeout(size_t index) {
|
||||||
if (janet_vm_tq_count <= index) return;
|
if (janet_vm_tq_count <= index) return;
|
||||||
janet_vm_tq[index].fiber->timeout_index = -1;
|
|
||||||
janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count];
|
janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count];
|
||||||
janet_vm_tq[index].fiber->timeout_index = index;
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
size_t left = (index << 1) + 1;
|
size_t left = (index << 1) + 1;
|
||||||
size_t right = left + 1;
|
size_t right = left + 1;
|
||||||
@ -180,8 +179,6 @@ static void pop_timeout(size_t index) {
|
|||||||
JanetTimeout temp = janet_vm_tq[index];
|
JanetTimeout temp = janet_vm_tq[index];
|
||||||
janet_vm_tq[index] = janet_vm_tq[smallest];
|
janet_vm_tq[index] = janet_vm_tq[smallest];
|
||||||
janet_vm_tq[smallest] = temp;
|
janet_vm_tq[smallest] = temp;
|
||||||
janet_vm_tq[index].fiber->timeout_index = index;
|
|
||||||
janet_vm_tq[smallest].fiber->timeout_index = smallest;
|
|
||||||
index = smallest;
|
index = smallest;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -204,10 +201,6 @@ static void add_timeout(JanetTimeout to) {
|
|||||||
janet_vm_tq[oldcount] = to;
|
janet_vm_tq[oldcount] = to;
|
||||||
/* Heapify */
|
/* Heapify */
|
||||||
size_t index = oldcount;
|
size_t index = oldcount;
|
||||||
if (to.fiber->timeout_index >= 0) {
|
|
||||||
pop_timeout(to.fiber->timeout_index);
|
|
||||||
}
|
|
||||||
to.fiber->timeout_index = index;
|
|
||||||
while (index > 0) {
|
while (index > 0) {
|
||||||
size_t parent = (index - 1) >> 1;
|
size_t parent = (index - 1) >> 1;
|
||||||
if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
|
if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
|
||||||
@ -330,10 +323,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
|
|||||||
void janet_fiber_did_resume(JanetFiber *fiber) {
|
void janet_fiber_did_resume(JanetFiber *fiber) {
|
||||||
/* Cancel any pending fibers */
|
/* Cancel any pending fibers */
|
||||||
if (fiber->waiting) janet_unlisten(fiber->waiting);
|
if (fiber->waiting) janet_unlisten(fiber->waiting);
|
||||||
if (fiber->timeout_index >= 0) {
|
|
||||||
pop_timeout(fiber->timeout_index);
|
|
||||||
fiber->timeout_index = -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Mark all pending tasks */
|
/* Mark all pending tasks */
|
||||||
@ -395,6 +384,7 @@ void janet_addtimeout(double sec) {
|
|||||||
JanetTimeout to;
|
JanetTimeout to;
|
||||||
to.when = ts_delta(ts_now(), sec);
|
to.when = ts_delta(ts_now(), sec);
|
||||||
to.fiber = fiber;
|
to.fiber = fiber;
|
||||||
|
to.sched_id = fiber->sched_id;
|
||||||
to.is_error = 1;
|
to.is_error = 1;
|
||||||
add_timeout(to);
|
add_timeout(to);
|
||||||
}
|
}
|
||||||
@ -405,9 +395,9 @@ typedef struct {
|
|||||||
JanetFiber *fiber;
|
JanetFiber *fiber;
|
||||||
uint32_t sched_id;
|
uint32_t sched_id;
|
||||||
enum {
|
enum {
|
||||||
JANET_CP_MODE_NONE,
|
|
||||||
JANET_CP_MODE_ITEM,
|
JANET_CP_MODE_ITEM,
|
||||||
JANET_CP_MODE_SELECT
|
JANET_CP_MODE_CHOICE_READ,
|
||||||
|
JANET_CP_MODE_CHOICE_WRITE
|
||||||
} mode;
|
} mode;
|
||||||
} JanetChannelPending;
|
} JanetChannelPending;
|
||||||
|
|
||||||
@ -488,11 +478,24 @@ static int janet_chanat_mark(void *p, size_t s) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Channel Methods */
|
static Janet make_write_result(JanetChannel *channel) {
|
||||||
|
Janet *tup = janet_tuple_begin(2);
|
||||||
|
tup[0] = janet_ckeywordv("give");
|
||||||
|
tup[1] = janet_wrap_abstract(channel);
|
||||||
|
return janet_wrap_tuple(janet_tuple_end(tup));
|
||||||
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
static Janet make_read_result(JanetChannel *channel, Janet x) {
|
||||||
janet_fixarity(argc, 2);
|
Janet *tup = janet_tuple_begin(3);
|
||||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
tup[0] = janet_ckeywordv("take");
|
||||||
|
tup[1] = janet_wrap_abstract(channel);
|
||||||
|
tup[2] = x;
|
||||||
|
return janet_wrap_tuple(janet_tuple_end(tup));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
|
||||||
|
* If the push would block, will add to the write_pending queue in the channel. */
|
||||||
|
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
|
||||||
JanetChannelPending reader;
|
JanetChannelPending reader;
|
||||||
int is_empty;
|
int is_empty;
|
||||||
do {
|
do {
|
||||||
@ -500,66 +503,113 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
|||||||
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
|
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
|
||||||
if (is_empty) {
|
if (is_empty) {
|
||||||
/* No pending reader */
|
/* No pending reader */
|
||||||
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
|
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
|
||||||
janet_panicf("channel overflow: %v", argv[1]);
|
janet_panicf("channel overflow: %v", x);
|
||||||
} else if (janet_q_count(&channel->items) > channel->limit) {
|
} else if (janet_q_count(&channel->items) > channel->limit) {
|
||||||
/* Pushed successfully, but should block. */
|
/* Pushed successfully, but should block. */
|
||||||
JanetChannelPending pending;
|
JanetChannelPending pending;
|
||||||
pending.fiber = janet_vm_root_fiber,
|
pending.fiber = janet_vm_root_fiber,
|
||||||
pending.sched_id = janet_vm_root_fiber->sched_id,
|
pending.sched_id = janet_vm_root_fiber->sched_id,
|
||||||
pending.mode = JANET_CP_MODE_ITEM;
|
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM;
|
||||||
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
|
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* Pending reader */
|
/* Pending reader */
|
||||||
if (reader.mode == JANET_CP_MODE_SELECT) {
|
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
|
||||||
janet_q_push(&channel->items, argv + 1, sizeof(Janet));
|
janet_schedule(reader.fiber, make_read_result(channel, x));
|
||||||
janet_schedule(reader.fiber, argv[0]);
|
|
||||||
} else {
|
} else {
|
||||||
janet_schedule(reader.fiber, argv[1]);
|
janet_schedule(reader.fiber, x);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Pop from a channel - returns 1 if item was obtain, 0 otherwise. The item
|
||||||
|
* is returned by reference. If the pop would block, will add to the read_pending
|
||||||
|
* queue in the channel. */
|
||||||
|
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) {
|
||||||
|
JanetChannelPending writer;
|
||||||
|
if (janet_q_pop(&channel->items, item, sizeof(Janet))) {
|
||||||
|
/* Queue empty */
|
||||||
|
JanetChannelPending pending;
|
||||||
|
pending.fiber = janet_vm_root_fiber,
|
||||||
|
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||||
|
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM;
|
||||||
|
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
||||||
|
/* pending writer */
|
||||||
|
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
|
||||||
|
janet_schedule(writer.fiber, make_write_result(channel));
|
||||||
|
} else {
|
||||||
|
janet_schedule(writer.fiber, janet_wrap_abstract(channel));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Channel Methods */
|
||||||
|
|
||||||
|
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
||||||
|
janet_fixarity(argc, 2);
|
||||||
|
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||||
|
if (janet_channel_push(channel, argv[1], 0)) {
|
||||||
|
janet_await();
|
||||||
|
}
|
||||||
return argv[0];
|
return argv[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
|
||||||
janet_fixarity(argc, 1);
|
janet_fixarity(argc, 1);
|
||||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||||
Janet item = janet_wrap_nil();
|
Janet item;
|
||||||
JanetChannelPending writer;
|
if (janet_channel_pop(channel, &item, 0)) {
|
||||||
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
|
|
||||||
/* Queue empty */
|
|
||||||
JanetChannelPending pending;
|
|
||||||
pending.fiber = janet_vm_root_fiber,
|
|
||||||
pending.sched_id = janet_vm_root_fiber->sched_id;
|
|
||||||
pending.mode = JANET_CP_MODE_ITEM;
|
|
||||||
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
|
|
||||||
janet_await();
|
|
||||||
}
|
|
||||||
janet_schedule(janet_vm_root_fiber, item);
|
janet_schedule(janet_vm_root_fiber, item);
|
||||||
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
|
||||||
/* Got item, and there are pending writers. This means we should
|
|
||||||
* schedule one. */
|
|
||||||
janet_schedule(writer.fiber, argv[0]);
|
|
||||||
}
|
}
|
||||||
janet_await();
|
janet_await();
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_select(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
|
||||||
janet_arity(argc, 1, -1);
|
janet_arity(argc, 1, -1);
|
||||||
|
int32_t len;
|
||||||
|
const Janet *data;
|
||||||
|
|
||||||
|
/* Check channels for immediate reads and writes */
|
||||||
for (int32_t i = 0; i < argc; i++) {
|
for (int32_t i = 0; i < argc; i++) {
|
||||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
|
||||||
if (chan->items.head != chan->items.tail) return argv[i];
|
/* Write */
|
||||||
|
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
|
||||||
|
if (janet_q_count(&chan->items) < chan->limit) {
|
||||||
|
janet_channel_push(chan, data[1], 1);
|
||||||
|
return make_write_result(chan);
|
||||||
}
|
}
|
||||||
/* None of the channels have data, so we wait on all of them. */
|
} else {
|
||||||
|
/* Read */
|
||||||
|
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||||
|
if (chan->items.head != chan->items.tail) {
|
||||||
|
Janet item;
|
||||||
|
janet_channel_pop(chan, &item, 1);
|
||||||
|
return make_read_result(chan, item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Wait for all readers or writers */
|
||||||
for (int32_t i = 0; i < argc; i++) {
|
for (int32_t i = 0; i < argc; i++) {
|
||||||
|
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
|
||||||
|
/* Write */
|
||||||
|
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
|
||||||
|
janet_channel_push(chan, data[1], 1);
|
||||||
|
} else {
|
||||||
|
/* Read */
|
||||||
|
Janet item;
|
||||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||||
JanetChannelPending pending;
|
janet_channel_pop(chan, &item, 1);
|
||||||
pending.fiber = janet_vm_root_fiber,
|
|
||||||
pending.sched_id = janet_vm_root_fiber->sched_id;
|
|
||||||
pending.mode = JANET_CP_MODE_SELECT;
|
|
||||||
janet_q_push(&chan->read_pending, &pending, sizeof(pending));
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
janet_await();
|
janet_await();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -581,15 +631,19 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) {
|
|||||||
return janet_wrap_integer(janet_q_count(&channel->items));
|
return janet_wrap_integer(janet_q_count(&channel->items));
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_rselect(int32_t argc, Janet *argv) {
|
|
||||||
/* Fisher yates shuffle of arguments to get fairness */
|
/* Fisher yates shuffle of arguments to get fairness */
|
||||||
|
static void fisher_yates_args(int32_t argc, Janet *argv) {
|
||||||
for (int32_t i = argc; i > 1; i--) {
|
for (int32_t i = argc; i > 1; i--) {
|
||||||
int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i;
|
int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i;
|
||||||
Janet temp = argv[swap_index];
|
Janet temp = argv[swap_index];
|
||||||
argv[swap_index] = argv[i - 1];
|
argv[swap_index] = argv[i - 1];
|
||||||
argv[i - 1] = temp;
|
argv[i - 1] = temp;
|
||||||
}
|
}
|
||||||
return cfun_channel_select(argc, argv);
|
}
|
||||||
|
|
||||||
|
static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) {
|
||||||
|
fisher_yates_args(argc, argv);
|
||||||
|
return cfun_channel_choice(argc, argv);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
||||||
@ -602,16 +656,22 @@ static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
|||||||
|
|
||||||
/* Main event loop */
|
/* Main event loop */
|
||||||
|
|
||||||
void janet_loop1_impl(void);
|
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
|
||||||
|
|
||||||
void janet_loop(void) {
|
void janet_loop1(void) {
|
||||||
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
|
/* Schedule expired timers */
|
||||||
/* Run expired timers */
|
|
||||||
JanetTimeout to;
|
JanetTimeout to;
|
||||||
while (peek_timeout(&to) && to.when <= ts_now()) {
|
JanetTimestamp now = ts_now();
|
||||||
|
while (peek_timeout(&to) && to.when <= now) {
|
||||||
pop_timeout(0);
|
pop_timeout(0);
|
||||||
|
if (to.fiber->sched_id == to.sched_id) {
|
||||||
|
if (to.is_error) {
|
||||||
|
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
||||||
|
} else {
|
||||||
janet_schedule(to.fiber, janet_wrap_nil());
|
janet_schedule(to.fiber, janet_wrap_nil());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
/* Run scheduled fibers */
|
/* Run scheduled fibers */
|
||||||
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
|
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
|
||||||
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
|
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
|
||||||
@ -620,11 +680,24 @@ void janet_loop(void) {
|
|||||||
}
|
}
|
||||||
/* Poll for events */
|
/* Poll for events */
|
||||||
if (janet_vm_active_listeners || janet_vm_tq_count) {
|
if (janet_vm_active_listeners || janet_vm_tq_count) {
|
||||||
janet_loop1_impl();
|
JanetTimeout to;
|
||||||
|
memset(&to, 0, sizeof(to));
|
||||||
|
int has_timeout;
|
||||||
|
/* Drop timeouts that are no longer needed */
|
||||||
|
while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) {
|
||||||
|
pop_timeout(0);
|
||||||
}
|
}
|
||||||
|
/* Run polling implementation */
|
||||||
|
janet_loop1_impl(has_timeout, to.when);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void janet_loop(void) {
|
||||||
|
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
|
||||||
|
janet_loop1();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
;
|
||||||
#ifdef JANET_EV_EPOLL
|
#ifdef JANET_EV_EPOLL
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -692,17 +765,12 @@ static void janet_unlisten(JanetListenerState *state) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define JANET_EPOLL_MAX_EVENTS 64
|
#define JANET_EPOLL_MAX_EVENTS 64
|
||||||
void janet_loop1_impl(void) {
|
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||||
/* Set timer */
|
|
||||||
JanetTimeout to;
|
|
||||||
struct itimerspec its;
|
|
||||||
memset(&to, 0, sizeof(to));
|
|
||||||
int has_timeout = peek_timeout(&to);
|
|
||||||
if (janet_vm_timer_enabled || has_timeout) {
|
if (janet_vm_timer_enabled || has_timeout) {
|
||||||
memset(&its, 0, sizeof(its));
|
memset(&its, 0, sizeof(its));
|
||||||
if (has_timeout) {
|
if (has_timeout) {
|
||||||
its.it_value.tv_sec = to.when / 1000;
|
its.it_value.tv_sec = timeout / 1000;
|
||||||
its.it_value.tv_nsec = (to.when % 1000) * 1000000;
|
its.it_value.tv_nsec = (timeout % 1000) * 1000000;
|
||||||
}
|
}
|
||||||
timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL);
|
timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL);
|
||||||
}
|
}
|
||||||
@ -721,17 +789,7 @@ void janet_loop1_impl(void) {
|
|||||||
/* Step state machines */
|
/* Step state machines */
|
||||||
for (int i = 0; i < ready; i++) {
|
for (int i = 0; i < ready; i++) {
|
||||||
JanetPollable *pollable = events[i].data.ptr;
|
JanetPollable *pollable = events[i].data.ptr;
|
||||||
if (NULL == pollable) {
|
if (NULL != pollable) { /* If NULL, is a timeout */
|
||||||
/* Timer event */
|
|
||||||
pop_timeout(0);
|
|
||||||
/* Cancel waiters for this fiber */
|
|
||||||
if (to.is_error) {
|
|
||||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
|
||||||
} else {
|
|
||||||
janet_schedule(to.fiber, janet_wrap_nil());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* Normal event */
|
|
||||||
int mask = events[i].events;
|
int mask = events[i].events;
|
||||||
JanetListenerState *state = pollable->state;
|
JanetListenerState *state = pollable->state;
|
||||||
while (NULL != state) {
|
while (NULL != state) {
|
||||||
@ -845,18 +903,13 @@ static void janet_unlisten(JanetListenerState *state) {
|
|||||||
janet_unlisten_impl(state);
|
janet_unlisten_impl(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
void janet_loop1_impl(void) {
|
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||||
/* Set timer */
|
|
||||||
JanetTimeout to;
|
|
||||||
memset(&to, 0, sizeof(to));
|
|
||||||
int has_timeout = peek_timeout(&to);
|
|
||||||
|
|
||||||
/* Poll for events */
|
/* Poll for events */
|
||||||
int ready;
|
int ready;
|
||||||
do {
|
do {
|
||||||
if (has_timeout) {
|
if (has_timeout) {
|
||||||
int64_t diff = to.when - ts_now();
|
JanetTimestamp now = ts_now();
|
||||||
ready = poll(janet_vm_fds, janet_vm_fdcount, diff < 0 ? 0 : (int) diff);
|
ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int) (timeout - now));
|
||||||
} else {
|
} else {
|
||||||
ready = poll(janet_vm_fds, janet_vm_fdcount, -1);
|
ready = poll(janet_vm_fds, janet_vm_fdcount, -1);
|
||||||
}
|
}
|
||||||
@ -866,10 +919,8 @@ void janet_loop1_impl(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Step state machines */
|
/* Step state machines */
|
||||||
int did_handle_something = 0;
|
|
||||||
for (size_t i = 0; i < janet_vm_fdcount; i++) {
|
for (size_t i = 0; i < janet_vm_fdcount; i++) {
|
||||||
struct pollfd *pfd = janet_vm_fds + i;
|
struct pollfd *pfd = janet_vm_fds + i;
|
||||||
did_handle_something |= pfd->revents;
|
|
||||||
/* Skip fds where nothing interesting happened */
|
/* Skip fds where nothing interesting happened */
|
||||||
if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue;
|
if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue;
|
||||||
JanetListenerState *state = janet_vm_listener_map[i];
|
JanetListenerState *state = janet_vm_listener_map[i];
|
||||||
@ -884,19 +935,6 @@ void janet_loop1_impl(void) {
|
|||||||
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE)
|
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE)
|
||||||
janet_unlisten(state);
|
janet_unlisten(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If nothing was handled and poll returned, then we know that it timedout and we should trigger
|
|
||||||
* one of our timers. */
|
|
||||||
if (!did_handle_something) {
|
|
||||||
/* Timer event */
|
|
||||||
pop_timeout(0);
|
|
||||||
/* Cancel waiters for this fiber */
|
|
||||||
if (to.is_error) {
|
|
||||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
|
||||||
} else {
|
|
||||||
janet_schedule(to.fiber, janet_wrap_nil());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void janet_ev_init(void) {
|
void janet_ev_init(void) {
|
||||||
@ -945,6 +983,7 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
|
|||||||
to.when = ts_delta(ts_now(), sec);
|
to.when = ts_delta(ts_now(), sec);
|
||||||
to.fiber = janet_vm_root_fiber;
|
to.fiber = janet_vm_root_fiber;
|
||||||
to.is_error = 0;
|
to.is_error = 0;
|
||||||
|
to.sched_id = to.fiber->sched_id;
|
||||||
add_timeout(to);
|
add_timeout(to);
|
||||||
janet_await();
|
janet_await();
|
||||||
}
|
}
|
||||||
@ -1006,25 +1045,24 @@ static const JanetReg ev_cfuns[] = {
|
|||||||
JDOC("(ev/count channel)\n\n"
|
JDOC("(ev/count channel)\n\n"
|
||||||
"Get the number of items currently waiting in a channel.")
|
"Get the number of items currently waiting in a channel.")
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"ev/select", cfun_channel_select,
|
|
||||||
JDOC("(ev/select & channels)\n\n"
|
|
||||||
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
|
||||||
"is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). "
|
|
||||||
"Returns a non-empty channel.")
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"ev/rselect", cfun_channel_rselect,
|
|
||||||
JDOC("(ev/rselect & channels)\n\n"
|
|
||||||
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
|
||||||
"is not empty. Will prefer channels in a random order (random choice). "
|
|
||||||
"Returns a non-empty channel.")
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"ev/cancel", cfun_ev_cancel,
|
"ev/cancel", cfun_ev_cancel,
|
||||||
JDOC("(ev/cancel fiber err)\n\n"
|
JDOC("(ev/cancel fiber err)\n\n"
|
||||||
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately")
|
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately")
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"ev/select", cfun_channel_choice,
|
||||||
|
JDOC("(ev/select & clauses)\n\n"
|
||||||
|
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
|
||||||
|
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
|
||||||
|
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
|
||||||
|
"clauses will take precedence over later clauses.")
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ev/rselect", cfun_channel_rchoice,
|
||||||
|
JDOC("(ev/rselect & clauses)\n\n"
|
||||||
|
"Similar to ev/choice, but will try clauses in a random order for fairness.")
|
||||||
|
},
|
||||||
{NULL, NULL, NULL}
|
{NULL, NULL, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -39,7 +39,6 @@ static void fiber_reset(JanetFiber *fiber) {
|
|||||||
fiber->env = NULL;
|
fiber->env = NULL;
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
fiber->waiting = NULL;
|
fiber->waiting = NULL;
|
||||||
fiber->timeout_index = -1;
|
|
||||||
fiber->sched_id = 0;
|
fiber->sched_id = 0;
|
||||||
#endif
|
#endif
|
||||||
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
|
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
|
||||||
|
@ -934,8 +934,10 @@ static const uint8_t *unmarshal_one_fiber(
|
|||||||
fiber->data = NULL;
|
fiber->data = NULL;
|
||||||
fiber->child = NULL;
|
fiber->child = NULL;
|
||||||
fiber->env = NULL;
|
fiber->env = NULL;
|
||||||
|
#ifdef JANET_EV
|
||||||
fiber->waiting = NULL;
|
fiber->waiting = NULL;
|
||||||
fiber->timeout_index = -1;
|
fiber->sched_id = 0;
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Push fiber to seen stack */
|
/* Push fiber to seen stack */
|
||||||
janet_v_push(st->lookup, janet_wrap_fiber(fiber));
|
janet_v_push(st->lookup, janet_wrap_fiber(fiber));
|
||||||
|
@ -826,7 +826,6 @@ struct JanetFiber {
|
|||||||
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
|
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
JanetListenerState *waiting;
|
JanetListenerState *waiting;
|
||||||
int32_t timeout_index;
|
|
||||||
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
|
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user