From f0f1b7ce9e232b017c55946203a6f1299cd584d7 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Fri, 26 Apr 2024 19:28:20 -0500 Subject: [PATCH] Address #1431 - level-trigger mode for net/accept-loop In the edge-trigger mode before this change, if a socket receives 2 connections before one can be handled, then only a single connection is handle and 1 connection will never be handled in some cases. Reverting to level-trigger mode makes this impossible. --- src/core/ev.c | 49 +++++++++++++++++++++++++++++++++++++++------ src/core/net.c | 1 + src/include/janet.h | 6 ++++++ 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 335b9738..161a4c59 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1530,6 +1530,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { } } +void janet_stream_edge_triggered(JanetStream *stream) { + (void) stream; +} + +void janet_stream_level_triggered(JanetStream *stream) { + (void) stream; +} + #elif defined(JANET_EV_EPOLL) static JanetTimestamp ts_now(void) { @@ -1541,15 +1549,15 @@ static JanetTimestamp ts_now(void) { } /* Wait for the next event */ -static void janet_register_stream(JanetStream *stream) { +static void janet_register_stream_impl(JanetStream *stream, int mod, int edge_trigger) { struct epoll_event ev; - ev.events = EPOLLET; + ev.events = edge_trigger ? EPOLLET : 0; if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) ev.events |= EPOLLIN; if (stream->flags & JANET_STREAM_WRITABLE) ev.events |= EPOLLOUT; ev.data.ptr = stream; int status; do { - status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev); + status = epoll_ctl(janet_vm.epoll, mod ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, stream->handle, &ev); } while (status == -1 && errno == EINTR); if (status == -1) { if (errno == EPERM) { @@ -1563,6 +1571,18 @@ static void janet_register_stream(JanetStream *stream) { } } +static void janet_register_stream(JanetStream *stream) { + janet_register_stream_impl(stream, 0, 1); +} + +void janet_stream_edge_triggered(JanetStream *stream) { + janet_register_stream_impl(stream, 1, 1); +} + +void janet_stream_level_triggered(JanetStream *stream) { + janet_register_stream_impl(stream, 1, 0); +} + #define JANET_EPOLL_MAX_EVENTS 64 void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { struct itimerspec its; @@ -1692,14 +1712,15 @@ static void timestamp2timespec(struct timespec *t, JanetTimestamp ts) { t->tv_nsec = ts == 0 ? 0 : (ts % 1000) * 1000000; } -void janet_register_stream(JanetStream *stream) { +void janet_register_stream_impl(JanetStream *stream, int edge_trigger) { struct kevent kevs[2]; int length = 0; + int clear = edge_trigger ? EV_CLEAR : 0; if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) { - EV_SETx(&kevs[length++], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, stream); + EV_SETx(&kevs[length++], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE | clear, 0, 0, stream); } if (stream->flags & JANET_STREAM_WRITABLE) { - EV_SETx(&kevs[length++], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, stream); + EV_SETx(&kevs[length++], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE | clear, 0, 0, stream); } int status; do { @@ -1710,6 +1731,14 @@ void janet_register_stream(JanetStream *stream) { } } +void janet_stream_edge_triggered(JanetStream *stream) { + janet_register_stream_impl(stream, 1); +} + +void janet_stream_level_triggered(JanetStream *stream) { + janet_register_stream_impl(stream, 0); +} + #define JANET_KQUEUE_MAX_EVENTS 64 void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { @@ -1832,6 +1861,14 @@ void janet_register_stream(JanetStream *stream) { janet_vm.stream_count = new_count; } +void janet_stream_edge_triggered(JanetStream *stream) { + (void) stream; +} + +void janet_stream_level_triggered(JanetStream *stream) { + (void) stream; +} + void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* set event flags */ diff --git a/src/core/net.c b/src/core/net.c index 2e71718a..4f5323e3 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -319,6 +319,7 @@ JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunctio NetStateAccept *state = janet_malloc(sizeof(NetStateAccept)); memset(state, 0, sizeof(NetStateAccept)); state->function = fun; + if (fun) janet_stream_level_triggered(stream); janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state); } diff --git a/src/include/janet.h b/src/include/janet.h index 1cfb970f..195c1c47 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -636,6 +636,12 @@ JANET_API void janet_async_end(JanetFiber *fiber); /* Needed for windows to mark a fiber as waiting for an IOCP completion event. Noop on other platforms. */ JANET_API void janet_async_in_flight(JanetFiber *fiber); +/* On some platforms, it is important to be able to control if a stream is edge-trigger or level triggered. + * For example, a server that is accepting connections might want to be level triggered or edge-triggered + * depending on expected service. */ +JANET_API void janet_stream_edge_triggered(JanetStream *stream); +JANET_API void janet_stream_level_triggered(JanetStream *stream); + #endif /* Janet uses atomic integers in several places for synchronization between threads and