Simpler async model that is better suited to epoll

This commit is contained in:
Calvin Rose 2023-10-06 00:37:19 -05:00
parent 66292beec9
commit 0ff8f58be8
7 changed files with 300 additions and 351 deletions

View File

@ -56,6 +56,9 @@
#ifdef JANET_EV_KQUEUE
#include <sys/event.h>
#endif
#ifdef JANET_EV_POLL
#include <poll.h>
#endif
#endif
typedef struct {
@ -179,9 +182,6 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
return 0;
}
/* Forward declaration */
static void janet_unlisten(JanetListenerState *state);
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
@ -194,8 +194,7 @@ static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) {
return ts;
}
/* Look at the next timeout value without
* removing it. */
/* Look at the next timeout value without removing it. */
static int peek_timeout(JanetTimeout *out) {
if (janet_vm.tq_count == 0) return 0;
*out = janet_vm.tq[0];
@ -254,102 +253,48 @@ static void add_timeout(JanetTimeout to) {
}
}
static int janet_listener_gc(void *p, size_t s);
static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = {
"core/ev-listener",
janet_listener_gc,
janet_listener_mark,
JANET_ATEND_GCMARK
};
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream");
void janet_async_end(JanetFiber *fiber) {
if (fiber->ev_callback) {
janet_gcunroot(janet_wrap_abstract(fiber->ev_stream));
fiber->ev_callback = NULL;
if (fiber->ev_state) {
janet_free(fiber->ev_state);
fiber->ev_state = NULL;
}
janet_ev_dec_refcount();
}
if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read;
if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write;
janet_assert(size >= sizeof(JanetListenerState), "bad size");
JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
state->machine = behavior;
state->fiber = janet_vm.root_fiber;
state->flags = 0;
janet_vm.root_fiber->waiting = state;
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
state->stream = stream;
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT);
}
void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) {
janet_async_end(fiber); /* Clear existing callback */
if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber;
if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber;
fiber->ev_callback = callback;
fiber->ev_stream = stream;
janet_ev_inc_refcount();
state->index = janet_vm.listeners->count;
janet_array_push(janet_vm.listeners, janet_wrap_abstract(state));
return state;
bad_listen_write:
janet_panic("cannot listen for duplicate write event on stream");
bad_listen_read:
janet_panic("cannot listen for duplicate read event on stream");
janet_gcroot(janet_wrap_abstract(stream));
if (data_size) {
void *data = janet_malloc(data_size);
fiber->ev_state = data;
return data;
} else {
return NULL;
}
}
void janet_fiber_did_resume(JanetFiber *fiber) {
if (fiber->waiting) {
janet_unlisten(fiber->waiting);
fiber->waiting = NULL;
}
}
static void janet_unlisten_impl(JanetListenerState *state) {
/* Move last listener to position of this listener - O(1) removal and keep things densely packed. */
if (state->stream) {
Janet popped = janet_array_pop(janet_vm.listeners);
janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check");
JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped);
janet_vm.listeners->data[state->index] = popped;
popped_state->index = state->index;
state->index = UINT32_MAX; /* just in case */
janet_ev_dec_refcount();
if (state->stream->read_state == state) {
state->stream->read_state = NULL;
}
if (state->stream->write_state == state) {
state->stream->write_state = NULL;
}
state->stream = NULL;
}
}
static int janet_listener_gc(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_ev_dec_refcount();
}
if (state->machine) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
}
return 0;
}
static int janet_listener_mark(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_mark(janet_wrap_abstract(state->stream));
}
if (state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
state->machine(state, JANET_ASYNC_EVENT_MARK);
return 0;
janet_async_end(fiber);
}
static void janet_stream_checktoclose(JanetStream *stream) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_fiber && !stream->write_fiber) {
janet_stream_close(stream);
}
}
/* Forward declaration */
static void janet_register_stream(JanetStream *stream);
static const JanetMethod ev_default_stream_methods[] = {
{"close", janet_cfun_stream_close},
{"read", janet_cfun_stream_read},
@ -363,10 +308,12 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle;
stream->flags = flags;
stream->read_state = NULL;
stream->write_state = NULL;
stream->read_fiber = NULL;
stream->write_fiber = NULL;
if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods;
stream->index = 0;
janet_register_stream(stream);
return stream;
}
@ -388,18 +335,30 @@ static void janet_stream_close_impl(JanetStream *stream) {
if (stream->handle != -1) {
close(stream->handle);
stream->handle = -1;
#ifdef JANET_EV_POLL
uint32_t i = stream->index;
size_t j = janet_vm.stream_count - 1;
JanetStream *last = janet_vm.streams[j];
struct pollfd lastfd = janet_vm.fds[j + 1];
janet_vm.fds[i + 1] = lastfd;
janet_vm.streams[i] = last;
last->index = stream->index;
janet_vm.stream_count--;
#endif
}
#endif
}
void janet_stream_close(JanetStream *stream) {
if (stream->read_state) {
stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->read_state);
JanetFiber *rf = stream->read_fiber;
JanetFiber *wf = stream->write_fiber;
if (rf && rf->ev_callback) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_CLOSE);
stream->read_fiber = NULL;
}
if (stream->write_state) {
stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->write_state);
if (wf && wf->ev_callback) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_CLOSE);
stream->write_fiber = NULL;
}
janet_stream_close_impl(stream);
}
@ -416,11 +375,13 @@ static int janet_stream_gc(void *p, size_t s) {
static int janet_stream_mark(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *) p;
if (NULL != stream->read_state) {
janet_mark(janet_wrap_abstract(stream->read_state));
JanetFiber *rf = stream->read_fiber;
JanetFiber *wf = stream->write_fiber;
if (rf) {
janet_mark(janet_wrap_fiber(rf));
}
if (NULL != stream->write_state) {
janet_mark(janet_wrap_abstract(stream->write_state));
if (wf) {
janet_mark(janet_wrap_fiber(wf));
}
return 0;
}
@ -473,8 +434,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
}
JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
/* Can't share listening state and such across threads */
p->read_state = NULL;
p->write_state = NULL;
p->read_fiber = NULL;
p->write_fiber = NULL;
p->flags = (uint32_t) janet_unmarshal_int(ctx);
p->methods = janet_unmarshal_ptr(ctx);
#ifdef JANET_WINDOWS
@ -593,8 +554,6 @@ void janet_ev_init_common(void) {
janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_table_init_raw(&janet_vm.signal_handlers, 0);
janet_rng_seed(&janet_vm.ev_rng, 0);
janet_vm.listeners = janet_array(0);
janet_gcroot(janet_wrap_array(janet_vm.listeners));
#ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr);
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
@ -608,7 +567,6 @@ void janet_ev_deinit_common(void) {
janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers);
janet_vm.listeners = NULL;
#ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif
@ -1554,62 +1512,25 @@ static JanetTimestamp ts_now(void) {
return res;
}
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp;
if (state->stream) {
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
}
}
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(stream->flags & JANET_STREAM_REGISTERED)) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
if (errno == EPERM) {
/* Couldn't add to event loop, so assume that it completes
* synchronously. In that case, fire the completion
* event manually, since this should be a read or write
* event to a file. So we just post a custom event to do the read/write
* asap. */
/* Use flag to indicate state is not registered in epoll */
state->flags = 1;
JanetEVGenericMessage msg = {0};
msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
static void janet_register_stream(JanetStream *stream) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
if (errno == EPERM) {
/* Couldn't add to event loop, so assume that it completes
* synchronously. */
stream->flags |= JANET_STREAM_UNREGISTERED;
} else {
/* Unexpected error */
janet_panicv(janet_ev_lasterr());
}
stream->flags |= JANET_STREAM_REGISTERED;
}
return state;
}
/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state);
}
#define JANET_EPOLL_MAX_EVENTS 64
@ -1646,28 +1567,30 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else {
JanetStream *stream = p;
int mask = events[i].events;
JanetListenerState *states[2] = {stream->read_state, stream->write_state};
for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
if (mask & EPOLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & EPOLLIN)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (mask & EPOLLERR)
status3 = state->machine(state, JANET_ASYNC_EVENT_ERR);
if ((mask & EPOLLHUP) && !(mask & (EPOLLOUT | EPOLLIN)))
status4 = state->machine(state, JANET_ASYNC_EVENT_HUP);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
int has_err = mask & EPOLLERR;
int has_hup = mask & EPOLLHUP;
JanetFiber *rf = stream->read_fiber;
JanetFiber *wf = stream->write_fiber;
if (rf) {
if (rf->ev_callback && (mask & EPOLLIN)) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_READ);
}
if (rf->ev_callback && has_err) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_ERR);
}
if (rf->ev_callback && has_hup) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_HUP);
}
}
if (wf) {
if (wf->ev_callback && (mask & EPOLLOUT)) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_WRITE);
}
if (wf->ev_callback && has_err) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_ERR);
}
if (wf->ev_callback && has_hup) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_HUP);
}
}
janet_stream_checktoclose(stream);
@ -1853,9 +1776,7 @@ void janet_ev_deinit(void) {
janet_vm.kq = 0;
}
#else
#include <poll.h>
#elif defined(JANET_EV_POLL)
static JanetTimestamp ts_now(void) {
struct timespec now;
@ -1866,34 +1787,37 @@ static JanetTimestamp ts_now(void) {
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
size_t oldsize = janet_vm.listeners->capacity;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm.listeners->capacity;
if (newsize > oldsize) {
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
if (NULL == janet_vm.fds) {
void janet_register_stream(JanetStream *stream) {
struct pollfd ev = {0};
stream->index = (uint32_t) janet_vm.stream_count;
size_t new_count = janet_vm.stream_count + 1;
if (new_count > janet_vm.stream_capacity) {
size_t new_cap = new_count * 2;
janet_vm.fds = janet_realloc(janet_vm.fds, (1 + new_cap) * sizeof(struct pollfd));
janet_vm.streams = janet_realloc(janet_vm.streams, new_cap * sizeof(JanetStream *));
if (!janet_vm.fds || !janet_vm.streams) {
JANET_OUT_OF_MEMORY;
}
janet_vm.stream_capacity = new_cap;
}
struct pollfd ev;
ev.fd = stream->handle;
ev.events = 0;
if (stream->read_state) ev.events |= POLLIN;
if (stream->write_state) ev.events |= POLLOUT;
ev.revents = 0;
janet_vm.fds[state->index + 1] = ev;
return state;
}
static void janet_unlisten(JanetListenerState *state) {
if (state->stream) {
janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count];
}
janet_unlisten_impl(state);
ev.events = POLLIN | POLLOUT;
janet_vm.fds[janet_vm.stream_count + 1] = ev;
janet_vm.streams[janet_vm.stream_count] = stream;
janet_vm.stream_count = new_count;
}
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* set event flags */
for (size_t i = 0; i < janet_vm.stream_count; i++) {
JanetStream *stream = janet_vm.streams[i];
janet_vm.fds[i + 1].events = 0;
janet_vm.fds[i + 1].revents = 0;
if (stream->read_fiber) janet_vm.fds[i + 1].events |= POLLIN;
if (stream->write_fiber) janet_vm.fds[i + 1].events |= POLLOUT;
}
/* Poll for events */
int ready;
do {
@ -1902,7 +1826,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetTimestamp now = ts_now();
to = now > timeout ? 0 : (int)(timeout - now);
}
ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to);
ready = poll(janet_vm.fds, janet_vm.stream_count + 1, to);
} while (ready == -1 && errno == EINTR);
if (ready == -1) {
JANET_EXIT("failed to poll events");
@ -1915,31 +1839,32 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
}
/* Step state machines */
for (int32_t i = 0; i < janet_vm.listeners->count; i++) {
for (size_t i = 0; i < janet_vm.stream_count; i++) {
struct pollfd *pfd = janet_vm.fds + i + 1;
/* Skip fds where nothing interesting happened */
JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]);
/* Normal event */
JanetStream *stream = janet_vm.streams[i];
int mask = pfd->revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
state->event = pfd;
JanetStream *stream = state->stream;
if (mask & POLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & POLLIN)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (mask & POLLERR)
status3 = state->machine(state, JANET_ASYNC_EVENT_ERR);
if ((mask & POLLHUP) && !(mask & (POLLIN | POLLOUT)))
status4 = state->machine(state, JANET_ASYNC_EVENT_HUP);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
if (!mask) continue;
int has_err = mask & POLLERR;
int has_hup = mask & POLLHUP;
JanetFiber *rf = stream->read_fiber;
JanetFiber *wf = stream->write_fiber;
if (rf) {
if (rf->ev_callback && (mask & POLLIN)) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_READ);
} else if (rf->ev_callback && has_hup) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_HUP);
} else if (rf->ev_callback && has_err) {
rf->ev_callback(rf, JANET_ASYNC_EVENT_ERR);
}
}
if (wf) {
if (wf->ev_callback && (mask & POLLOUT)) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_WRITE);
} else if (wf->ev_callback && has_hup) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_HUP);
} else if (wf->ev_callback && has_err) {
wf->ev_callback(wf, JANET_ASYNC_EVENT_ERR);
}
}
janet_stream_checktoclose(stream);
}
@ -1956,6 +1881,9 @@ void janet_ev_init(void) {
janet_vm.fds[0].fd = janet_vm.selfpipe[0];
janet_vm.fds[0].events = POLLIN;
janet_vm.fds[0].revents = 0;
janet_vm.streams = NULL;
janet_vm.stream_count = 0;
janet_vm.stream_capacity = 0;
return;
}
@ -1963,7 +1891,9 @@ void janet_ev_deinit(void) {
janet_ev_deinit_common();
janet_ev_cleanup_selfpipe();
janet_free(janet_vm.fds);
janet_free(janet_vm.streams);
janet_vm.fds = NULL;
janet_vm.streams = NULL;
}
#endif
@ -2204,7 +2134,6 @@ typedef enum {
} JanetReadMode;
typedef struct {
JanetListenerState head;
int32_t bytes_left;
int32_t bytes_read;
JanetBuffer *buf;
@ -2224,8 +2153,9 @@ typedef struct {
#endif
} StateRead;
JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
StateRead *state = (StateRead *) s;
void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
JanetStream *stream = fiber->ev_stream;
StateRead *state = (StateRead *) fiber->ev_state;
switch (event) {
default:
break;
@ -2233,8 +2163,9 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
janet_mark(janet_wrap_buffer(state->buf));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
break;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
@ -2307,12 +2238,15 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
#else
case JANET_ASYNC_EVENT_ERR: {
if (state->bytes_read) {
janet_schedule(s->fiber, janet_wrap_buffer(state->buf));
janet_schedule(fiber, janet_wrap_buffer(state->buf));
} else {
janet_schedule(s->fiber, janet_wrap_nil());
janet_schedule(fiber, janet_wrap_nil());
}
return JANET_ASYNC_STATUS_DONE;
janet_async_end(fiber);
break;
}
read_more:
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_READ: {
@ -2328,36 +2262,38 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
do {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
nread = recvfrom(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags,
nread = recvfrom(stream->handle, buffer->data + buffer->count, read_limit, state->flags,
(struct sockaddr *)&saddr, &socklen);
} else if (state->mode == JANET_ASYNC_READMODE_RECV) {
nread = recv(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags);
nread = recv(stream->handle, buffer->data + buffer->count, read_limit, state->flags);
} else
#endif
{
nread = read(s->stream->handle, buffer->data + buffer->count, read_limit);
nread = read(stream->handle, buffer->data + buffer->count, read_limit);
}
} while (nread == -1 && errno == EINTR);
/* Check for errors - special case errors that can just be waited on to fix */
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return JANET_ASYNC_STATUS_NOT_DONE;
break;
}
/* In stream protocols, a pipe error is end of stream */
if (errno == EPIPE && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
nread = 0;
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
break;
}
}
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
state->bytes_read += nread;
if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
break;
}
/* Increment buffer counts */
@ -2378,19 +2314,22 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
{
resume_val = janet_wrap_buffer(buffer);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, resume_val);
janet_async_end(fiber);
break;
}
/* Read some more if possible */
goto read_more;
}
break;
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL);
JanetFiber *f = janet_vm.root_fiber;
StateRead *state = (StateRead *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, sizeof(StateRead));
state->is_chunk = is_chunked;
state->buf = buf;
state->bytes_left = nbytes;
@ -2401,7 +2340,7 @@ static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t
#else
state->flags = flags;
#endif
ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
ev_callback_read(f, JANET_ASYNC_EVENT_USER);
}
void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
@ -2433,7 +2372,6 @@ typedef enum {
} JanetWriteMode;
typedef struct {
JanetListenerState head;
union {
JanetBuffer *buf;
const uint8_t *str;
@ -2453,8 +2391,9 @@ typedef struct {
#endif
} StateWrite;
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s;
void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
JanetStream *stream = fiber->ev_stream;
StateWrite *state = (StateWrite *) fiber->ev_state;
switch (event) {
default:
break;
@ -2468,8 +2407,9 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("stream closed"));
janet_async_end(fiber);
break;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
@ -2540,11 +2480,13 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
break;
#else
case JANET_ASYNC_EVENT_ERR:
janet_cancel(s->fiber, janet_cstringv("stream err"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("stream err"));
janet_async_end(fiber);
break;
case JANET_ASYNC_EVENT_HUP:
janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("stream hup"));
janet_async_end(fiber);
break;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len;
@ -2565,28 +2507,30 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
do {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
nwrote = sendto(s->stream->handle, bytes + start, nbytes, state->flags,
nwrote = sendto(stream->handle, bytes + start, nbytes, state->flags,
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
} else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) {
nwrote = send(s->stream->handle, bytes + start, nbytes, state->flags);
nwrote = send(stream->handle, bytes + start, nbytes, state->flags);
} else
#endif
{
nwrote = write(s->stream->handle, bytes + start, nbytes);
nwrote = write(stream->handle, bytes + start, nbytes);
}
} while (nwrote == -1 && errno == EINTR);
/* Handle write errors */
if (nwrote == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
break;
}
/* Unless using datagrams, empty message is a disconnect */
if (nwrote == 0 && !dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("disconnect"));
janet_async_end(fiber);
break;
}
if (nwrote > 0) {
@ -2597,20 +2541,21 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
}
state->start = start;
if (start >= len) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
break;
}
break;
}
break;
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL);
JanetFiber *f = janet_vm.root_fiber;
StateWrite *state = (StateWrite *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE,
ev_callback_write, sizeof(StateWrite));
state->is_buffer = is_buffer;
state->src.buf = buf;
state->dest_abst = dest_abst;
@ -2621,7 +2566,7 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab
state->flags = flags;
#endif
state->start = 0;
ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
ev_callback_write(f, JANET_ASYNC_EVENT_USER);
}
void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {

View File

@ -40,7 +40,9 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->sched_id = 0;
fiber->waiting = NULL;
fiber->ev_callback = NULL;
fiber->ev_state = NULL;
fiber->ev_stream = NULL;
fiber->supervisor_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);

