Merge pull request #785 from llmII/feature-kqueue

Add kqueue support to Janet
This commit is contained in:
Calvin Rose 2021-09-06 18:42:05 -05:00 committed by GitHub
commit 8081082251
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 223 additions and 0 deletions

View File

@ -73,6 +73,7 @@ conf.set('JANET_NO_REALPATH', not get_option('realpath'))
conf.set('JANET_NO_PROCESSES', not get_option('processes'))
conf.set('JANET_SIMPLE_GETLINE', get_option('simple_getline'))
conf.set('JANET_EV_NO_EPOLL', not get_option('epoll'))
conf.set('JANET_EV_NO_KQUEUE', not get_option('kqueue'))
conf.set('JANET_NO_THREADS', get_option('threads'))
conf.set('JANET_NO_INTERPRETER_INTERRUPT', not get_option('interpreter_interrupt'))
if get_option('os_name') != ''

View File

@ -18,6 +18,7 @@ option('umask', type : 'boolean', value : true)
option('realpath', type : 'boolean', value : true)
option('simple_getline', type : 'boolean', value : false)
option('epoll', type : 'boolean', value : false)
option('kqueue', type : 'boolen', value : false)
option('interpreter_interrupt', type : 'boolean', value : false)
option('recursion_guard', type : 'integer', min : 10, max : 8000, value : 1024)

View File

@ -48,6 +48,7 @@
/* #define JANET_OS_NAME my-custom-os */
/* #define JANET_ARCH_NAME pdp-8 */
/* #define JANET_EV_NO_EPOLL */
/* #define JANET_EV_NO_KQUEUE */
/* #define JANET_NO_INTERPRETER_INTERRUPT */
/* Custom vm allocator support */

View File

