1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-13 17:10:27 +00:00

Add ev/all-tasks to get running and pending root fibers.

This commit is contained in:
Calvin Rose 2023-04-01 18:57:13 -05:00
parent 9a76e77981
commit 3858b2e177
6 changed files with 82 additions and 66 deletions

View File

@ -472,8 +472,12 @@ const JanetAbstractType janet_stream_type = {
/* Register a fiber to resume with value */ /* Register a fiber to resume with value */
void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return; if (fiber->gc.flags & JANET_FIBER_EV_FLAG_CANCELED) return;
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT; if (!(fiber->gc.flags & JANET_FIBER_FLAG_ROOT)) {
Janet task_element = janet_wrap_fiber(fiber);
janet_table_put(&janet_vm.active_tasks, task_element, janet_wrap_true());
}
JanetTask t = { fiber, value, sig, ++fiber->sched_id }; JanetTask t = { fiber, value, sig, ++fiber->sched_id };
fiber->gc.flags |= JANET_FIBER_FLAG_ROOT;
if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED; if (sig == JANET_SIGNAL_ERROR) fiber->gc.flags |= JANET_FIBER_EV_FLAG_CANCELED;
janet_q_push(&janet_vm.spawn, &t, sizeof(t)); janet_q_push(&janet_vm.spawn, &t, sizeof(t));
} }
@ -559,6 +563,7 @@ void janet_ev_init_common(void) {
janet_vm.tq_count = 0; janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0; janet_vm.tq_capacity = 0;
janet_table_init_raw(&janet_vm.threaded_abstracts, 0); janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_rng_seed(&janet_vm.ev_rng, 0); janet_rng_seed(&janet_vm.ev_rng, 0);
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr); pthread_attr_init(&janet_vm.new_thread_attr);
@ -573,13 +578,15 @@ void janet_ev_deinit_common(void) {
janet_free(janet_vm.listeners); janet_free(janet_vm.listeners);
janet_vm.listeners = NULL; janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks);
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr); pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif #endif
} }
/* Short hand to yield to event loop */ /* Shorthand to yield to event loop */
void janet_await(void) { void janet_await(void) {
/* Store the fiber in a gobal table */
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
} }
@ -1252,16 +1259,7 @@ JanetFiber *janet_loop1(void) {
while (peek_timeout(&to) && to.when <= now) { while (peek_timeout(&to) && to.when <= now) {
pop_timeout(0); pop_timeout(0);
if (to.curr_fiber != NULL) { if (to.curr_fiber != NULL) {
/* This is a deadline (for a fiber, not a function call) */ if (janet_fiber_can_resume(to.curr_fiber)) {
JanetFiberStatus s = janet_fiber_status(to.curr_fiber);
int isFinished = (s == JANET_STATUS_DEAD ||
s == JANET_STATUS_ERROR ||
s == JANET_STATUS_USER0 ||
s == JANET_STATUS_USER1 ||
s == JANET_STATUS_USER2 ||
s == JANET_STATUS_USER3 ||
s == JANET_STATUS_USER4);
if (!isFinished) {
janet_cancel(to.fiber, janet_cstringv("deadline expired")); janet_cancel(to.fiber, janet_cstringv("deadline expired"));
} }
} else { } else {
@ -1285,6 +1283,9 @@ JanetFiber *janet_loop1(void) {
if (task.expected_sched_id != task.fiber->sched_id) continue; if (task.expected_sched_id != task.fiber->sched_id) continue;
Janet res; Janet res;
JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig); JanetSignal sig = janet_continue_signal(task.fiber, task.value, &res, task.sig);
if (!janet_fiber_can_resume(task.fiber)) {
janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(task.fiber));
}
void *sv = task.fiber->supervisor_channel; void *sv = task.fiber->supervisor_channel;
int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT; int is_suspended = sig == JANET_SIGNAL_EVENT || sig == JANET_SIGNAL_YIELD || sig == JANET_SIGNAL_INTERRUPT;
if (is_suspended) { if (is_suspended) {
@ -1316,15 +1317,8 @@ JanetFiber *janet_loop1(void) {
/* Drop timeouts that are no longer needed */ /* Drop timeouts that are no longer needed */
while ((has_timeout = peek_timeout(&to))) { while ((has_timeout = peek_timeout(&to))) {
if (to.curr_fiber != NULL) { if (to.curr_fiber != NULL) {
JanetFiberStatus s = janet_fiber_status(to.curr_fiber); if (!janet_fiber_can_resume(to.curr_fiber)) {
int is_finished = (s == JANET_STATUS_DEAD || janet_table_remove(&janet_vm.active_tasks, janet_wrap_fiber(to.curr_fiber));
s == JANET_STATUS_ERROR ||
s == JANET_STATUS_USER0 ||
s == JANET_STATUS_USER1 ||
s == JANET_STATUS_USER2 ||
s == JANET_STATUS_USER3 ||
s == JANET_STATUS_USER4);
if (is_finished) {
pop_timeout(0); pop_timeout(0);
continue; continue;
} }
@ -3174,6 +3168,20 @@ JANET_CORE_FN(janet_cfun_rwlock_write_release,
return argv[0]; return argv[0];
} }
JANET_CORE_FN(janet_cfun_ev_all_tasks,
"(ev/all-tasks)",
"Get an array of all active fibers that are being used by the scheduler.") {
janet_fixarity(argc, 0);
(void) argv;
JanetArray *array = janet_array(janet_vm.active_tasks.count);
for (int32_t i = 0; i < janet_vm.active_tasks.capacity; i++) {
if (!janet_checktype(janet_vm.active_tasks.data[i].key, JANET_NIL)) {
janet_array_push(array, janet_vm.active_tasks.data[i].key);
}
}
return janet_wrap_array(array);
}
void janet_lib_ev(JanetTable *env) { void janet_lib_ev(JanetTable *env) {
JanetRegExt ev_cfuns_ext[] = { JanetRegExt ev_cfuns_ext[] = {
JANET_CORE_REG("ev/give", cfun_channel_push), JANET_CORE_REG("ev/give", cfun_channel_push),
@ -3204,6 +3212,7 @@ void janet_lib_ev(JanetTable *env) {
JANET_CORE_REG("ev/acquire-wlock", janet_cfun_rwlock_write_lock), JANET_CORE_REG("ev/acquire-wlock", janet_cfun_rwlock_write_lock),
JANET_CORE_REG("ev/release-rlock", janet_cfun_rwlock_read_release), JANET_CORE_REG("ev/release-rlock", janet_cfun_rwlock_read_release),
JANET_CORE_REG("ev/release-wlock", janet_cfun_rwlock_write_release), JANET_CORE_REG("ev/release-wlock", janet_cfun_rwlock_write_release),
JANET_CORE_REG("ev/all-tasks", janet_cfun_ev_all_tasks),
JANET_REG_END JANET_REG_END
}; };

View File

@ -625,11 +625,7 @@ JANET_CORE_FN(cfun_fiber_setmaxstack,
return argv[0]; return argv[0];
} }
JANET_CORE_FN(cfun_fiber_can_resume, int janet_fiber_can_resume(JanetFiber *fiber) {
"(fiber/can-resume? fiber)",
"Check if a fiber is finished and cannot be resumed.") {
janet_fixarity(argc, 1);
JanetFiber *fiber = janet_getfiber(argv, 0);
JanetFiberStatus s = janet_fiber_status(fiber); JanetFiberStatus s = janet_fiber_status(fiber);
int isFinished = s == JANET_STATUS_DEAD || int isFinished = s == JANET_STATUS_DEAD ||
s == JANET_STATUS_ERROR || s == JANET_STATUS_ERROR ||
@ -638,7 +634,15 @@ JANET_CORE_FN(cfun_fiber_can_resume,
s == JANET_STATUS_USER2 || s == JANET_STATUS_USER2 ||
s == JANET_STATUS_USER3 || s == JANET_STATUS_USER3 ||
s == JANET_STATUS_USER4; s == JANET_STATUS_USER4;
return janet_wrap_boolean(!isFinished); return !isFinished;
}
JANET_CORE_FN(cfun_fiber_can_resume,
"(fiber/can-resume? fiber)",
"Check if a fiber is finished and cannot be resumed.") {
janet_fixarity(argc, 1);
JanetFiber *fiber = janet_getfiber(argv, 0);
return janet_wrap_boolean(janet_fiber_can_resume(fiber));
} }
JANET_CORE_FN(cfun_fiber_last_value, JANET_CORE_FN(cfun_fiber_last_value,

View File

@ -254,45 +254,45 @@ JANET_CORE_FN(janet_srand,
return janet_wrap_nil(); return janet_wrap_nil();
} }
#define JANET_DEFINE_NAMED_MATHOP(c_name, janet_name, fop, doc)\ #define JANET_DEFINE_NAMED_MATHOP(janet_name, fop, doc)\
JANET_CORE_FN(janet_##c_name, "(math/" #janet_name " x)", doc) {\ JANET_CORE_FN(janet_##fop, "(math/" janet_name " x)", doc) {\
janet_fixarity(argc, 1); \ janet_fixarity(argc, 1); \
double x = janet_getnumber(argv, 0); \ double x = janet_getnumber(argv, 0); \
return janet_wrap_number(fop(x)); \ return janet_wrap_number(fop(x)); \
} }
#define JANET_DEFINE_MATHOP(name, fop, doc) JANET_DEFINE_NAMED_MATHOP(name, name, fop, doc) #define JANET_DEFINE_MATHOP(fop, doc) JANET_DEFINE_NAMED_MATHOP(#fop, fop, doc)
JANET_DEFINE_MATHOP(acos, acos, "Returns the arccosine of x.") JANET_DEFINE_MATHOP(acos, "Returns the arccosine of x.")
JANET_DEFINE_MATHOP(asin, asin, "Returns the arcsin of x.") JANET_DEFINE_MATHOP(asin, "Returns the arcsin of x.")
JANET_DEFINE_MATHOP(atan, atan, "Returns the arctangent of x.") JANET_DEFINE_MATHOP(atan, "Returns the arctangent of x.")
JANET_DEFINE_MATHOP(cos, cos, "Returns the cosine of x.") JANET_DEFINE_MATHOP(cos, "Returns the cosine of x.")
JANET_DEFINE_MATHOP(cosh, cosh, "Returns the hyperbolic cosine of x.") JANET_DEFINE_MATHOP(cosh, "Returns the hyperbolic cosine of x.")
JANET_DEFINE_MATHOP(acosh, acosh, "Returns the hyperbolic arccosine of x.") JANET_DEFINE_MATHOP(acosh, "Returns the hyperbolic arccosine of x.")
JANET_DEFINE_MATHOP(sin, sin, "Returns the sine of x.") JANET_DEFINE_MATHOP(sin, "Returns the sine of x.")
JANET_DEFINE_MATHOP(sinh, sinh, "Returns the hyperbolic sine of x.") JANET_DEFINE_MATHOP(sinh, "Returns the hyperbolic sine of x.")
JANET_DEFINE_MATHOP(asinh, asinh, "Returns the hyperbolic arcsine of x.") JANET_DEFINE_MATHOP(asinh, "Returns the hyperbolic arcsine of x.")
JANET_DEFINE_MATHOP(tan, tan, "Returns the tangent of x.") JANET_DEFINE_MATHOP(tan, "Returns the tangent of x.")
JANET_DEFINE_MATHOP(tanh, tanh, "Returns the hyperbolic tangent of x.") JANET_DEFINE_MATHOP(tanh, "Returns the hyperbolic tangent of x.")
JANET_DEFINE_MATHOP(atanh, atanh, "Returns the hyperbolic arctangent of x.") JANET_DEFINE_MATHOP(atanh, "Returns the hyperbolic arctangent of x.")
JANET_DEFINE_MATHOP(exp, exp, "Returns e to the power of x.") JANET_DEFINE_MATHOP(exp, "Returns e to the power of x.")
JANET_DEFINE_MATHOP(exp2, exp2, "Returns 2 to the power of x.") JANET_DEFINE_MATHOP(exp2, "Returns 2 to the power of x.")
JANET_DEFINE_MATHOP(expm1, expm1, "Returns e to the power of x minus 1.") JANET_DEFINE_MATHOP(expm1, "Returns e to the power of x minus 1.")
JANET_DEFINE_MATHOP(log, log, "Returns the natural logarithm of x.") JANET_DEFINE_MATHOP(log, "Returns the natural logarithm of x.")
JANET_DEFINE_MATHOP(log10, log10, "Returns the log base 10 of x.") JANET_DEFINE_MATHOP(log10, "Returns the log base 10 of x.")
JANET_DEFINE_MATHOP(log2, log2, "Returns the log base 2 of x.") JANET_DEFINE_MATHOP(log2, "Returns the log base 2 of x.")
JANET_DEFINE_MATHOP(sqrt, sqrt, "Returns the square root of x.") JANET_DEFINE_MATHOP(sqrt, "Returns the square root of x.")
JANET_DEFINE_MATHOP(cbrt, cbrt, "Returns the cube root of x.") JANET_DEFINE_MATHOP(cbrt, "Returns the cube root of x.")
JANET_DEFINE_MATHOP(ceil, ceil, "Returns the smallest integer value number that is not less than x.") JANET_DEFINE_MATHOP(ceil, "Returns the smallest integer value number that is not less than x.")
JANET_DEFINE_MATHOP(abs, fabs, "Return the absolute value of x.") JANET_DEFINE_MATHOP(floor, "Returns the largest integer value number that is not greater than x.")
JANET_DEFINE_MATHOP(floor, floor, "Returns the largest integer value number that is not greater than x.") JANET_DEFINE_MATHOP(trunc, "Returns the integer between x and 0 nearest to x.")
JANET_DEFINE_MATHOP(trunc, trunc, "Returns the integer between x and 0 nearest to x.") JANET_DEFINE_MATHOP(round, "Returns the integer nearest to x.")
JANET_DEFINE_MATHOP(round, round, "Returns the integer nearest to x.") JANET_DEFINE_MATHOP(log1p, "Returns (log base e of x) + 1 more accurately than (+ (math/log x) 1)")
JANET_DEFINE_MATHOP(gamma, tgamma, "Returns gamma(x).") JANET_DEFINE_MATHOP(erf, "Returns the error function of x.")
JANET_DEFINE_NAMED_MATHOP(lgamma, log-gamma, lgamma, "Returns log-gamma(x).") JANET_DEFINE_MATHOP(erfc, "Returns the complementary error function of x.")
JANET_DEFINE_MATHOP(log1p, log1p, "Returns (log base e of x) + 1 more accurately than (+ (math/log x) 1)") JANET_DEFINE_NAMED_MATHOP("log-gamma", lgamma, "Returns log-gamma(x).")
JANET_DEFINE_MATHOP(erf, erf, "Returns the error function of x.") JANET_DEFINE_NAMED_MATHOP("abs", fabs, "Return the absolute value of x.")
JANET_DEFINE_MATHOP(erfc, erfc, "Returns the complementary error function of x.") JANET_DEFINE_NAMED_MATHOP("gamma", tgamma, "Returns gamma(x).")
#define JANET_DEFINE_MATH2OP(name, fop, signature, doc)\ #define JANET_DEFINE_MATH2OP(name, fop, signature, doc)\
JANET_CORE_FN(janet_##name, signature, doc) {\ JANET_CORE_FN(janet_##name, signature, doc) {\
@ -370,7 +370,7 @@ void janet_lib_math(JanetTable *env) {
JANET_CORE_REG("math/floor", janet_floor), JANET_CORE_REG("math/floor", janet_floor),
JANET_CORE_REG("math/ceil", janet_ceil), JANET_CORE_REG("math/ceil", janet_ceil),
JANET_CORE_REG("math/pow", janet_pow), JANET_CORE_REG("math/pow", janet_pow),
JANET_CORE_REG("math/abs", janet_abs), JANET_CORE_REG("math/abs", janet_fabs),
JANET_CORE_REG("math/sinh", janet_sinh), JANET_CORE_REG("math/sinh", janet_sinh),
JANET_CORE_REG("math/cosh", janet_cosh), JANET_CORE_REG("math/cosh", janet_cosh),
JANET_CORE_REG("math/tanh", janet_tanh), JANET_CORE_REG("math/tanh", janet_tanh),
@ -385,7 +385,7 @@ void janet_lib_math(JanetTable *env) {
JANET_CORE_REG("math/hypot", janet_hypot), JANET_CORE_REG("math/hypot", janet_hypot),
JANET_CORE_REG("math/exp2", janet_exp2), JANET_CORE_REG("math/exp2", janet_exp2),
JANET_CORE_REG("math/log1p", janet_log1p), JANET_CORE_REG("math/log1p", janet_log1p),
JANET_CORE_REG("math/gamma", janet_gamma), JANET_CORE_REG("math/gamma", janet_tgamma),
JANET_CORE_REG("math/log-gamma", janet_lgamma), JANET_CORE_REG("math/log-gamma", janet_lgamma),
JANET_CORE_REG("math/erfc", janet_erfc), JANET_CORE_REG("math/erfc", janet_erfc),
JANET_CORE_REG("math/erf", janet_erf), JANET_CORE_REG("math/erf", janet_erf),

View File

@ -23,6 +23,7 @@
#ifndef JANET_STATE_H_defined #ifndef JANET_STATE_H_defined
#define JANET_STATE_H_defined #define JANET_STATE_H_defined
#include <janet.h>
#include <stdint.h> #include <stdint.h>
#ifdef JANET_EV #ifdef JANET_EV
@ -158,6 +159,7 @@ struct JanetVM {
size_t listener_cap; size_t listener_cap;
size_t extra_listeners; size_t extra_listeners;
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void **iocp; void **iocp;
#elif defined(JANET_EV_EPOLL) #elif defined(JANET_EV_EPOLL)

View File

@ -92,8 +92,8 @@ const char *const janet_signal_names[14] = {
"user5", "user5",
"user6", "user6",
"user7", "user7",
"user8", "interrupt",
"user9" "await"
}; };
const char *const janet_status_names[16] = { const char *const janet_status_names[16] = {
@ -109,8 +109,8 @@ const char *const janet_status_names[16] = {
"user5", "user5",
"user6", "user6",
"user7", "user7",
"user8", "interrupted",
"user9", "suspended",
"new", "new",
"alive" "alive"
}; };

View File

@ -1687,6 +1687,7 @@ JANET_API void janet_table_clear(JanetTable *table);
JANET_API JanetFiber *janet_fiber(JanetFunction *callee, int32_t capacity, int32_t argc, const Janet *argv); JANET_API JanetFiber *janet_fiber(JanetFunction *callee, int32_t capacity, int32_t argc, const Janet *argv);
JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv); JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv);
JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber); JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber);
JANET_API int janet_fiber_can_resume(JanetFiber *fiber);
JANET_API JanetFiber *janet_current_fiber(void); JANET_API JanetFiber *janet_current_fiber(void);
JANET_API JanetFiber *janet_root_fiber(void); JANET_API JanetFiber *janet_root_fiber(void);