View File

@ -296,8 +296,11 @@ recur:
if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel);
}
if (fiber->waiting) {
janet_mark_abstract(fiber->waiting);
if (fiber->ev_stream) {
janet_mark_abstract(fiber->ev_stream);
}
if (fiber->ev_callback) {
fiber->ev_callback(fiber, JANET_ASYNC_EVENT_MARK);
}
#endif
@ -324,6 +327,11 @@ static void janet_deinit_block(JanetGCObject *mem) {
janet_free(((JanetTable *) mem)->data);
break;
case JANET_MEMORY_FIBER:
#ifdef JANET_EV
if (((JanetFiber *)mem)->ev_state) {
janet_free(((JanetFiber *)mem)->ev_state);
}
#endif
janet_free(((JanetFiber *)mem)->data);
break;
case JANET_MEMORY_BUFFER:

View File

@ -114,18 +114,19 @@ static void janet_net_socknoblock(JSock s) {
/* State machine for async connect */
typedef struct {
JanetListenerState head;
int did_connect;
} NetStateConnect;
JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) {
NetStateConnect *state = (NetStateConnect *)s;
void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) {
JanetStream *stream = fiber->ev_stream;
NetStateConnect *state = (NetStateConnect *)fiber->ev_state;
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("stream closed"));
janet_async_end(fiber);
return;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:
@ -133,7 +134,6 @@ JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent even
case JANET_ASYNC_EVENT_USER:
break;
}
JanetStream *stream = s->stream;
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
@ -146,24 +146,24 @@ JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent even
if (r == 0) {
if (res == 0) {
state->did_connect = 1;
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
janet_schedule(fiber, janet_wrap_abstract(stream));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
janet_cancel(fiber, janet_cstringv(strerror(res)));
stream->flags |= JANET_STREAM_TOCLOSE;
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
janet_cancel(fiber, janet_ev_lasterr());
stream->flags |= JANET_STREAM_TOCLOSE;
}
return JANET_ASYNC_STATUS_DONE;
janet_async_end(fiber);
}
static void net_sched_connect(JanetStream *stream) {
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
JanetFiber *f = janet_vm.root_fiber;
NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect));
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
net_callback_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}
@ -264,12 +264,12 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
#else
typedef struct {
JanetListenerState head;
JanetFunction *function;
} NetStateAccept;
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) {
JanetStream *stream = fiber->ev_stream;
NetStateAccept *state = (NetStateAccept *)fiber->ev_state;
switch (event) {
default:
break;
@ -278,40 +278,41 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
return;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_READ: {
#if defined(JANET_LINUX)
JSock connfd = accept4(s->stream->handle, NULL, NULL, SOCK_CLOEXEC);
JSock connfd = accept4(stream->handle, NULL, NULL, SOCK_CLOEXEC);
#else
/* On BSDs, CLOEXEC should be inherited from server socket */
JSock connfd = accept(s->stream->handle, NULL, NULL);
JSock connfd = accept(stream->handle, NULL, NULL);
#endif
if (JSOCKVALID(connfd)) {
janet_net_socknoblock(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
if (state->function) {
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
fiber->supervisor_channel = s->fiber->supervisor_channel;
janet_schedule(fiber, janet_wrap_nil());
JanetFiber *sub_fiber = janet_fiber(state->function, 64, 1, &streamv);
sub_fiber->supervisor_channel = fiber->supervisor_channel;
janet_schedule(sub_fiber, janet_wrap_nil());
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, streamv);
janet_async_end(fiber);
return;
}
}
break;
}
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
NetStateAccept *state = (NetStateAccept *) s;
JanetFiber *f = janet_vm.root_fiber;
NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept));
state->function = fun;
net_machine_accept(s, JANET_ASYNC_EVENT_USER);
net_callback_accept(f, JANET_ASYNC_EVENT_USER);
janet_await();
}

