Fix EPOLL implementation.

This commit is contained in:
Calvin Rose 2020-11-15 19:40:47 -06:00
parent 717fac02d1
commit a25b030e36
1 changed files with 36 additions and 2 deletions

View File

@ -237,7 +237,6 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
state->_index = 0;
stream->_mask |= mask;
janet_vm_active_listeners++;
/* Prepend to linked list */
state->_next = stream->state;
stream->state = state;
/* Emit INIT event for convenience */
@ -389,6 +388,9 @@ void janet_fiber_did_resume(JanetFiber *fiber) {
if (fiber->waiting) janet_unlisten(fiber->waiting);
}
/* forward declaration for marking extra state */
static void janet_ev_mark_extra(void);
/* Mark all pending tasks */
void janet_ev_mark(void) {
JanetTask *tasks = janet_vm_spawn.data;
@ -410,6 +412,7 @@ void janet_ev_mark(void) {
for (size_t i = 0; i < janet_vm_tq_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber));
}
janet_ev_mark_extra();
}
/* Run a top level task */
@ -767,6 +770,8 @@ void janet_loop(void) {
/* Epoll global data */
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
static void janet_ev_mark_extra(void) {}
static JanetTimestamp ts_now(void) {
return (JanetTimestamp) GetTickCount64();
}
@ -783,6 +788,9 @@ void janet_ev_deinit(void) {
}
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->state == NULL) {
janet_gcroot(janet_wrap_abstract(stream));
}
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(stream->flags & JANET_STREAM_IOCP)) {
@ -796,7 +804,11 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
janet_unlisten_impl(state);
if (NULL == stream->state) {
janet_gcunroot(janet_wrap_abstract(stream));
}
}
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@ -842,12 +854,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
}
}
#elif defined(JANET_EV_POLL)
#elif defined(JANET_EV_EPOLL)
/*
* Start linux/epoll implementation
*/
static void janet_ev_mark_extra(void) {}
static JanetTimestamp ts_now(void) {
struct timespec now;
janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
@ -886,6 +900,9 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
if (is_first) {
janet_gcroot(janet_wrap_abstract(stream));
}
return state;
}
@ -908,10 +925,14 @@ static void janet_unlisten(JanetListenerState *state) {
}
/* Destroy state machine and free memory */
janet_unlisten_impl(state);
if (NULL == stream->state) {
janet_gcunroot(janet_wrap_abstract(stream));
}
}
#define JANET_EPOLL_MAX_EVENTS 64
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
struct itimerspec its;
if (janet_vm_timer_enabled || has_timeout) {
memset(&its, 0, sizeof(its));
if (has_timeout) {
@ -943,6 +964,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
if (mask & EPOLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & EPOLLIN)
@ -1008,6 +1031,17 @@ JANET_THREAD_LOCAL JanetListenerState **janet_vm_listener_map = NULL;
JANET_THREAD_LOCAL size_t janet_vm_fdcap = 0;
JANET_THREAD_LOCAL size_t janet_vm_fdcount = 0;
static void janet_ev_mark_extra(void) {
for (size_t i = 0; i < janet_vm_fdcount; i++) {
JanetListenerState *state = janet_vm_listener_map[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
}
static int make_poll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)