From 4c211c8dce25318be553716691ba1fdce84eb58a Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 28 May 2020 16:51:11 -0500 Subject: [PATCH] More updates to the ev library. --- .gitignore | 3 + examples/numarray/numarray.c | 2 +- examples/tcpserver.janet | 1 + src/core/ev.c | 220 +++++++++++++++++++++++++---------- src/core/net.c | 124 ++++++++++---------- src/include/janet.h | 1 - 6 files changed, 227 insertions(+), 124 deletions(-) diff --git a/.gitignore b/.gitignore index e738c879..1aa622b4 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,9 @@ lockfile.janet # Local directory for testing local +# Common test file I use. +temp.janet + # Emscripten *.bc janet.js diff --git a/examples/numarray/numarray.c b/examples/numarray/numarray.c index 0c3153a4..825e9a34 100644 --- a/examples/numarray/numarray.c +++ b/examples/numarray/numarray.c @@ -23,7 +23,7 @@ static int num_array_gc(void *p, size_t s) { return 0; } -int num_array_get(void *p, Janet key, Janet *out); +int num_array_get(void *p, Janet key, Janet *out); void num_array_put(void *p, Janet key, Janet value); static const JanetAbstractType num_array_type = { diff --git a/examples/tcpserver.janet b/examples/tcpserver.janet index 71501b8b..f1d21e54 100644 --- a/examples/tcpserver.janet +++ b/examples/tcpserver.janet @@ -10,4 +10,5 @@ (buffer/clear b)) (printf "Done %v!" id))) +(print "Starting echo server on 127.0.0.1:8000") (net/server "127.0.0.1" "8000" handler) diff --git a/src/core/ev.c b/src/core/ev.c index 6ab80db4..896c1fe9 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -32,6 +32,7 @@ /* Includes */ +#include #include #include #include @@ -49,10 +50,11 @@ struct JanetTask { }; /* Min priority queue of timestamps for timeouts. */ -typedef struct JanetListenerTimeout JanetListenerTimeout; -struct JanetListenerTimeout JanetListenerTimeout { - JanetListenerState *state; - struct timespec when; +typedef uint64_t JanetTimestamp; +typedef struct JanetTimeout JanetTimeout; +struct JanetTimeout { + JanetTimestamp when; + JanetFiber *fiber; }; /* Global data */ @@ -62,40 +64,40 @@ JANET_THREAD_LOCAL size_t janet_vm_spawn_count = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0; JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0; JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL; -JANET_THREAD_LOCAL JanetListenerTimeout *janet_vm_tq = NULL; +JANET_THREAD_LOCAL JanetTimeout *janet_vm_tq = NULL; -/* Compare two timespecs - 1 if t1 > t2 */ -static int timespec_cmp(struct timespec t1, struct timespec t2) { - if (t1.tv_sec < t2.tv_sec) return -1; - if (t1.tv_sec > t2.tv_sec) return 1; - if (t1.tv_nsec < t2.tv_nsec) return -1; - if (t1.tv_nsec > t2.tv_nsec) return 1; - return 0; +/* Get current timestamp (millisecond precision) */ +static JanetTimestamp ts_now(void); + +/* Get current timestamp + an interval (millisecond precision) */ +static JanetTimestamp ts_delta(JanetTimestamp ts, double delta) { + ts += (int64_t)round(delta * 1000); + return ts; } /* Add a timeout to the timeout min heap */ -static void add_timeout(JanetListenerState *state, struct timespec when) { +static void add_timeout(JanetTimeout to) { size_t oldcount = janet_vm_tq_count; size_t newcount = oldcount + 1; - if (oldcount == janet_vm_tq_capacity) { + if (newcount > janet_vm_tq_capacity) { size_t newcap = 2 * newcount; - JanetListenerTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetListenerTimeout)); + JanetTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetTimeout)); if (NULL == tq) { JANET_OUT_OF_MEMORY; } + janet_vm_tq = tq; janet_vm_tq_capacity = newcap; } /* Append */ janet_vm_tq_count = newcount; - janet_vm_tq[oldcount] = { state, when }; + janet_vm_tq[oldcount] = to; /* Heapify */ size_t index = oldcount; while (index > 0) { size_t parent = (index - 1) >> 1; - int cmp = timespec_cmp(janet_vm_tq[parent].when, when); - if (cmp <= 0) break; + if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break; /* Swap */ - JanetListenerState tmp = janet_vm_tq[index]; + JanetTimeout tmp = janet_vm_tq[index]; janet_vm_tq[index] = janet_vm_tq[parent]; janet_vm_tq[parent] = tmp; /* Next */ @@ -103,9 +105,36 @@ static void add_timeout(JanetListenerState *state, struct timespec when) { } } -/* Extract the next timeout from the priority queue */ -static JanetListenerTimeout next_timeout(void) { +/* Look at the next timeout value without + * removing it. */ +static int peek_timeout(JanetTimeout *out) { + if (janet_vm_tq_count == 0) return 0; + *out = janet_vm_tq[0]; + return 1; +} +/* Remove the next timeout from the priority queue */ +static void pop_timeout(void) { + if (janet_vm_tq_count == 0) return; + janet_vm_tq[0] = janet_vm_tq[--janet_vm_tq_count]; + /* Keep heap invariant */ + size_t index = 0; + for (;;) { + size_t left = (index << 1) + 1; + size_t right = left + 1; + size_t smallest = index; + if (left < janet_vm_tq_count && + (janet_vm_tq[left].when < janet_vm_tq[smallest].when)) + smallest = left; + if (right < janet_vm_tq_count && + (janet_vm_tq[right].when < janet_vm_tq[smallest].when)) + smallest = right; + if (smallest == index) return; + JanetTimeout temp = janet_vm_tq[index]; + janet_vm_tq[index] = janet_vm_tq[smallest]; + janet_vm_tq[smallest] = temp; + index = smallest; + } } /* Create a new event listener */ @@ -166,7 +195,7 @@ void janet_pollable_mark(JanetPollable *pollable) { } } -/* Must be called to close all pollables - does NOT call `close` for you. +/* Must be called to close all pollables - does NOT call `close` for you. * Also does not free memory of the pollable, so can be used on close. */ void janet_pollable_deinit(JanetPollable *pollable) { pollable->flags |= JANET_POLL_FLAG_CLOSED; @@ -204,37 +233,18 @@ void janet_ev_mark(void) { janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber)); janet_mark(janet_vm_spawn[i].value); } -} - -/* Run scheduled tasks */ -static void run_scheduled(void) { - size_t index = 0; - while (index < janet_vm_spawn_count) { - JanetTask task = janet_vm_spawn[index]; - Janet res; - JanetSignal sig = janet_continue(task.fiber, task.value, &res); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { - janet_stacktrace(task.fiber, res); - } - index++; + for (size_t i = 0; i < janet_vm_tq_count; i++) { + janet_mark(janet_wrap_fiber(janet_vm_tq[i].fiber)); } - janet_vm_spawn_count = 0; } -/* Main event loop */ - -void janet_loop1_impl(void); - -void janet_loop1(void) { - if (janet_vm_active_listeners) { - janet_loop1_impl(); +/* Run a top level task */ +static void run_one(JanetFiber *fiber, Janet value) { + Janet res; + JanetSignal sig = janet_continue(fiber, value, &res); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) { + janet_stacktrace(fiber, res); } - /* Run scheduled fibers */ - run_scheduled(); -} - -void janet_loop(void) { - while (janet_vm_active_listeners || janet_vm_spawn_count) janet_loop1(); } /* Common init code */ @@ -243,6 +253,9 @@ void janet_ev_init_common(void) { janet_vm_spawn_count = 0; janet_vm_spawn = NULL; janet_vm_active_listeners = 0; + janet_vm_tq = NULL; + janet_vm_tq_count = 0; + janet_vm_tq_capacity = 0; } /* Common deinit code */ @@ -255,12 +268,50 @@ void janet_await(void) { janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil()); } +/* Main event loop */ + +void janet_loop1_impl(void); + +void janet_loop(void) { + while (janet_vm_active_listeners || janet_vm_spawn_count || janet_vm_tq_count) { + /* Run expired timers */ + JanetTimeout to; + while (peek_timeout(&to) && to.when <= ts_now()) { + pop_timeout(); + janet_schedule(to.fiber, janet_wrap_nil()); + } + /* Run scheduled fibers */ + size_t index = 0; + while (index < janet_vm_spawn_count) { + JanetTask task = janet_vm_spawn[index]; + run_one(task.fiber, task.value); + index++; + } + janet_vm_spawn_count = 0; + /* Poll for events */ + if (janet_vm_active_listeners || janet_vm_tq_count) { + janet_loop1_impl(); + } + } +} + + /* - * Start epoll implementation + * Start linux/epoll implementation */ +static JanetTimestamp ts_now(void) { + struct timespec now; + janet_assert(-1 != clock_gettime(CLOCK_MONOTONIC, &now), "failed to get time"); + uint64_t res = 1000 * now.tv_sec; + res += now.tv_nsec / 1000000; + return res; +} + /* Epoll global data */ JANET_THREAD_LOCAL int janet_vm_epoll = 0; +JANET_THREAD_LOCAL int janet_vm_timerfd = 0; +JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0; static int make_epoll_events(int mask) { int events = 0; @@ -312,6 +363,21 @@ void janet_unlisten(JanetListenerState *state) { /* Replace janet_loop with this */ #define JANET_EPOLL_MAX_EVENTS 64 void janet_loop1_impl(void) { + /* Set timer */ + JanetTimeout to; + struct itimerspec its; + memset(&to, 0, sizeof(to)); + int has_timeout = peek_timeout(&to); + if (janet_vm_timer_enabled || has_timeout) { + memset(&its, 0, sizeof(its)); + if (has_timeout) { + its.it_value.tv_sec = to.when / 1000; + its.it_value.tv_nsec = (to.when % 1000) * 1000000; + } + timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL); + } + janet_vm_timer_enabled = has_timeout; + /* Poll for events */ struct epoll_event events[JANET_EPOLL_MAX_EVENTS]; int ready; @@ -321,17 +387,25 @@ void janet_loop1_impl(void) { if (ready == -1) { JANET_EXIT("failed to poll events"); } + /* Step state machines */ for (int i = 0; i < ready; i++) { JanetPollable *pollable = events[i].data.ptr; - int mask = events[i].events; - JanetListenerState *state = pollable->state; - while (NULL != state) { - if (mask & EPOLLOUT) - state->machine(state, JANET_ASYNC_EVENT_WRITE); - if (mask & EPOLLIN) - state->machine(state, JANET_ASYNC_EVENT_READ); - state = state->_next; + if (NULL == pollable) { + /* Timer event */ + pop_timeout(); + janet_schedule(to.fiber, janet_wrap_nil()); + } else { + /* Normal event */ + int mask = events[i].events; + JanetListenerState *state = pollable->state; + while (NULL != state) { + if (mask & EPOLLOUT) + state->machine(state, JANET_ASYNC_EVENT_WRITE); + if (mask & EPOLLIN) + state->machine(state, JANET_ASYNC_EVENT_READ); + state = state->_next; + } } } } @@ -339,11 +413,22 @@ void janet_loop1_impl(void) { void janet_ev_init(void) { janet_ev_init_common(); janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC); + janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + janet_vm_timer_enabled = 0; + if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error; + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLET; + ev.data.ptr = NULL; + if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error; + return; +error: + JANET_EXIT("failed to initialize event loop"); } void janet_ev_deinit(void) { janet_ev_deinit_common(); close(janet_vm_epoll); + close(janet_vm_timerfd); janet_vm_epoll = 0; } @@ -361,12 +446,27 @@ static Janet cfun_ev_spawn(int32_t argc, Janet *argv) { return argv[0]; } +static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + double sec = janet_getnumber(argv, 0); + JanetTimeout to; + to.when = ts_delta(ts_now(), sec); + to.fiber = janet_vm_root_fiber; + add_timeout(to); + janet_await(); +} + static const JanetReg ev_cfuns[] = { { "ev/go", cfun_ev_spawn, JDOC("(ev/go fiber &opt value)\n\n" - "Put a fiber on the event loop to be resumed later. Optionally pass " - "a value to resume with, otherwise resumes with nil.") + "Put a fiber on the event loop to be resumed later. Optionally pass " + "a value to resume with, otherwise resumes with nil.") + }, + { + "ev/sleep", cfun_ev_sleep, + JDOC("(ev/sleep sec)\n\n" + "Suspend the current fiber for sec seconds without blocking the event loop.") }, {NULL, NULL, NULL} }; diff --git a/src/core/net.c b/src/core/net.c index dd79bdd5..b722445b 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -164,41 +164,41 @@ void net_machine_read(JanetListenerState *s, int event) { break; case JANET_ASYNC_EVENT_READ: /* Read in bytes */ - { - JanetBuffer *buffer = state->buf; - int32_t bytes_left = state->bytes_left; - janet_buffer_extra(buffer, bytes_left); - JReadInt nread; - do { - nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); - } while (nread == -1 && JLASTERR == JEINTR); - if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { - break; - } - - /* Increment buffer counts */ - if (nread > 0) { - buffer->count += nread; - bytes_left -= nread; - } else { - bytes_left = 0; - } - state->bytes_left = bytes_left; - - /* Resume if done */ - if (!state->is_chunk || bytes_left == 0) { - Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil(); - janet_schedule(s->fiber, resume_val); - janet_unlisten(s); - } + { + JanetBuffer *buffer = state->buf; + int32_t bytes_left = state->bytes_left; + janet_buffer_extra(buffer, bytes_left); + JReadInt nread; + do { + nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); + } while (nread == -1 && JLASTERR == JEINTR); + if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { + break; } - break; + + /* Increment buffer counts */ + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + state->bytes_left = bytes_left; + + /* Resume if done */ + if (!state->is_chunk || bytes_left == 0) { + Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil(); + janet_schedule(s->fiber, resume_val); + janet_unlisten(s); + } + } + break; } } JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; @@ -207,7 +207,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); state->is_chunk = 1; state->buf = buf; state->bytes_left = nbytes; @@ -235,50 +235,50 @@ void net_machine_write(JanetListenerState *s, int event) { break; case JANET_ASYNC_EVENT_MARK: janet_mark(state->is_buffer - ? janet_wrap_buffer(state->src.buf) - : janet_wrap_string(state->src.str)); + ? janet_wrap_buffer(state->src.buf) + : janet_wrap_string(state->src.str)); break; case JANET_ASYNC_EVENT_CLOSE: janet_schedule(s->fiber, janet_wrap_nil()); break; case JANET_ASYNC_EVENT_WRITE: { - int32_t start, len; - const uint8_t *bytes; - start = state->start; - if (state->is_buffer) { - JanetBuffer *buffer = state->src.buf; - bytes = buffer->data; - len = buffer->count; + int32_t start, len; + const uint8_t *bytes; + start = state->start; + if (state->is_buffer) { + JanetBuffer *buffer = state->src.buf; + bytes = buffer->data; + len = buffer->count; + } else { + bytes = state->src.str; + len = janet_string_length(bytes); + } + if (start < len) { + int32_t nbytes = len - start; + JReadInt nwrote; + do { + nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); + } while (nwrote == -1 && JLASTERR == JEINTR); + if (nwrote > 0) { + start += nwrote; } else { - bytes = state->src.str; - len = janet_string_length(bytes); + start = len; } - if (start < len) { - int32_t nbytes = len - start; - JReadInt nwrote; - do { - nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); - } while (nwrote == -1 && JLASTERR == JEINTR); - if (nwrote > 0) { - start += nwrote; - } else { - start = len; - } - } - state->start = start; - if (start >= len) { - janet_schedule(s->fiber, janet_wrap_nil()); - janet_unlisten(s); - } - break; + } + state->start = start; + if (start >= len) { + janet_schedule(s->fiber, janet_wrap_nil()); + janet_unlisten(s); } break; + } + break; } } JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); state->is_buffer = 1; state->start = 0; state->src.buf = buf; @@ -288,7 +288,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); state->is_buffer = 0; state->start = 0; state->src.str = str; @@ -442,7 +442,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Put sfd on our loop */ JanetStream *stream = make_stream(sfd, 0); NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); + JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); ss->function = fun; return janet_wrap_abstract(stream); } diff --git a/src/include/janet.h b/src/include/janet.h index cfc613e7..a5bc4d84 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1180,7 +1180,6 @@ struct JanetListenerState { }; /* Run the event loop */ -JANET_API void janet_loop1(void); JANET_API void janet_loop(void); /* Wrapper around pollables */