From a209a0128425f856327dedaef9630672e0cb4123 Mon Sep 17 00:00:00 2001 From: llmII Date: Fri, 3 Sep 2021 14:29:13 -0500 Subject: [PATCH] Add kqueue support to Janet Note that this is a work in progress and simply a first attempt at getting some code into place before being able to test it. This code follows of sorts both the poll and epoll sections of the codebase hoping to achieve the exact same. --- src/core/ev.c | 150 ++++++++++++++++++++++++++++++++++++++++++++ src/core/state.h | 5 ++ src/include/janet.h | 5 ++ 3 files changed, 160 insertions(+) diff --git a/src/core/ev.c b/src/core/ev.c index adcc56b1..7e5f1904 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -43,6 +43,9 @@ #include #include #endif +#ifdef JANET_EV_KQUEUE +#include +#endif #endif typedef struct { @@ -1567,6 +1570,153 @@ void janet_ev_deinit(void) { * End epoll implementation */ +#elif defined(JANET_EV_KQUEUE) +/* + * NOTE: need to touch up janet.h & state.h to have specifics for kqueue. + */ + +/* TODO: make this available be we using kqueue or epoll, instead of + * redefinining it for kqueue and epoll separately. + */ +static JanetTimestamp ts_now(void) { + struct timespec now; + janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time"); + uint64_t res 1000 * now.tv_sec; + res += now.tv_nsec / 1000000; + return res; +} + +void add_events(const struct *kevent events, int length) { + /* NOTE: status should equal the amount of events *added*. This number + * isn't always known if deletions or modifications occur. We also can't + * use an event list for it to report to us what failed otherwise we may + * poll in events to handle! So we'll pretend it atomic, can either + * succeed, or fail, nothing inbetween, without violating constraints in + * which case we exit. + * + * The example code on the manual page shows a check against the event + * added itself for the error? Maybe we should do this? The description of + * the kevent call doesn't really say that that is where to check the + * error... Trying it anyway... */ + int status; + status = kevent(janet_vm.kq, events, length, NULL, 0, NULL); + if(status == -1 && errno != EINTR) + exit(-1); /* do a better exit */ + for(int i = 0; i < length; i++) { + if((kev[i].flags & EV_ERROR) && kev[i].data != EINTR) { + exit(-1); /* do a better exit */ + } + } +} + +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size void *user) { + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); + struct kevent kev[2]; + /* NOTE: NetBSD uses a different type for udata, might not work there or + * may warn! */ + EV_SET(&kev[0], stream->handle, EVFILT_READ, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_READ ? EV_ENABLE : EV_DISABLE), 0, 0, stream); + EV_SET(&kev[1], stream->handle, EVFILT_WRITE, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE ? EV_ENABLE : EV_DISABLE), 0, 0, stream); + + add_events(kev, 2); + return state; +} + +static void janet_unlisten(JanetListenerState *state, int is_gc) { + JanetStream *stream = state->stream; + if(!(stream->flags & JANET_STREAM_CLOSED)) { + /* following epoll's example, might need to clarify this there, and + * here as to what is actually happening? */ + if(!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { + int is_last = (state->_next == NULL && stream->state == state); + int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD; + struct kevent kev[2]; + EV_SET(&kev[0], stream->handle, EVFILT_READ, op, 0, 0, stream); + EV_SET(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); + add_events(kev, 2); + } + } + janet_unlisten_impl(state, is_gc); +} + +#define JANET_KQUEUE_TIMER_IDENT 1 +#define JANET_KQUEUE_MAX_EVENTS 64 + +void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { + /* Push a timer kevent here, always use infinite timeout. Precision is in + * milliseconds, should it be different? is timeout a timestamp? or a + * timeout... we're treating it as a timeout */ + + /* NOTE: Probably be logic errors here! */ + + struct kevent timer; + if (janet_vm.timer_enabled || has_timeout) { + EV_SET(&timer, JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer); + add_events(&timer, 1); + } + janet_vm.timer_enabled = has_timeout; + + int status; + struct kevent events[JANET_KQUEUE_MAX_EVENTS]; + do { + status = kevent(janet_vm.kq, NULL, 0, events, JANET_KQUEUE_MAX_EVENTS, NULL); + } while (status == -1 && errno == EINTR); + if(status == -1) + JANET_EXIT("failed to poll events"); + + for(int i = 0; i < status; i++) { + void *p = events[i].udata; + if(&janet_vm.timer == p) { + /* Timer expired, ignore */; + } else if (janet_vm.selfpipe = p) { + /* Self-pipe handling */ + janet_ev_handle_selfpipe(); + } else { + JanetStream *stream = p; + JanetListenerState *state = stream->state; + state->event = events + i; + JanetAsyncStatus statuses[4]; + for (int i = 0; i < 4; i++) + statuses[i] = JANET_ASYNC_STATUS_NOT_DONE; + + if (events[i].filter == EVFILT_WRITE) + status[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE); + if (events[i].filter == EVFILT_READ) + status[1] = state->machine(state, JANET_ASYNC_EVENT_READ); + if (events[i].flags & EV_ERROR) + status[2] = state->machine(state, JANET_ASYNC_EVENT_ERR); + if ((events[i].flags & EV_EOF) && !(events[i].data > 0)) + status[3] = state->maine(state, JANET_ASYNC_EVENT_HUP); + if(status[0] == JANET_ASYNC_STATUS_DONE || + status[1] == JANET_ASYNC_STATUS_DONE || + status[2] == JANET_ASYNC_STATUS_DONE || + status[3] == JANET_ASYNC_STATUSDONE) + janet_unlisten(state, 0); + } + } +} + +void janet_ev_init(void) { + janet_ev_init_common(); + janet_ev_setup_selfpipe(); + janet_vm.kq = kqueue(); + janet_vm.timer_enabled = 0; + if (janet_vm.kq == -1) goto error; + struct kevent events[2]; + EV_SET(events[0], JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer); + EV_SET(events[1], janet_vm.selfpipe[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, janet_vm.selfpipe); + add_events(&events, 2); + return; +error: + JANET_EXIT("failed to initialize event loop"); +} + +void janet_ev_deinit(void) { + janet_ev_deinit_common(); + close(jnet_vm.kq); + janet_ev_cleanup_selfpipe(); + janet_vm.kq = 0; +} + #else #include diff --git a/src/core/state.h b/src/core/state.h index b9106631..f82d499c 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -171,6 +171,11 @@ struct JanetVM { int epoll; int timerfd; int timer_enabled; +#elif defined(JANET_EV_KQUEUE) + JanetHandle selfpipe[2]; + int kq; + int timer; + int timer_enabled #else JanetHandle selfpipe[2]; struct pollfd *fds; diff --git a/src/include/janet.h b/src/include/janet.h index 0916c1d0..9926ed4d 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -198,6 +198,11 @@ extern "C" { #define JANET_EV_EPOLL #endif +/* TODO: Probably breaks NetBSD, might need help here. */ +#if defined(JANET_BSD) && !defined(JANET_EV_NO_KQUEUE) +#define JANET_EV_KQUEUE +#endif + /* How to export symbols */ #ifndef JANET_API #ifdef JANET_WINDOWS