1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-26 07:06:51 +00:00

More updates to the ev library.

This commit is contained in:
Calvin Rose 2020-05-28 16:51:11 -05:00
parent c10d9b9d9d
commit 4c211c8dce
6 changed files with 227 additions and 124 deletions

3
.gitignore vendored
View File

@ -32,6 +32,9 @@ lockfile.janet
# Local directory for testing # Local directory for testing
local local
# Common test file I use.
temp.janet
# Emscripten # Emscripten
*.bc *.bc
janet.js janet.js

View File

@ -10,4 +10,5 @@
(buffer/clear b)) (buffer/clear b))
(printf "Done %v!" id))) (printf "Done %v!" id)))
(print "Starting echo server on 127.0.0.1:8000")
(net/server "127.0.0.1" "8000" handler) (net/server "127.0.0.1" "8000" handler)

View File

@ -32,6 +32,7 @@
/* Includes */ /* Includes */
#include <limits.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
@ -49,10 +50,11 @@ struct JanetTask {
}; };
/* Min priority queue of timestamps for timeouts. */ /* Min priority queue of timestamps for timeouts. */
typedef struct JanetListenerTimeout JanetListenerTimeout; typedef uint64_t JanetTimestamp;
struct JanetListenerTimeout JanetListenerTimeout { typedef struct JanetTimeout JanetTimeout;
JanetListenerState *state; struct JanetTimeout {
struct timespec when; JanetTimestamp when;
JanetFiber *fiber;
}; };
/* Global data */ /* Global data */
@ -62,40 +64,40 @@ JANET_THREAD_LOCAL size_t janet_vm_spawn_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0;
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL; JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL;
JANET_THREAD_LOCAL JanetListenerTimeout *janet_vm_tq = NULL; JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL;
/* Compare two timespecs - 1 if t1 > t2 */ /* Get current timestamp (millisecond precision) */
static int timespec_cmp(struct timespec t1, struct timespec t2) { static JanetTimestamp ts_now(void);
if (t1.tv_sec < t2.tv_sec) return -1;
if (t1.tv_sec > t2.tv_sec) return 1; /* Get current timestamp + an interval (millisecond precision) */
if (t1.tv_nsec < t2.tv_nsec) return -1; static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) {
if (t1.tv_nsec > t2.tv_nsec) return 1; ts += (int64_t)round(delta * 1000);
return 0; return ts;
} }
/* Add a timeout to the timeout min heap */ /* Add a timeout to the timeout min heap */
static void add_timeout(JanetListenerState *state, struct timespec when) { static void add_timeout(JanetTimeout to) {
size_t oldcount = janet_vm_tq_count; size_t oldcount = janet_vm_tq_count;
size_t newcount = oldcount + 1; size_t newcount = oldcount + 1;
if (oldcount == janet_vm_tq_capacity) { if (newcount > janet_vm_tq_capacity) {
size_t newcap = 2 * newcount; size_t newcap = 2 * newcount;
JanetListenerTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetListenerTimeout)); JanetTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetTimeout));
if (NULL == tq) { if (NULL == tq) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
janet_vm_tq = tq;
janet_vm_tq_capacity = newcap; janet_vm_tq_capacity = newcap;
} }
/* Append */ /* Append */
janet_vm_tq_count = newcount; janet_vm_tq_count = newcount;
janet_vm_tq[oldcount] = { state, when }; janet_vm_tq[oldcount] = to;
/* Heapify */ /* Heapify */
size_t index = oldcount; size_t index = oldcount;
while (index > 0) { while (index > 0) {
size_t parent = (index - 1) >> 1; size_t parent = (index - 1) >> 1;
int cmp = timespec_cmp(janet_vm_tq[parent].when, when); if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
if (cmp <= 0) break;
/* Swap */ /* Swap */
JanetListenerState tmp = janet_vm_tq[index]; JanetTimeout tmp = janet_vm_tq[index];
janet_vm_tq[index] = janet_vm_tq[parent]; janet_vm_tq[index] = janet_vm_tq[parent];
janet_vm_tq[parent] = tmp; janet_vm_tq[parent] = tmp;
/* Next */ /* Next */
@ -103,9 +105,36 @@ static void add_timeout(JanetListenerState *state, struct timespec when) {
} }
} }
/* Extract the next timeout from the priority queue */ /* Look at the next timeout value without
static JanetListenerTimeout next_timeout(void) { * removing it. */
static int peek_timeout(JanetTimeout *out) {
if (janet_vm_tq_count == 0) return 0;
*out = janet_vm_tq[0];
return 1;
}
/* Remove the next timeout from the priority queue */
static void pop_timeout(void) {
if (janet_vm_tq_count == 0) return;
janet_vm_tq[0] = janet_vm_tq[--janet_vm_tq_count];
/* Keep heap invariant */
size_t index = 0;
for (;;) {
size_t left = (index << 1) + 1;
size_t right = left + 1;
size_t smallest = index;
if (left < janet_vm_tq_count &&
(janet_vm_tq[left].when < janet_vm_tq[smallest].when))
smallest = left;
if (right < janet_vm_tq_count &&
(janet_vm_tq[right].when < janet_vm_tq[smallest].when))
smallest = right;
if (smallest == index) return;
JanetTimeout temp = janet_vm_tq[index];
janet_vm_tq[index] = janet_vm_tq[smallest];
janet_vm_tq[smallest] = temp;
index = smallest;
}
} }
/* Create a new event listener */ /* Create a new event listener */
@ -204,37 +233,18 @@ void janet_ev_mark(void) {
janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber)); janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber));
janet_mark(janet_vm_spawn[i].value); janet_mark(janet_vm_spawn[i].value);
} }
} for (size_t i = 0; i < janet_vm_tq_count; i++) {
janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber));
/* Run scheduled tasks */
static void run_scheduled(void) {
size_t index = 0;
while (index < janet_vm_spawn_count) {
JanetTask task = janet_vm_spawn[index];
Janet res;
JanetSignal sig = janet_continue(task.fiber, task.value, &res);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
janet_stacktrace(task.fiber, res);
}
index++;
} }
janet_vm_spawn_count = 0;
} }
/* Main event loop */ /* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value) {
void janet_loop1_impl(void); Janet res;
JanetSignal sig = janet_continue(fiber, value, &res);
void janet_loop1(void) { if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
if (janet_vm_active_listeners) { janet_stacktrace(fiber, res);
janet_loop1_impl();
} }
/* Run scheduled fibers */
run_scheduled();
}
void janet_loop(void) {
while (janet_vm_active_listeners || janet_vm_spawn_count) janet_loop1();
} }
/* Common init code */ /* Common init code */
@ -243,6 +253,9 @@ void janet_ev_init_common(void) {
janet_vm_spawn_count = 0; janet_vm_spawn_count = 0;
janet_vm_spawn = NULL; janet_vm_spawn = NULL;
janet_vm_active_listeners = 0; janet_vm_active_listeners = 0;
janet_vm_tq = NULL;
janet_vm_tq_count = 0;
janet_vm_tq_capacity = 0;
} }
/* Common deinit code */ /* Common deinit code */
@ -255,12 +268,50 @@ void janet_await(void) {
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
} }
/* Main event loop */
void janet_loop1_impl(void);
void janet_loop(void) {
while (janet_vm_active_listeners || janet_vm_spawn_count || janet_vm_tq_count) {
/* Run expired timers */
JanetTimeout to;
while (peek_timeout(&to) && to.when <= ts_now()) {
pop_timeout();
janet_schedule(to.fiber, janet_wrap_nil());
}
/* Run scheduled fibers */
size_t index = 0;
while (index < janet_vm_spawn_count) {
JanetTask task = janet_vm_spawn[index];
run_one(task.fiber, task.value);
index++;
}
janet_vm_spawn_count = 0;
/* Poll for events */
if (janet_vm_active_listeners || janet_vm_tq_count) {
janet_loop1_impl();
}
}
}
/* /*
* Start epoll implementation * Start linux/epoll implementation
*/ */
static JanetTimestamp ts_now(void) {
struct timespec now;
janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time");
uint64_t res = 1000 * now.tv_sec;
res += now.tv_nsec / 1000000;
return res;
}
/* Epoll global data */ /* Epoll global data */
JANET_THREAD_LOCAL int janet_vm_epoll = 0; JANET_THREAD_LOCAL int janet_vm_epoll = 0;
JANET_THREAD_LOCAL int janet_vm_timerfd = 0;
JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0;
static int make_epoll_events(int mask) { static int make_epoll_events(int mask) {
int events = 0; int events = 0;
@ -312,6 +363,21 @@ void janet_unlisten(JanetListenerState *state) {
/* Replace janet_loop with this */ /* Replace janet_loop with this */
#define JANET_EPOLL_MAX_EVENTS 64 #define JANET_EPOLL_MAX_EVENTS 64
void janet_loop1_impl(void) { void janet_loop1_impl(void) {
/* Set timer */
JanetTimeout to;
struct itimerspec its;
memset(&to, 0, sizeof(to));
int has_timeout = peek_timeout(&to);
if (janet_vm_timer_enabled || has_timeout) {
memset(&its, 0, sizeof(its));
if (has_timeout) {
its.it_value.tv_sec = to.when / 1000;
its.it_value.tv_nsec = (to.when % 1000) * 1000000;
}
timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL);
}
janet_vm_timer_enabled = has_timeout;
/* Poll for events */ /* Poll for events */
struct epoll_event events[JANET_EPOLL_MAX_EVENTS]; struct epoll_event events[JANET_EPOLL_MAX_EVENTS];
int ready; int ready;
@ -321,17 +387,25 @@ void janet_loop1_impl(void) {
if (ready == -1) { if (ready == -1) {
JANET_EXIT("failed to poll events"); JANET_EXIT("failed to poll events");
} }
/* Step state machines */ /* Step state machines */
for (int i = 0; i < ready; i++) { for (int i = 0; i < ready; i++) {
JanetPollable *pollable = events[i].data.ptr; JanetPollable *pollable = events[i].data.ptr;
int mask = events[i].events; if (NULL == pollable) {
JanetListenerState *state = pollable->state; /* Timer event */
while (NULL != state) { pop_timeout();
if (mask & EPOLLOUT) janet_schedule(to.fiber, janet_wrap_nil());
state->machine(state, JANET_ASYNC_EVENT_WRITE); } else {
if (mask & EPOLLIN) /* Normal event */
state->machine(state, JANET_ASYNC_EVENT_READ); int mask = events[i].events;
state = state->_next; JanetListenerState *state = pollable->state;
while (NULL != state) {
if (mask & EPOLLOUT)
state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & EPOLLIN)
state->machine(state, JANET_ASYNC_EVENT_READ);
state = state->_next;
}
} }
} }
} }
@ -339,11 +413,22 @@ void janet_loop1_impl(void) {
void janet_ev_init(void) { void janet_ev_init(void) {
janet_ev_init_common(); janet_ev_init_common();
janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC);
janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
janet_vm_timer_enabled = 0;
if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error;
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = NULL;
if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error;
return;
error:
JANET_EXIT("failed to initialize event loop");
} }
void janet_ev_deinit(void) { void janet_ev_deinit(void) {
janet_ev_deinit_common(); janet_ev_deinit_common();
close(janet_vm_epoll); close(janet_vm_epoll);
close(janet_vm_timerfd);
janet_vm_epoll = 0; janet_vm_epoll = 0;
} }
@ -361,12 +446,27 @@ static Janet cfun_ev_spawn(int32_t argc, Janet *argv) {
return argv[0]; return argv[0];
} }
static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
double sec = janet_getnumber(argv, 0);
JanetTimeout to;
to.when = ts_delta(ts_now(), sec);
to.fiber = janet_vm_root_fiber;
add_timeout(to);
janet_await();
}
static const JanetReg ev_cfuns[] = { static const JanetReg ev_cfuns[] = {
{ {
"ev/go", cfun_ev_spawn, "ev/go", cfun_ev_spawn,
JDOC("(ev/go fiber &opt value)\n\n" JDOC("(ev/go fiber &opt value)\n\n"
"Put a fiber on the event loop to be resumed later. Optionally pass " "Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil.") "a value to resume with, otherwise resumes with nil.")
},
{
"ev/sleep", cfun_ev_sleep,
JDOC("(ev/sleep sec)\n\n"
"Suspend the current fiber for sec seconds without blocking the event loop.")
}, },
{NULL, NULL, NULL} {NULL, NULL, NULL}
}; };

