1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-24 17:27:18 +00:00

Add ev/deadline and ev/with-deadline.

This should be more useful than timeouts in real-world
use cases. The deadline system is based on fibers and is target
to much more coarse-grained (and therfor reliable) timeouts than things
like ev/sleep and timeout arguments.
This commit is contained in:
Calvin Rose 2020-12-05 10:32:34 -06:00
parent c4a4916055
commit 9d23192614
2 changed files with 61 additions and 7 deletions

View File

@ -3019,7 +3019,16 @@
(defmacro ev/spawn (defmacro ev/spawn
"Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))." "Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))."
[& body] [& body]
~(,ev/call (fn [] ,;body)))) ~(,ev/call (fn [] ,;body)))
(defmacro ev/with-deadline
`Run a body of code with a deadline, such that if the code does not complete before
the deadline is up, it will be canceled.`
[deadline & body]
(with-syms [f]
~(let [,f (coro ,;body)]
(,ev/deadline ,deadline nil ,f)
(,resume ,f)))))
(compwhen (dyn 'net/listen) (compwhen (dyn 'net/listen)
(defn net/server (defn net/server

View File

@ -131,6 +131,7 @@ typedef struct JanetTimeout JanetTimeout;
struct JanetTimeout { struct JanetTimeout {
JanetTimestamp when; JanetTimestamp when;
JanetFiber *fiber; JanetFiber *fiber;
JanetFiber *curr_fiber;
uint32_t sched_id; uint32_t sched_id;
int is_error; int is_error;
}; };
@ -433,6 +434,9 @@ void janet_ev_mark(void) {
/* Pending timeouts */ /* Pending timeouts */
for (size_t i = 0; i < janet_vm_tq_count; i++) { for (size_t i = 0; i < janet_vm_tq_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber)); janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber));
if (janet_vm_tq[i].curr_fiber != NULL) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].curr_fiber));
}
} }
/* Pending listeners */ /* Pending listeners */
@ -486,6 +490,7 @@ void janet_addtimeout(double sec) {
JanetTimeout to; JanetTimeout to;
to.when = ts_delta(ts_now(), sec); to.when = ts_delta(ts_now(), sec);
to.fiber = fiber; to.fiber = fiber;
to.curr_fiber = NULL;
to.sched_id = fiber->sched_id; to.sched_id = fiber->sched_id;
to.is_error = 1; to.is_error = 1;
add_timeout(to); add_timeout(to);
@ -766,6 +771,21 @@ void janet_loop1(void) {
JanetTimestamp now = ts_now(); JanetTimestamp now = ts_now();
while (peek_timeout(&to) && to.when <= now) { while (peek_timeout(&to) && to.when <= now) {
pop_timeout(0); pop_timeout(0);
if (to.curr_fiber != NULL) {
/* This is a deadline (for a fiber, not a function call) */
JanetFiberStatus s = janet_fiber_status(to.curr_fiber);
int isFinished = s == (JANET_STATUS_DEAD ||
s == JANET_STATUS_ERROR ||
s == JANET_STATUS_USER0 ||
s == JANET_STATUS_USER1 ||
s == JANET_STATUS_USER2 ||
s == JANET_STATUS_USER3 ||
s == JANET_STATUS_USER4);
if (!isFinished) {
janet_cancel(to.fiber, janet_cstringv("deadline expired"));
}
} else {
/* This is a timeout (for a function call, not a whole fiber) */
if (to.fiber->sched_id == to.sched_id) { if (to.fiber->sched_id == to.sched_id) {
if (to.is_error) { if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout")); janet_cancel(to.fiber, janet_cstringv("timeout"));
@ -774,6 +794,7 @@ void janet_loop1(void) {
} }
} }
} }
}
/* Run scheduled fibers */ /* Run scheduled fibers */
while (janet_vm_spawn.head != janet_vm_spawn.tail) { while (janet_vm_spawn.head != janet_vm_spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK}; JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
@ -786,7 +807,7 @@ void janet_loop1(void) {
memset(&to, 0, sizeof(to)); memset(&to, 0, sizeof(to));
int has_timeout; int has_timeout;
/* Drop timeouts that are no longer needed */ /* Drop timeouts that are no longer needed */
while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) { while ((has_timeout = peek_timeout(&to)) && (to.curr_fiber == NULL) && to.fiber->sched_id != to.sched_id) {
pop_timeout(0); pop_timeout(0);
} }
/* Run polling implementation only if pending timeouts or pending events */ /* Run polling implementation only if pending timeouts or pending events */
@ -1699,10 +1720,26 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
to.fiber = janet_vm_root_fiber; to.fiber = janet_vm_root_fiber;
to.is_error = 0; to.is_error = 0;
to.sched_id = to.fiber->sched_id; to.sched_id = to.fiber->sched_id;
to.curr_fiber = NULL;
add_timeout(to); add_timeout(to);
janet_await(); janet_await();
} }
static Janet cfun_ev_deadline(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 3);
double sec = janet_getnumber(argv, 0);
JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm_root_fiber);
JanetFiber *tocheck = janet_optfiber(argv, argc, 2, janet_vm_fiber);
JanetTimeout to;
to.when = ts_delta(ts_now(), sec);
to.fiber = tocancel;
to.curr_fiber = tocheck;
to.is_error = 0;
to.sched_id = to.fiber->sched_id;
add_timeout(to);
return janet_wrap_fiber(tocancel);
}
static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { static Janet cfun_ev_cancel(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2); janet_fixarity(argc, 2);
JanetFiber *fiber = janet_getfiber(argv, 0); JanetFiber *fiber = janet_getfiber(argv, 0);
@ -1776,6 +1813,14 @@ static const JanetReg ev_cfuns[] = {
JDOC("(ev/sleep sec)\n\n" JDOC("(ev/sleep sec)\n\n"
"Suspend the current fiber for sec seconds without blocking the event loop.") "Suspend the current fiber for sec seconds without blocking the event loop.")
}, },
{
"ev/deadline", cfun_ev_deadline,
JDOC("(ev/deadline sec &opt tocancel tocheck)\n\n"
"Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
"`tocancel` will be canceled as with `ev/cancel`. "
"If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
"`(fiber/current)` respectively. Returns `tocancel`.")
},
{ {
"ev/chan", cfun_channel_new, "ev/chan", cfun_channel_new,
JDOC("(ev/chan &opt capacity)\n\n" JDOC("(ev/chan &opt capacity)\n\n"