Don't use gcroot/unroot for tracking IO operations.

This could have bad effects in higher load situations, and
duplicates code. It is better to keep a dedicated list of
scheduled IO operations which can be efficiently added and
removed from. It also provides and easy way to enumerate
scheduled IO operations.
This commit is contained in:
Calvin Rose 2020-11-16 09:30:04 -06:00
parent 1b6272db2e
commit 9ec5689d6b
2 changed files with 66 additions and 97 deletions

View File

@ -2229,7 +2229,7 @@
:on-compile-error (fn compile-error [msg errf &]
(error (string "compile error: " msg)))
:on-parse-error (fn parse-error [p x]
(error (string "parse error: " (parser/error p))))
(error (string "parse error: " (:error p))))
:fiber-flags :i
:on-status (fn on-status [f val]
(if-not (= (fiber/status f) :dead)
@ -2699,9 +2699,9 @@
(getline
(string
"repl:"
((parser/where p) 0)
((:where p) 0)
":"
(parser/state p :delimiters) "> ")
(:state p :delimiters) "> ")
buf env)))
(defn make-onsignal
[e level]
@ -2716,8 +2716,8 @@
(debug/stacktrace f x)
(eflush)
(defn debugger-chunks [buf p]
(def status (parser/state p :delimiters))
(def c ((parser/where p) 0))
(def status (:state p :delimiters))
(def c ((:where p) 0))
(def prpt (string "debug[" level "]:" c ":" status "> "))
(getline prpt buf nextenv))
(print "entering debug[" level "] - (quit) to exit")

View File

@ -139,12 +139,14 @@ struct JanetTimeout {
static void janet_unlisten(JanetListenerState *state);
/* Global data */
JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
JANET_THREAD_LOCAL JanetQueue janet_vm_spawn;
JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL;
JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng;
JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL;
JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0;
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
@ -234,11 +236,24 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
janet_vm_root_fiber->waiting = state;
state->stream = stream;
state->_mask = mask;
state->_index = 0;
stream->_mask |= mask;
janet_vm_active_listeners++;
state->_next = stream->state;
stream->state = state;
/* Keep track of a listener for GC purposes */
int resize = janet_vm_listener_cap == janet_vm_listener_count;
if (resize) {
size_t newcap = janet_vm_listener_count ? janet_vm_listener_cap * 2 : 16;
janet_vm_listeners = realloc(janet_vm_listeners, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm_listeners) {
JANET_OUT_OF_MEMORY;
}
janet_vm_listener_cap = newcap;
}
size_t index = janet_vm_listener_count++;
janet_vm_listeners[index] = state;
state->_index = index;
/* Emit INIT event for convenience */
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT);
@ -255,7 +270,6 @@ static void janet_unlisten_impl(JanetListenerState *state) {
iter = &((*iter)->_next);
janet_assert(*iter, "failed to remove listener");
*iter = state->_next;
janet_vm_active_listeners--;
/* Remove mask */
state->stream->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */
@ -263,6 +277,10 @@ static void janet_unlisten_impl(JanetListenerState *state) {
if (NULL != fiber && fiber->waiting == state) {
fiber->waiting = NULL;
}
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm_listeners[index] = janet_vm_listeners[--janet_vm_listener_count];
janet_vm_listeners[index]->_index = index;
free(state);
}
@ -388,11 +406,10 @@ void janet_fiber_did_resume(JanetFiber *fiber) {
if (fiber->waiting) janet_unlisten(fiber->waiting);
}
/* forward declaration for marking extra state */
static void janet_ev_mark_extra(void);
/* Mark all pending tasks */
void janet_ev_mark(void) {
/* Pending tasks */
JanetTask *tasks = janet_vm_spawn.data;
if (janet_vm_spawn.head <= janet_vm_spawn.tail) {
for (int32_t i = janet_vm_spawn.head; i < janet_vm_spawn.tail; i++) {
@ -409,10 +426,21 @@ void janet_ev_mark(void) {
janet_mark(tasks[i].value);
}
}
/* Pending timeouts */
for (size_t i = 0; i < janet_vm_tq_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber));
}
janet_ev_mark_extra();
/* Pending listeners */
for (size_t i = 0; i < janet_vm_listener_count; i++) {
JanetListenerState *state = janet_vm_listeners[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
}
/* Run a top level task */
@ -428,7 +456,9 @@ static void run_one(JanetFiber *fiber, Janet value, JanetSignal sigin) {
/* Common init code */
void janet_ev_init_common(void) {
janet_q_init(&janet_vm_spawn);
janet_vm_active_listeners = 0;
janet_vm_listener_count = 0;
janet_vm_listener_cap = 0;
janet_vm_listeners = NULL;
janet_vm_tq = NULL;
janet_vm_tq_count = 0;
janet_vm_tq_capacity = 0;
@ -438,6 +468,8 @@ void janet_ev_init_common(void) {
/* Common deinit code */
void janet_ev_deinit_common(void) {
janet_q_deinit(&janet_vm_spawn);
free(janet_vm_listeners);
janet_vm_listeners = NULL;
}
/* Short hand to yield to event loop */
@ -746,7 +778,7 @@ void janet_loop1(void) {
run_one(task.fiber, task.value, task.sig);
}
/* Poll for events */
if (janet_vm_active_listeners || janet_vm_tq_count) {
if (janet_vm_listener_count || janet_vm_tq_count) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
@ -760,18 +792,15 @@ void janet_loop1(void) {
}
void janet_loop(void) {
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
janet_loop1();
}
}
#ifdef JANET_WINDOWS
/* Epoll global data */
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
static void janet_ev_mark_extra(void) {}
static JanetTimestamp ts_now(void) {
return (JanetTimestamp) GetTickCount64();
}
@ -788,9 +817,6 @@ void janet_ev_deinit(void) {
}
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->state == NULL) {
janet_gcroot(janet_wrap_abstract(stream));
}
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(stream->flags & JANET_STREAM_IOCP)) {
@ -804,11 +830,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
janet_unlisten_impl(state);
if (NULL == stream->state) {
janet_gcunroot(janet_wrap_abstract(stream));
}
}
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@ -856,11 +878,9 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
#elif defined(JANET_EV_EPOLL)
/*
* Start linux/epoll implementation
*/
static void janet_ev_mark_extra(void) {}
JANET_THREAD_LOCAL int janet_vm_epoll = 0;
JANET_THREAD_LOCAL int janet_vm_timerfd = 0;
JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0;
static JanetTimestamp ts_now(void) {
struct timespec now;
@ -870,11 +890,6 @@ static JanetTimestamp ts_now(void) {
return res;
}
/* Epoll global data */
JANET_THREAD_LOCAL int janet_vm_epoll = 0;
JANET_THREAD_LOCAL int janet_vm_timerfd = 0;
JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0;
static int make_epoll_events(int mask) {
int events = EPOLLET;
if (mask & JANET_ASYNC_LISTEN_READ)
@ -900,9 +915,6 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
if (is_first) {
janet_gcroot(janet_wrap_abstract(stream));
}
return state;
}
@ -925,9 +937,6 @@ static void janet_unlisten(JanetListenerState *state) {
}
/* Destroy state machine and free memory */
janet_unlisten_impl(state);
if (NULL == stream->state) {
janet_gcunroot(janet_wrap_abstract(stream));
}
}
#define JANET_EPOLL_MAX_EVENTS 64
@ -1015,7 +1024,7 @@ void janet_ev_deinit(void) {
#include <poll.h>
/* Poll implementation */
JANET_THREAD_LOCAL struct pollfd *janet_vm_fds = NULL;
static JanetTimestamp ts_now(void) {
struct timespec now;
@ -1025,23 +1034,6 @@ static JanetTimestamp ts_now(void) {
return res;
}
/* Epoll global data */
JANET_THREAD_LOCAL struct pollfd *janet_vm_fds = NULL;
JANET_THREAD_LOCAL JanetListenerState **janet_vm_listener_map = NULL;
JANET_THREAD_LOCAL size_t janet_vm_fdcap = 0;
JANET_THREAD_LOCAL size_t janet_vm_fdcount = 0;
static void janet_ev_mark_extra(void) {
for (size_t i = 0; i < janet_vm_fdcount; i++) {
JanetListenerState *state = janet_vm_listener_map[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
}
static int make_poll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
@ -1051,43 +1043,27 @@ static int make_poll_events(int mask) {
return events;
}
static void janet_push_pollfd(struct pollfd pfd) {
if (janet_vm_fdcap == janet_vm_fdcount) {
size_t newcap = janet_vm_fdcount ? janet_vm_fdcount * 2 : 16;
janet_vm_fds = realloc(janet_vm_fds, newcap * sizeof(struct pollfd));
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
size_t oldsize = janet_vm_listener_cap;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm_listener_cap;
if (newsize > oldsize) {
janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd));
if (NULL == janet_vm_fds) {
JANET_OUT_OF_MEMORY;
}
janet_vm_listener_map = realloc(janet_vm_listener_map, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm_listener_map) {
JANET_OUT_OF_MEMORY;
}
janet_vm_fdcap = newcap;
}
janet_vm_fds[janet_vm_fdcount++] = pfd;
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct pollfd ev;
ev.fd = stream->handle;
ev.events = make_poll_events(state->stream->_mask);
ev.revents = 0;
state->_index = janet_vm_fdcount;
janet_push_pollfd(ev);
janet_vm_listener_map[state->_index] = state;
janet_vm_fds[state->_index] = ev;
return state;
}
/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state) {
janet_vm_fds[state->_index] = janet_vm_fds[--janet_vm_fdcount];
JanetListenerState *replacer = janet_vm_listener_map[janet_vm_fdcount];
janet_vm_listener_map[state->_index] = replacer;
/* Update pointers in replacer */
replacer->_index = state->_index;
/* Destroy state machine and free memory */
janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1];
janet_unlisten_impl(state);
}
@ -1095,22 +1071,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Poll for events */
int ready;
do {
int to = -1;
if (has_timeout) {
JanetTimestamp now = ts_now();
ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int)(timeout - now));
} else {
ready = poll(janet_vm_fds, janet_vm_fdcount, -1);
to = now > timeout ? 0 : (int)(timeout - now);
}
ready = poll(janet_vm_fds, janet_vm_listener_count, to);
} while (ready == -1 && errno == EINTR);
if (ready == -1) {
JANET_EXIT("failed to poll events");
}
/* Step state machines */
for (size_t i = 0; i < janet_vm_fdcount; i++) {
for (size_t i = 0; i < janet_vm_listener_count; i++) {
struct pollfd *pfd = janet_vm_fds + i;
/* Skip fds where nothing interesting happened */
JanetListenerState *state = janet_vm_listener_map[i];
JanetListenerState *state = janet_vm_listeners[i];
/* Normal event */
int mask = janet_vm_fds[i].revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
@ -1137,20 +1113,13 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
void janet_ev_init(void) {
janet_ev_init_common();
janet_vm_fds = NULL;
janet_vm_listener_map = NULL;
janet_vm_fdcap = 0;
janet_vm_fdcount = 0;
return;
}
void janet_ev_deinit(void) {
janet_ev_deinit_common();
free(janet_vm_fds);
free(janet_vm_listener_map);
janet_vm_fds = NULL;
janet_vm_listener_map = NULL;
janet_vm_fdcap = 0;
janet_vm_fdcount = 0;
}
#endif