1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-08 09:49:58 +00:00

Address #1431 - level-trigger mode for net/accept-loop

In the edge-trigger mode before this change, if a socket
receives 2 connections before one can be handled, then only a single
connection is handle and 1 connection will never be handled in some
cases. Reverting to level-trigger mode makes this impossible.
This commit is contained in:
Calvin Rose 2024-04-26 19:28:20 -05:00
parent 7c9157a0ed
commit f0f1b7ce9e
3 changed files with 50 additions and 6 deletions

View File

@ -1530,6 +1530,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} }
} }
void janet_stream_edge_triggered(JanetStream *stream) {
(void) stream;
}
void janet_stream_level_triggered(JanetStream *stream) {
(void) stream;
}
#elif defined(JANET_EV_EPOLL) #elif defined(JANET_EV_EPOLL)
static JanetTimestamp ts_now(void) { static JanetTimestamp ts_now(void) {
@ -1541,15 +1549,15 @@ static JanetTimestamp ts_now(void) {
} }
/* Wait for the next event */ /* Wait for the next event */
static void janet_register_stream(JanetStream *stream) { static void janet_register_stream_impl(JanetStream *stream, int mod, int edge_trigger) {
struct epoll_event ev; struct epoll_event ev;
ev.events = EPOLLET; ev.events = edge_trigger ? EPOLLET : 0;
if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) ev.events |= EPOLLIN; if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) ev.events |= EPOLLIN;
if (stream->flags & JANET_STREAM_WRITABLE) ev.events |= EPOLLOUT; if (stream->flags & JANET_STREAM_WRITABLE) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
status = epoll_ctl(janet_vm.epoll, EPOLL_CTL_ADD, stream->handle, &ev); status = epoll_ctl(janet_vm.epoll, mod ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, stream->handle, &ev);
} while (status == -1 && errno == EINTR); } while (status == -1 && errno == EINTR);
if (status == -1) { if (status == -1) {
if (errno == EPERM) { if (errno == EPERM) {
@ -1563,6 +1571,18 @@ static void janet_register_stream(JanetStream *stream) {
} }
} }
static void janet_register_stream(JanetStream *stream) {
janet_register_stream_impl(stream, 0, 1);
}
void janet_stream_edge_triggered(JanetStream *stream) {
janet_register_stream_impl(stream, 1, 1);
}
void janet_stream_level_triggered(JanetStream *stream) {
janet_register_stream_impl(stream, 1, 0);
}
#define JANET_EPOLL_MAX_EVENTS 64 #define JANET_EPOLL_MAX_EVENTS 64
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
struct itimerspec its; struct itimerspec its;
@ -1692,14 +1712,15 @@ static void timestamp2timespec(struct timespec *t, JanetTimestamp ts) {
t->tv_nsec = ts == 0 ? 0 : (ts % 1000) * 1000000; t->tv_nsec = ts == 0 ? 0 : (ts % 1000) * 1000000;
} }
void janet_register_stream(JanetStream *stream) { void janet_register_stream_impl(JanetStream *stream, int edge_trigger) {
struct kevent kevs[2]; struct kevent kevs[2];
int length = 0; int length = 0;
int clear = edge_trigger ? EV_CLEAR : 0;
if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) { if (stream->flags & (JANET_STREAM_READABLE | JANET_STREAM_ACCEPTABLE)) {
EV_SETx(&kevs[length++], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, stream); EV_SETx(&kevs[length++], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE | clear, 0, 0, stream);
} }
if (stream->flags & JANET_STREAM_WRITABLE) { if (stream->flags & JANET_STREAM_WRITABLE) {
EV_SETx(&kevs[length++], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, stream); EV_SETx(&kevs[length++], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE | clear, 0, 0, stream);
} }
int status; int status;
do { do {
@ -1710,6 +1731,14 @@ void janet_register_stream(JanetStream *stream) {
} }
} }
void janet_stream_edge_triggered(JanetStream *stream) {
janet_register_stream_impl(stream, 1);
}
void janet_stream_level_triggered(JanetStream *stream) {
janet_register_stream_impl(stream, 0);
}
#define JANET_KQUEUE_MAX_EVENTS 64 #define JANET_KQUEUE_MAX_EVENTS 64
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
@ -1832,6 +1861,14 @@ void janet_register_stream(JanetStream *stream) {
janet_vm.stream_count = new_count; janet_vm.stream_count = new_count;
} }
void janet_stream_edge_triggered(JanetStream *stream) {
(void) stream;
}
void janet_stream_level_triggered(JanetStream *stream) {
(void) stream;
}
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* set event flags */ /* set event flags */

View File

@ -319,6 +319,7 @@ JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunctio
NetStateAccept *state = janet_malloc(sizeof(NetStateAccept)); NetStateAccept *state = janet_malloc(sizeof(NetStateAccept));
memset(state, 0, sizeof(NetStateAccept)); memset(state, 0, sizeof(NetStateAccept));
state->function = fun; state->function = fun;
if (fun) janet_stream_level_triggered(stream);
janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state); janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state);
} }

View File

@ -636,6 +636,12 @@ JANET_API void janet_async_end(JanetFiber *fiber);
/* Needed for windows to mark a fiber as waiting for an IOCP completion event. Noop on other platforms. */ /* Needed for windows to mark a fiber as waiting for an IOCP completion event. Noop on other platforms. */
JANET_API void janet_async_in_flight(JanetFiber *fiber); JANET_API void janet_async_in_flight(JanetFiber *fiber);
/* On some platforms, it is important to be able to control if a stream is edge-trigger or level triggered.
* For example, a server that is accepting connections might want to be level triggered or edge-triggered
* depending on expected service. */
JANET_API void janet_stream_edge_triggered(JanetStream *stream);
JANET_API void janet_stream_level_triggered(JanetStream *stream);
#endif #endif
/* Janet uses atomic integers in several places for synchronization between threads and /* Janet uses atomic integers in several places for synchronization between threads and