Switch to poll from select.

Simpler and more flexible interface, and also lets
us use epoll more easily on linux, which is the most important
plantform to optimize for network performance.
This commit is contained in:
Calvin Rose 2020-04-18 12:12:27 -05:00
parent 4ac382e553
commit 2904c19ed9
3 changed files with 102 additions and 33 deletions

View File

@ -222,6 +222,7 @@ test_files = [
'test/suite6.janet',
'test/suite7.janet',
'test/suite8.janet'
'test/suite9.janet'
]
foreach t : test_files
test(t, janet_nativeclient, args : files([t]), workdir : meson.current_source_dir())

View File

@ -32,7 +32,7 @@
#include <sys/fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <poll.h>
#include <netdb.h>
/*
@ -139,6 +139,7 @@ typedef struct {
#define JANET_LOOPFD_MAX 1024
/* Global loop data */
JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX];
JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX];
JANET_THREAD_LOCAL int janet_vm_loop_count;
@ -172,15 +173,25 @@ void janet_net_markloop(void) {
}
/* Add a loop fd to the global event loop */
static int janet_loop_schedule(JanetLoopFD lfd) {
static int janet_loop_schedule(JanetLoopFD lfd, short events) {
if (janet_vm_loop_count == JANET_LOOPFD_MAX) {
return -1;
}
int index = janet_vm_loop_count;
janet_vm_loopfds[janet_vm_loop_count++] = lfd;
int index = janet_vm_loop_count++;
janet_vm_loopfds[index] = lfd;
janet_vm_pollfds[index].fd = lfd.stream->fd;
janet_vm_pollfds[index].events = events;
janet_vm_pollfds[index].revents = 0;
return index;
}
/* Remove event from list */
static void janet_loop_rmindex(int index) {
janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count];
janet_vm_pollfds[index] = janet_vm_pollfds[janet_vm_loop_count];
}
/* Return delta in number of loop fds. Abstracted out so
* we can separate out the polling logic */
static size_t janet_loop_event(size_t index) {
@ -315,37 +326,30 @@ static size_t janet_loop_event(size_t index) {
}
/* Remove this handler from the handler pool. */
if (should_resume) {
janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count];
}
if (should_resume) janet_loop_rmindex(index);
return ret;
}
static void janet_loop1(void) {
/* Set up fd_sets */
fd_set readfds;
fd_set writefds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
int fd_max = 0;
for (int i = 0; i < janet_vm_loop_count; i++) {
JanetLoopFD *jlfd = janet_vm_loopfds + i;
int fd = jlfd->stream->fd;
if (fd > fd_max) fd_max = fd;
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
FD_SET(fd, set);
}
/* Blocking call - we should add timeout functionality */
select(fd_max + 1, &readfds, &writefds, NULL, NULL);
/* Now handle all events */
/* Remove closed file descriptors */
for (int i = 0; i < janet_vm_loop_count;) {
JanetLoopFD *jlfd = janet_vm_loopfds + i;
int fd = jlfd->stream->fd;
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
if (FD_ISSET(fd, set)) {
if (janet_vm_loopfds[i].stream->flags & JANET_STREAM_CLOSED) {
janet_loop_rmindex(i);
} else {
i++;
}
}
/* Poll */
if (janet_vm_loop_count == 0) return;
int ready;
do {
ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1);
} while (ready == -1 && errno == EAGAIN);
if (ready == -1) return;
/* Handle events */
for (int i = 0; i < janet_vm_loop_count;) {
if (janet_vm_pollfds[i].events & janet_vm_pollfds[i].revents) {
size_t delta = janet_loop_event(i);
i += delta;
} else {
@ -373,7 +377,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b
lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK;
lfd.data.read_chunk.buf = buf;
lfd.data.read_chunk.bytes_left = nbytes;
janet_loop_schedule(lfd);
janet_loop_schedule(lfd, POLLIN);
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
}
@ -384,7 +388,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
lfd.event_type = JLE_WRITE_FROM_BUFFER;
lfd.data.write_from_buffer.buf = buf;
lfd.data.write_from_buffer.start = 0;
janet_loop_schedule(lfd);
janet_loop_schedule(lfd, POLLOUT);
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
}
@ -395,7 +399,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
lfd.event_type = JLE_WRITE_FROM_STRINGLIKE;
lfd.data.write_from_stringlike.str = str;
lfd.data.write_from_stringlike.start = 0;
janet_loop_schedule(lfd);
janet_loop_schedule(lfd, POLLOUT);
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
}
@ -461,6 +465,19 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
for (rp = ai; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1) continue;
/* Set various socket options */
int enable = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
close(sfd);
janet_panic("setsockopt(SO_REUSEADDR) failed");
}
#ifdef SO_REUSEPORT
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) {
close(sfd);
janet_panic("setsockopt(SO_REUSEPORT) failed");
}
#endif
/* Bind */
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break;
close(sfd);
}
@ -487,7 +504,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
lfd.stream = make_stream(sfd, 0);
lfd.event_type = JLE_READ_ACCEPT;
lfd.data.read_accept.handler = fun;
janet_loop_schedule(lfd);
janet_loop_schedule(lfd, POLLIN);
return janet_wrap_abstract(lfd.stream);
}

51
test/suite9.janet Normal file
View File

@ -0,0 +1,51 @@
# Copyright (c) 2020 Calvin Rose & contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
(import ./helper :prefix "" :exit true)
(start-suite 9)
# Net testing
(defn handler
"Simple handler for connections."
[stream]
(defer (:close stream)
(def id (gensym))
(def b @"")
(:read stream 1024 b)
(:write stream b)
(buffer/clear b)))
(def s (net/server "127.0.0.1" "8000" handler))
(assert s "made server 1")
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg)
(def res (:read conn 1024))
(assert (= (string res) msg) (string "echo " msg))))
(test-echo "hello")
(test-echo "world")
(test-echo (string/repeat "abcd" 200))
(:close s)
(end-suite)