Add kqueue support to Janet

Note that this is a work in progress and simply a first attempt at
getting some code into place before being able to test it. This code
follows of sorts both the poll and epoll sections of the codebase hoping
to achieve the exact same.
This commit is contained in:
llmII 2021-09-03 14:29:13 -05:00
parent 7037532943
commit a209a01284
No known key found for this signature in database
GPG Key ID: E3AD2E259F58A9A0
3 changed files with 160 additions and 0 deletions

View File

@ -43,6 +43,9 @@
#include <sys/epoll.h>
#include <sys/timerfd.h>
#endif
#ifdef JANET_EV_KQUEUE
#include <sys/event.h>
#endif
#endif
typedef struct {
@ -1567,6 +1570,153 @@ void janet_ev_deinit(void) {
* End epoll implementation
*/
#elif defined(JANET_EV_KQUEUE)
/*
* NOTE: need to touch up janet.h & state.h to have specifics for kqueue.
*/
/* TODO: make this available be we using kqueue or epoll, instead of
* redefinining it for kqueue and epoll separately.
*/
static JanetTimestamp ts_now(void) {
struct timespec now;
janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
uint64_t res 1000 * now.tv_sec;
res += now.tv_nsec / 1000000;
return res;
}
void add_events(const struct *kevent events, int length) {
/* NOTE: status should equal the amount of events *added*. This number
* isn't always known if deletions or modifications occur. We also can't
* use an event list for it to report to us what failed otherwise we may
* poll in events to handle! So we'll pretend it atomic, can either
* succeed, or fail, nothing inbetween, without violating constraints in
* which case we exit.
*
* The example code on the manual page shows a check against the event
* added itself for the error? Maybe we should do this? The description of
* the kevent call doesn't really say that that is where to check the
* error... Trying it anyway... */
int status;
status = kevent(janet_vm.kq, events, length, NULL, 0, NULL);
if(status == -1 && errno != EINTR)
exit(-1); /* do a better exit */
for(int i = 0; i < length; i++) {
if((kev[i].flags & EV_ERROR) && kev[i].data != EINTR) {
exit(-1); /* do a better exit */
}
}
}
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size void *user) {
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct kevent kev[2];
/* NOTE: NetBSD uses a different type for udata, might not work there or
* may warn! */
EV_SET(&kev[0], stream->handle, EVFILT_READ, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_READ ? EV_ENABLE : EV_DISABLE), 0, 0, stream);
EV_SET(&kev[1], stream->handle, EVFILT_WRITE, EV_ADD | (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE ? EV_ENABLE : EV_DISABLE), 0, 0, stream);
add_events(kev, 2);
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
JanetStream *stream = state->stream;
if(!(stream->flags & JANET_STREAM_CLOSED)) {
/* following epoll's example, might need to clarify this there, and
* here as to what is actually happening? */
if(!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2];
EV_SET(&kev[0], stream->handle, EVFILT_READ, op, 0, 0, stream);
EV_SET(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
add_events(kev, 2);
}
}
janet_unlisten_impl(state, is_gc);
}
#define JANET_KQUEUE_TIMER_IDENT 1
#define JANET_KQUEUE_MAX_EVENTS 64
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Push a timer kevent here, always use infinite timeout. Precision is in
* milliseconds, should it be different? is timeout a timestamp? or a
* timeout... we're treating it as a timeout */
/* NOTE: Probably be logic errors here! */
struct kevent timer;
if (janet_vm.timer_enabled || has_timeout) {
EV_SET(&timer, JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer);
add_events(&timer, 1);
}
janet_vm.timer_enabled = has_timeout;
int status;
struct kevent events[JANET_KQUEUE_MAX_EVENTS];
do {
status = kevent(janet_vm.kq, NULL, 0, events, JANET_KQUEUE_MAX_EVENTS, NULL);
} while (status == -1 && errno == EINTR);
if(status == -1)
JANET_EXIT("failed to poll events");
for(int i = 0; i < status; i++) {
void *p = events[i].udata;
if(&janet_vm.timer == p) {
/* Timer expired, ignore */;
} else if (janet_vm.selfpipe = p) {
/* Self-pipe handling */
janet_ev_handle_selfpipe();
} else {
JanetStream *stream = p;
JanetListenerState *state = stream->state;
state->event = events + i;
JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (events[i].filter == EVFILT_WRITE)
status[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (events[i].filter == EVFILT_READ)
status[1] = state->machine(state, JANET_ASYNC_EVENT_READ);
if (events[i].flags & EV_ERROR)
status[2] = state->machine(state, JANET_ASYNC_EVENT_ERR);
if ((events[i].flags & EV_EOF) && !(events[i].data > 0))
status[3] = state->maine(state, JANET_ASYNC_EVENT_HUP);
if(status[0] == JANET_ASYNC_STATUS_DONE ||
status[1] == JANET_ASYNC_STATUS_DONE ||
status[2] == JANET_ASYNC_STATUS_DONE ||
status[3] == JANET_ASYNC_STATUSDONE)
janet_unlisten(state, 0);
}
}
}
void janet_ev_init(void) {
janet_ev_init_common();
janet_ev_setup_selfpipe();
janet_vm.kq = kqueue();
janet_vm.timer_enabled = 0;
if (janet_vm.kq == -1) goto error;
struct kevent events[2];
EV_SET(events[0], JANET_KQUEUE_TIMER_IDENT, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_MSECONDS, timeout, &janet_vm.timer);
EV_SET(events[1], janet_vm.selfpipe[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, janet_vm.selfpipe);
add_events(&events, 2);
return;
error:
JANET_EXIT("failed to initialize event loop");
}
void janet_ev_deinit(void) {
janet_ev_deinit_common();
close(jnet_vm.kq);
janet_ev_cleanup_selfpipe();
janet_vm.kq = 0;
}
#else
#include <poll.h>

View File

@ -171,6 +171,11 @@ struct JanetVM {
int epoll;
int timerfd;
int timer_enabled;
#elif defined(JANET_EV_KQUEUE)
JanetHandle selfpipe[2];
int kq;
int timer;
int timer_enabled
#else
JanetHandle selfpipe[2];
struct pollfd *fds;

View File

@ -198,6 +198,11 @@ extern "C" {
#define JANET_EV_EPOLL
#endif
/* TODO: Probably breaks NetBSD, might need help here. */
#if defined(JANET_BSD) && !defined(JANET_EV_NO_KQUEUE)
#define JANET_EV_KQUEUE
#endif
/* How to export symbols */
#ifndef JANET_API
#ifdef JANET_WINDOWS