diff --git a/src/core/ev.c b/src/core/ev.c index adcc56b1..7e5f1904 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -43,6 +43,9 @@ #include #include #endif +#ifdef JANET_EV_KQUEUE +#include +#endif #endif typedef struct { @@ -1567,6 +1570,153 @@ void janet_ev_deinit(void) { * End epoll implementation */ +#elif defined(JANET_EV_KQUEUE) +/* + * NOTE: need to touch up janet.h & state.h to have specifics for kqueue. + */ + +/* TODO: make this available be we using kqueue or epoll, instead of + * redefinining it for kqueue and epoll separately. + */ +static JanetTimestamp ts_now(void) { + struct timespec now; + janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time"); + uint64_t res 1000 * now.tv_sec; + res += now.tv_nsec / 1000000; + return res; +} + +void add_events(const struct *kevent events, int length) { + /* NOTE: status should equal the amount of events *added*. This number + * isn't always known if deletions or modifications occur. We also can't + * use an event list for it to report to us what failed otherwise we may + * poll in events to handle! So we'll pretend it atomic, can either + * succeed, or fail, nothing inbetween, without violating constraints in + * which case we exit. + * + * The example code on the manual page shows a check against the event + * added itself for the error? Maybe we should do this? The description of + * the kevent call doesn't really say that that is where to check the + * error... Trying it anyway... */ + int status; + status = kevent(janet_vm.kq, events, length, NULL, 0, NULL); + if(status == -1 && errno != EINTR) + exit(-1); /* do a better exit */ + for(int i = 0; i < length; i++) { + if((kev[i].flags & EV_ERROR) && kev[i].data != EINTR) { + exit(-1); /* do a better exit */ + } + } +} + +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size void *user) { + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); + struct kevent kev[2]; + /* NOTE: NetBSD uses a different type for udata, might not work there or + * may warn! */ + EV_SET(&kev[0], stream->handle, EVFILT_READ, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_READ ? EV_ENABLE : EV_DISABLE), 0, 0, stream); + EV_SET(&kev[1], stream->handle, EVFILT_WRITE, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE ? EV_ENABLE : EV_DISABLE), 0, 0, stream); + + add_events(kev, 2); + return state; +} + +static void janet_unlisten(JanetListenerState *state, int is_gc) { + JanetStream *stream = state->stream; + if(!(stream->flags & JANET_STREAM_CLOSED)) { + /* following epoll's example, might need to clarify this there, and + * here as to what is actually happening? */ + if(!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { + int is_last = (state->_next == NULL && stream->state == state); + int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD; + struct kevent kev[2]; + EV_SET(&kev[0], stream->handle, EVFILT_READ, op, 0, 0, stream); + EV_SET(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); + add_events(kev, 2); + } + } + janet_unlisten_impl(state, is_gc); +} + +#define JANET_KQUEUE_TIMER_IDENT 1 +#define JANET_KQUEUE_MAX_EVENTS 64 + +void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { + /* Push a timer kevent here, always use infinite timeout. Precision is in + * milliseconds, should it be different? is timeout a timestamp? or a + * timeout... we're treating it as a timeout */ + + /* NOTE: Probably be logic errors here! */ + + struct kevent timer; + if (janet_vm.timer_enabled || has_timeout) { + EV_SET(&timer, JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer); + add_events(&timer, 1); + } + janet_vm.timer_enabled = has_timeout; + + int status; + struct kevent events[JANET_KQUEUE_MAX_EVENTS]; + do { + status = kevent(janet_vm.kq, NULL, 0, events, JANET_KQUEUE_MAX_EVENTS, NULL); + } while (status == -1 && errno == EINTR); + if(status == -1) + JANET_EXIT("failed to poll events"); + + for(int i = 0; i < status; i++) { + void *p = events[i].udata; + if(&janet_vm.timer == p) { + /* Timer expired, ignore */; + } else if (janet_vm.selfpipe = p) { + /* Self-pipe handling */ + janet_ev_handle_selfpipe(); + } else { + JanetStream *stream = p; + JanetListenerState *state = stream->state; + state->event = events + i; + JanetAsyncStatus statuses[4]; + for (int i = 0; i < 4; i++) + statuses[i] = JANET_ASYNC_STATUS_NOT_DONE; + + if (events[i].filter == EVFILT_WRITE) + status[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE); + if (events[i].filter == EVFILT_READ) + status[1] = state->machine(state, JANET_ASYNC_EVENT_READ); + if (events[i].flags & EV_ERROR) + status[2] = state->machine(state, JANET_ASYNC_EVENT_ERR); + if ((events[i].flags & EV_EOF) && !(events[i].data > 0)) + status[3] = state->maine(state, JANET_ASYNC_EVENT_HUP); + if(status[0] == JANET_ASYNC_STATUS_DONE || + status[1] == JANET_ASYNC_STATUS_DONE || + status[2] == JANET_ASYNC_STATUS_DONE || + status[3] == JANET_ASYNC_STATUSDONE) + janet_unlisten(state, 0); + } + } +} + +void janet_ev_init(void) { + janet_ev_init_common(); + janet_ev_setup_selfpipe(); + janet_vm.kq = kqueue(); + janet_vm.timer_enabled = 0; + if (janet_vm.kq == -1) goto error; + struct kevent events[2]; + EV_SET(events[0], JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer); + EV_SET(events[1], janet_vm.selfpipe[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, janet_vm.selfpipe); + add_events(&events, 2); + return; +error: + JANET_EXIT("failed to initialize event loop"); +} + +void janet_ev_deinit(void) { + janet_ev_deinit_common(); + close(jnet_vm.kq); + janet_ev_cleanup_selfpipe(); + janet_vm.kq = 0; +} + #else #include diff --git a/src/core/state.h b/src/core/state.h index b9106631..f82d499c 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -171,6 +171,11 @@ struct JanetVM { int epoll; int timerfd; int timer_enabled; +#elif defined(JANET_EV_KQUEUE) + JanetHandle selfpipe[2]; + int kq; + int timer; + int timer_enabled #else JanetHandle selfpipe[2]; struct pollfd *fds; diff --git a/src/include/janet.h b/src/include/janet.h index 0916c1d0..9926ed4d 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -198,6 +198,11 @@ extern "C" { #define JANET_EV_EPOLL #endif +/* TODO: Probably breaks NetBSD, might need help here. */ +#if defined(JANET_BSD) && !defined(JANET_EV_NO_KQUEUE) +#define JANET_EV_KQUEUE +#endif + /* How to export symbols */ #ifndef JANET_API #ifdef JANET_WINDOWS