@ -43,6 +43,9 @@
#include <sys/epoll.h>
#include <sys/timerfd.h>
#endif
#ifdef JANET_EV_KQUEUE
#include <sys/event.h>
#endif
#endif
typedef struct {
@ -1567,6 +1570,208 @@ void janet_ev_deinit(void) {
* End epoll implementation
*/
#elif defined(JANET_EV_KQUEUE)
/* Definition from:
* https://github.com/wahern/cqueues/blob/master/src/lib/kpoll.c
* NetBSD uses intptr_t while others use void * for .udata */
#define EV_SETx(ev, a, b, c, d, e, f) EV_SET((ev), (a), (b), (c), (d), (e), ((__typeof__((ev)->udata))(f)))
#define JANET_KQUEUE_TF (EV_ADD | EV_ENABLE | EV_CLEAR | EV_ONESHOT)
/* NOTE:
* NetBSD doesn't like intervals less than 1 millisecond so lets make that the
* default anywhere JANET_KQUEUE_TS will be used. */
#ifdef __NetBSD__
#define JANET_KQUEUE_MIN_INTERVAL 1
#else
#define JANET_KQUEUE_MIN_INTERVAL 0
#endif
#ifdef __FreeBSD__
#define JANET_KQUEUE_TS(timestamp) (timestamp)
#else
/* NOTE:
* NetBSD and OpenBSD expect things are always intervals, so fake that we have
* abstime capability by changing how a timestamp is used in all kqueue calls
* and defining absent macros. Additionally NetBSD expects intervals be
* greater than 1 millisecond, so correct all intervals to be at least 1
* millisecond under NetBSD. */
JanetTimestamp fix_interval(const JanetTimestamp ts) {
return ts >= JANET_KQUEUE_MIN_INTERVAL ? ts : JANET_KQUEUE_MIN_INTERVAL;
}
#define JANET_KQUEUE_TS(timestamp) (fix_interval((timestamp - ts_now())))
#define NOTE_MSECONDS 0
#define NOTE_ABSTIME 0
#endif
/* TODO: make this available be we using kqueue or epoll, instead of
* redefinining it for kqueue and epoll separately? */
static JanetTimestamp ts_now(void) {
struct timespec now;
janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
uint64_t res = 1000 * now.tv_sec;
res += now.tv_nsec / 1000000;
return res;
}
void add_kqueue_events(const struct kevent *events, int length) {
/* NOTE: Status should be equal to the amount of events added, which isn't
* always known since deletions or modifications occur. Can't use the
* eventlist argument for it to report to us what failed otherwise we may
* poll in events to handle! This code assumes atomicity, that kqueue can
* either succeed or fail, but never partially (which is seemingly how it
* works in practice). When encountering an "inbetween" state we currently
* just panic!
*
* The FreeBSD man page kqueue(2) shows a check through the change list to
* check if kqueue had an error with any of the events being pushed to
* change. Maybe we should do this, even tho the man page also doesn't
* note that kqueue actually does this. We do not do this at this time. */
int status;
status = kevent(janet_vm.kq, events, length, NULL, 0, NULL);
if (status == -1 && errno != EINTR)
janet_panicv(janet_ev_lasterr());
}
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct kevent kev[2];
int length = 0;
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
}
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
}
if (length > 0) {
add_kqueue_events(kev, length);
}
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
/* Use flag to indicate state is not registered in kqueue */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
int length = 0;
if (stream->_mask & JANET_ASYNC_EVENT_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++;
}
if (stream->_mask & JANET_ASYNC_EVENT_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}
add_kqueue_events(kev, length);
}
}
janet_unlisten_impl(state, is_gc);
}
#define JANET_KQUEUE_TIMER_IDENT 1
#define JANET_KQUEUE_MAX_EVENTS 64
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Construct our timer which is a definite time on the clock, not an
* interval, in milliseconds as that is `JanetTimestamp`'s precision. */
struct kevent timer;
if (janet_vm.timer_enabled || has_timeout) {
EV_SETx(&timer,
JANET_KQUEUE_TIMER_IDENT,
EVFILT_TIMER,
JANET_KQUEUE_TF,
NOTE_MSECONDS | NOTE_ABSTIME,
JANET_KQUEUE_TS(timeout), &janet_vm.timer);
add_kqueue_events(&timer, 1);
}
janet_vm.timer_enabled = has_timeout;
/* Poll for events */
int status;
struct kevent events[JANET_KQUEUE_MAX_EVENTS];
do {
status = kevent(janet_vm.kq, NULL, 0, events, JANET_KQUEUE_MAX_EVENTS, NULL);
} while (status == -1 && errno == EINTR);
if (status == -1)
JANET_EXIT("failed to poll events");
/* Step state machines */
for (int i = 0; i < status; i++) {
void *p = (void*) events[i].udata;
if (&janet_vm.timer == p) {
/* Timer expired, ignore */;
} else if (janet_vm.selfpipe == p) {
/* Self-pipe handling */
janet_ev_handle_selfpipe();
} else {
JanetStream *stream = p;
JanetListenerState *state = stream->state;
if (NULL != state) {
state->event = events + i;
JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (!(events[i].flags & EV_ERROR)) {
if (events[i].filter == EVFILT_WRITE)
statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (events[i].filter == EVFILT_READ)
statuses[1] = state->machine(state, JANET_ASYNC_EVENT_READ);
if ((events[i].flags & EV_EOF) && !(events[i].data > 0))
statuses[3] = state->machine(state, JANET_ASYNC_EVENT_HUP);
} else {
statuses[2] = state->machine(state, JANET_ASYNC_EVENT_ERR);
}
if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
statuses[1] == JANET_ASYNC_STATUS_DONE ||
statuses[2] == JANET_ASYNC_STATUS_DONE ||
statuses[3] == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
}
}
}
}
void janet_ev_init(void) {
janet_ev_init_common();
janet_ev_setup_selfpipe();
janet_vm.kq = kqueue();
janet_vm.timer_enabled = 0;
if (janet_vm.kq == -1) goto error;
struct kevent events[2];
/* Don't use JANET_KQUEUE_TS here, as even under FreeBSD we use intervals
* here. */
EV_SETx(&events[0],
JANET_KQUEUE_TIMER_IDENT,
EVFILT_TIMER,
JANET_KQUEUE_TF,
NOTE_MSECONDS, JANET_KQUEUE_MIN_INTERVAL, &janet_vm.timer);
EV_SETx(&events[1], janet_vm.selfpipe[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, janet_vm.selfpipe);
add_kqueue_events(events, 2);
return;
error:
JANET_EXIT("failed to initialize event loop");
}
void janet_ev_deinit(void) {
janet_ev_deinit_common();
close(janet_vm.kq);
janet_ev_cleanup_selfpipe();
janet_vm.kq = 0;
}
#else
#include <poll.h>

View File

@ -171,6 +171,11 @@ struct JanetVM {
int epoll;
int timerfd;
int timer_enabled;
#elif defined(JANET_EV_KQUEUE)
JanetHandle selfpipe[2];
int kq;
int timer;
int timer_enabled;
#else
JanetHandle selfpipe[2];
struct pollfd *fds;

View File

@ -198,6 +198,16 @@ extern "C" {
#define JANET_EV_EPOLL
#endif
/* Enable or disable kqueue on BSD */
#if defined(JANET_BSD) && !defined(JANET_EV_NO_KQUEUE)
#define JANET_EV_KQUEUE
#endif
/* Enable or disable kqueue on Apple */
#if defined(JANET_APPLE) && !defined(JANET_EV_NO_KQUEUE)
#define JANET_EV_KQUEUE
#endif
/* How to export symbols */
#ifndef JANET_API
#ifdef JANET_WINDOWS