View File

@ -159,7 +159,6 @@ struct JanetVM {
volatile JanetAtomicInt listener_count; /* used in signal handler, must be volatile */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
JanetArray *listeners; /* For GC */
JanetTable signal_handlers;
#ifdef JANET_WINDOWS
void **iocp;
@ -176,6 +175,9 @@ struct JanetVM {
int timer;
int timer_enabled;
#else
JanetStream **streams;
size_t stream_count;
size_t stream_capacity;
pthread_attr_t new_thread_attr;
JanetHandle selfpipe[2];
struct pollfd *fds;

View File

@ -234,6 +234,11 @@ extern "C" {
#define JANET_EV_KQUEUE
#endif
/* Use poll as last resort */
#if !defined(JANET_WINDOWS) && !defined(JANET_EV_EPOLL) && !defined(JANET_EV_KQUEUE)
#define JANET_EV_POLL
#endif
/* How to export symbols */
#ifndef JANET_EXPORT
#ifdef JANET_WINDOWS
@ -406,12 +411,11 @@ typedef enum {
JANET_SIGNAL_USER6,
JANET_SIGNAL_USER7,
JANET_SIGNAL_USER8,
JANET_SIGNAL_USER9
JANET_SIGNAL_USER9,
JANET_SIGNAL_INTERRUPT = JANET_SIGNAL_USER8,
JANET_SIGNAL_EVENT = JANET_SIGNAL_USER9,
} JanetSignal;
#define JANET_SIGNAL_EVENT JANET_SIGNAL_USER9
#define JANET_SIGNAL_INTERRUPT JANET_SIGNAL_USER8
/* Fiber statuses - mostly corresponds to signals. */
typedef enum {
JANET_STATUS_DEAD,
@ -575,7 +579,7 @@ typedef void *JanetAbstract;
#define JANET_STREAM_CLOSED 0x1
#define JANET_STREAM_SOCKET 0x2
#define JANET_STREAM_REGISTERED 0x4
#define JANET_STREAM_UNREGISTERED 0x4
#define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
@ -583,54 +587,42 @@ typedef void *JanetAbstract;
#define JANET_STREAM_TOCLOSE 0x10000
typedef enum {
JANET_ASYNC_EVENT_INIT,
JANET_ASYNC_EVENT_MARK,
JANET_ASYNC_EVENT_DEINIT,
JANET_ASYNC_EVENT_CLOSE,
JANET_ASYNC_EVENT_ERR,
JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER
JANET_ASYNC_EVENT_INIT = 0,
JANET_ASYNC_EVENT_MARK = 1,
JANET_ASYNC_EVENT_DEINIT = 2,
JANET_ASYNC_EVENT_CLOSE = 3,
JANET_ASYNC_EVENT_ERR = 4,
JANET_ASYNC_EVENT_HUP = 5,
JANET_ASYNC_EVENT_READ = 6,
JANET_ASYNC_EVENT_WRITE = 7,
JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER = 9
} JanetAsyncEvent;
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE)
typedef enum {
JANET_ASYNC_STATUS_NOT_DONE,
JANET_ASYNC_STATUS_DONE
} JanetAsyncStatus;
JANET_ASYNC_LISTEN_READ = 1,
JANET_ASYNC_LISTEN_WRITE,
JANET_ASYNC_LISTEN_BOTH
} JanetAsyncMode;
/* Typedefs */
typedef struct JanetListenerState JanetListenerState;
typedef struct JanetStream JanetStream;
typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event);
typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event);
/* Wrapper around file descriptors and HANDLEs that can be polled. */
struct JanetStream {
JanetHandle handle;
uint32_t flags;
JanetListenerState *read_state;
JanetListenerState *write_state;
uint32_t index;
JanetFiber *read_fiber;
JanetFiber *write_fiber;
const void *methods; /* Methods for this stream */
};
/* Interface for state machine based event loop */
struct JanetListenerState {
JanetListener machine;
JanetFiber *fiber;
JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
uint32_t index; /* Used for GC and poll implentation */
uint32_t flags;
#ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */
#endif
};
JANET_API void janet_async_end(JanetFiber *fiber);
JANET_API void *janet_async_start(JanetFiber *fiber, JanetStream *stream,
JanetAsyncMode mode, JanetEVCallback callback, size_t data_size);
#endif
/* Janet uses atomic integers in several places for synchronization between threads and
@ -930,7 +922,9 @@ struct JanetFiber {
* in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetListenerState *waiting;
JanetEVCallback ev_callback; /* Call this before starting scheduled fibers */
JanetStream *ev_stream; /* which stream we are waiting on */
void *ev_state; /* Extra data for ev callback state */
void *supervisor_channel; /* Channel to push self to when complete */
#endif
};
@ -1407,9 +1401,6 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
JANET_API void janet_schedule_soon(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a stream */
JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user);
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);
JANET_NO_RETURN JANET_API void janet_sleep_await(double sec);

View File

@ -19,7 +19,7 @@
(frame :source) (frame :source-line)))
(if x
(when is-verbose (eprintf "\e[32m✔\e[0m %s: %s: %v" line-info (describe e) x))
(eprintf "\e[31m✘\e[0m %s: %s: %v" line-info (describe e) x))
(do (eprintf "\e[31m✘\e[0m %s: %s: %v" line-info (describe e) x) (eflush)))
x)
(defmacro assert-error