View File

@ -164,41 +164,41 @@ void net_machine_read(JanetListenerState *s, int event) {
break; break;
case JANET_ASYNC_EVENT_READ: case JANET_ASYNC_EVENT_READ:
/* Read in bytes */ /* Read in bytes */
{ {
JanetBuffer *buffer = state->buf; JanetBuffer *buffer = state->buf;
int32_t bytes_left = state->bytes_left; int32_t bytes_left = state->bytes_left;
janet_buffer_extra(buffer, bytes_left); janet_buffer_extra(buffer, bytes_left);
JReadInt nread; JReadInt nread;
do { do {
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
} while (nread == -1 && JLASTERR == JEINTR); } while (nread == -1 && JLASTERR == JEINTR);
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
break; break;
}
/* Increment buffer counts */
if (nread > 0) {
buffer->count += nread;
bytes_left -= nread;
} else {
bytes_left = 0;
}
state->bytes_left = bytes_left;
/* Resume if done */
if (!state->is_chunk || bytes_left == 0) {
Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil();
janet_schedule(s->fiber, resume_val);
janet_unlisten(s);
}
} }
break;
/* Increment buffer counts */
if (nread > 0) {
buffer->count += nread;
bytes_left -= nread;
} else {
bytes_left = 0;
}
state->bytes_left = bytes_left;
/* Resume if done */
if (!state->is_chunk || bytes_left == 0) {
Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil();
janet_schedule(s->fiber, resume_val);
janet_unlisten(s);
}
}
break;
} }
} }
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); JANET_ASYNC_EVENT_READ, sizeof(NetStateRead));
state->is_chunk = 0; state->is_chunk = 0;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
@ -207,7 +207,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); JANET_ASYNC_EVENT_READ, sizeof(NetStateRead));
state->is_chunk = 1; state->is_chunk = 1;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
@ -235,50 +235,50 @@ void net_machine_write(JanetListenerState *s, int event) {
break; break;
case JANET_ASYNC_EVENT_MARK: case JANET_ASYNC_EVENT_MARK:
janet_mark(state->is_buffer janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf) ? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str)); : janet_wrap_string(state->src.str));
break; break;
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil()); janet_schedule(s->fiber, janet_wrap_nil());
break; break;
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
start = state->start; start = state->start;
if (state->is_buffer) { if (state->is_buffer) {
JanetBuffer *buffer = state->src.buf; JanetBuffer *buffer = state->src.buf;
bytes = buffer->data; bytes = buffer->data;
len = buffer->count; len = buffer->count;
} else {
bytes = state->src.str;
len = janet_string_length(bytes);
}
if (start < len) {
int32_t nbytes = len - start;
JReadInt nwrote;
do {
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
} while (nwrote == -1 && JLASTERR == JEINTR);
if (nwrote > 0) {
start += nwrote;
} else { } else {
bytes = state->src.str; start = len;
len = janet_string_length(bytes);
} }
if (start < len) { }
int32_t nbytes = len - start; state->start = start;
JReadInt nwrote; if (start >= len) {
do { janet_schedule(s->fiber, janet_wrap_nil());
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); janet_unlisten(s);
} while (nwrote == -1 && JLASTERR == JEINTR);
if (nwrote > 0) {
start += nwrote;
} else {
start = len;
}
}
state->start = start;
if (start >= len) {
janet_schedule(s->fiber, janet_wrap_nil());
janet_unlisten(s);
}
break;
} }
break; break;
}
break;
} }
} }
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite));
state->is_buffer = 1; state->is_buffer = 1;
state->start = 0; state->start = 0;
state->src.buf = buf; state->src.buf = buf;
@ -288,7 +288,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite));
state->is_buffer = 0; state->is_buffer = 0;
state->start = 0; state->start = 0;
state->src.str = str; state->src.str = str;
@ -442,7 +442,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Put sfd on our loop */ /* Put sfd on our loop */
JanetStream *stream = make_stream(sfd, 0); JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer));
ss->function = fun; ss->function = fun;
return janet_wrap_abstract(stream); return janet_wrap_abstract(stream);
} }

View File

@ -1180,7 +1180,6 @@ struct JanetListenerState {
}; };
/* Run the event loop */ /* Run the event loop */
JANET_API void janet_loop1(void);
JANET_API void janet_loop(void); JANET_API void janet_loop(void);
/* Wrapper around pollables */ /* Wrapper around pollables */