1
0
mirror of https://github.com/janet-lang/janet synced 2024-09-30 16:00:40 +00:00

Consolidate windows and posix socket code.

Also remove code that ignored sigpipe and instead try
our best to ignore through various platform specific mechanisms.
This commit is contained in:
Calvin Rose 2020-04-29 21:07:21 -05:00
parent dd458c8ab5
commit 73989f5cc7

View File

@ -52,19 +52,8 @@
#define JANET_STREAM_READABLE 2 #define JANET_STREAM_READABLE 2
#define JANET_STREAM_WRITABLE 4 #define JANET_STREAM_WRITABLE 4
typedef struct {
#ifdef JANET_WINDOWS
SOCKET socket;
#else
int fd;
#endif
int flags;
} JanetStream;
static int janet_stream_close(void *p, size_t s); static int janet_stream_close(void *p, size_t s);
static int janet_stream_getter(void *p, Janet key, Janet *out); static int janet_stream_getter(void *p, Janet key, Janet *out);
static const JanetAbstractType StreamAT = { static const JanetAbstractType StreamAT = {
"core/stream", "core/stream",
janet_stream_close, janet_stream_close,
@ -73,30 +62,46 @@ static const JanetAbstractType StreamAT = {
JANET_ATEND_GET JANET_ATEND_GET
}; };
static int janet_stream_close(void *p, size_t s) {
(void) s;
JanetStream *stream = p;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
closesocket(stream->socket); typedef struct {
#else SOCKET fd;
close(stream->fd); int flags;
#endif } JanetStream;
} #define JSOCKCLOSE(x) closesocket(x)
return 0; #define JSOCKDEFAULT INVALID_SOCKET
} #define JLASTERR WSALastError()
#define JSOCKVALID(x) ((x) != INVALID_SOCKET)
#ifdef JANET_WINDOWS #define JEINTR WSAEINTR
static JanetStream *make_stream(SOCKET socket, int flags) { #define JEWOULDBLOCK WSAEWOULDBLOCK
#define JEAGAIN WSAEWOULDBLOCK
#define JPOLL WSAPoll
#define JPollStruct WSAPOLLFD
#define JSock SOCKET
#define JReadInt long
static JanetStream *make_stream(SOCKET fd, int flags) {
u_long iMode = 0; u_long iMode = 0;
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
ioctlsocket(socket, FIONBIO, &iMode); ioctlsocket(fd, FIONBIO, &iMode);
stream->socket = socket; stream->fd = fd;
stream->flags = flags; stream->flags = flags;
return stream; return stream;
} }
#else #else
typedef struct {
int fd;
int flags;
} JanetStream;
#define JSOCKCLOSE(x) close(x)
#define JSOCKDEFAULT 0
#define JLASTERR errno
#define JSOCKVALID(x) ((x) >= 0)
#define JEINTR EINTR
#define JEWOULDBLOCK EWOULDBLOCK
#define JEAGAIN EAGAIN
#define JPOLL poll
#define JPollStruct struct pollfd
#define JSock int
#define JReadInt ssize_t
static JanetStream *make_stream(int fd, int flags) { static JanetStream *make_stream(int fd, int flags) {
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
@ -106,6 +111,21 @@ static JanetStream *make_stream(int fd, int flags) {
} }
#endif #endif
/* We pass this flag to all send calls to prevent sigpipe */
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
static int janet_stream_close(void *p, size_t s) {
(void) s;
JanetStream *stream = p;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
stream->flags |= JANET_STREAM_CLOSED;
JSOCKCLOSE(stream->fd);
}
return 0;
}
/* /*
* Event loop * Event loop
*/ */
@ -167,11 +187,7 @@ typedef struct {
#define JANET_LOOPFD_MAX 1024 #define JANET_LOOPFD_MAX 1024
/* Global loop data */ /* Global loop data */
#ifdef JANET_WINDOWS JANET_THREAD_LOCAL JPollStruct janet_vm_pollfds[JANET_LOOPFD_MAX];
JANET_THREAD_LOCAL WSAPOLLFD janet_vm_pollfds[JANET_LOOPFD_MAX];
#else
JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX];
#endif
JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX];
JANET_THREAD_LOCAL int janet_vm_loop_count; JANET_THREAD_LOCAL int janet_vm_loop_count;
@ -211,11 +227,7 @@ static int janet_loop_schedule(JanetLoopFD lfd, short events) {
} }
int index = janet_vm_loop_count++; int index = janet_vm_loop_count++;
janet_vm_loopfds[index] = lfd; janet_vm_loopfds[index] = lfd;
#ifdef JANET_WINDOWS
janet_vm_pollfds[index].fd = lfd.stream->socket;
#else
janet_vm_pollfds[index].fd = lfd.stream->fd; janet_vm_pollfds[index].fd = lfd.stream->fd;
#endif
janet_vm_pollfds[index].events = events; janet_vm_pollfds[index].events = events;
janet_vm_pollfds[index].revents = 0; janet_vm_pollfds[index].revents = 0;
return index; return index;
@ -233,11 +245,7 @@ static void janet_loop_rmindex(int index) {
static size_t janet_loop_event(size_t index) { static size_t janet_loop_event(size_t index) {
JanetLoopFD *jlfd = janet_vm_loopfds + index; JanetLoopFD *jlfd = janet_vm_loopfds + index;
JanetStream *stream = jlfd->stream; JanetStream *stream = jlfd->stream;
#ifdef JANET_WINDOWS JSock fd = stream->fd;
SOCKET socket = stream->socket;
#else
int fd = stream->fd;
#endif
int ret = 1; int ret = 1;
int should_resume = 0; int should_resume = 0;
Janet resumeval = janet_wrap_nil(); Janet resumeval = janet_wrap_nil();
@ -256,25 +264,15 @@ static size_t janet_loop_event(size_t index) {
ret = 0; ret = 0;
break; break;
} }
#ifdef JANET_WINDOWS JReadInt nread;
long nread;
do { do {
nread = recv(socket, buffer->data + buffer->count, bytes_left, 0); nread = recv(fd, buffer->data + buffer->count, bytes_left, 0);
} while (nread == -1 && WSAGetLastError() == WSAEINTR); } while (nread == -1 && JLASTERR == JEINTR);
if (WSAGetLastError() == WSAEWOULDBLOCK) { if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
ret = 1; ret = 1;
break; break;
} }
#else
ssize_t nread;
do {
nread = read(fd, buffer->data + buffer->count, bytes_left);
} while (nread == -1 && errno == EINTR);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ret = 1;
break;
}
#endif
if (nread > 0) { if (nread > 0) {
buffer->count += nread; buffer->count += nread;
bytes_left -= nread; bytes_left -= nread;
@ -294,15 +292,8 @@ static size_t janet_loop_event(size_t index) {
break; break;
} }
case JLE_READ_ACCEPT: { case JLE_READ_ACCEPT: {
#ifdef JANET_WINDOWS JSock connfd = accept(fd, NULL, NULL);
SOCKET connfd = accept(socket, NULL, NULL); if (JSOCKVALID(connfd)) {
if (connfd != INVALID_SOCKET) {
#else
char addr[256] = {0}; /* Just make sure it is large enough for largest address type */
socklen_t len = 0;
int connfd = accept(fd, (void *) &addr, &len);
if (connfd >= 0) {
#endif
/* Made a new connection socket */ /* Made a new connection socket */
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream); Janet streamv = janet_wrap_abstract(stream);
@ -339,17 +330,10 @@ static size_t janet_loop_event(size_t index) {
} }
if (start < len) { if (start < len) {
int32_t nbytes = len - start; int32_t nbytes = len - start;
#ifdef JANET_WINDOWS JReadInt nwrote;
long nwrote;
do { do {
nwrote = send(socket, bytes + start, nbytes, 0); nwrote = send(fd, bytes + start, nbytes, MSG_NOSIGNAL);
} while (nwrote == -1 && WSAGetLastError() == WSAEINTR); } while (nwrote == -1 && JLASTERR == JEINTR);
#else
ssize_t nwrote;
do {
nwrote = write(fd, bytes + start, nbytes);
} while (nwrote == -1 && errno == EINTR);
#endif
if (nwrote > 0) { if (nwrote > 0) {
start += nwrote; start += nwrote;
} else { } else {
@ -403,17 +387,10 @@ static void janet_loop1(void) {
/* Poll */ /* Poll */
if (janet_vm_loop_count == 0) return; if (janet_vm_loop_count == 0) return;
int ready; int ready;
#ifdef JANET_WINDOWS
do { do {
ready = WSAPoll(janet_vm_pollfds, janet_vm_loop_count, -1); ready = JPOLL(janet_vm_pollfds, janet_vm_loop_count, -1);
} while (ready == -1 && WSAGetLastError() == WSAEINTR); } while (ready == -1 && JLASTERR == JEINTR);
if (ready == -1) return; if (ready == -1) return;
#else
do {
ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1);
} while (ready == -1 && errno == EAGAIN);
if (ready == -1) return;
#endif
/* Handle events */ /* Handle events */
for (int i = 0; i < janet_vm_loop_count;) { for (int i = 0; i < janet_vm_loop_count;) {
int revents = janet_vm_pollfds[i].revents; int revents = janet_vm_pollfds[i].revents;
@ -500,10 +477,9 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
struct addrinfo *ai = janet_get_addrinfo(argv, 0); struct addrinfo *ai = janet_get_addrinfo(argv, 0);
#ifdef JANET_WINDOWS
/* Create socket */ /* Create socket */
SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); JSock sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (sock == INVALID_SOCKET) { if (!JSOCKVALID(sock)) {
freeaddrinfo(ai); freeaddrinfo(ai);
janet_panic("could not create socket"); janet_panic("could not create socket");
} }
@ -512,25 +488,9 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen); int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen);
freeaddrinfo(ai); freeaddrinfo(ai);
if (status == -1) { if (status == -1) {
closesocket(sock); JSOCKCLOSE(sock);
janet_panic("could not connect to socket"); janet_panic("could not connect to socket");
} }
#else
/* Create socket */
int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (sock < 0) {
freeaddrinfo(ai);
janet_panic("could not create socket");
}
/* Connect to socket */
int status = connect(sock, ai->ai_addr, ai->ai_addrlen);
freeaddrinfo(ai);
if (status < 0) {
close(sock);
janet_panic("could not connect to socket");
}
#endif
/* Wrap socket in abstract type JanetStream */ /* Wrap socket in abstract type JanetStream */
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
@ -545,57 +505,33 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
struct addrinfo *ai = janet_get_addrinfo(argv, 0); struct addrinfo *ai = janet_get_addrinfo(argv, 0);
#ifdef JANET_WINDOWS
/* Check all addrinfos in a loop for the first that we can bind to. */ /* Check all addrinfos in a loop for the first that we can bind to. */
SOCKET sfd = INVALID_SOCKET; JSock sfd = JSOCKDEFAULT;
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) { for (rp = ai; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == INVALID_SOCKET) continue; if (!JSOCKVALID(sfd)) continue;
/* Set various socket options */ /* Set various socket options */
int enable = 1; int enable = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) { if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) {
closesocket(sfd); JSOCKCLOSE(sfd);
janet_panic("setsockopt(SO_REUSEADDR) failed"); janet_panic("setsockopt(SO_REUSEADDR) failed");
} }
/* Bind */ #ifdef SO_NOSIGPIPE
if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; if (setsockopt(sfd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(int)) < 0) {
closesocket(sfd); JSOCKCLOSE(sfd);
} janet_panic("setsockopt(SO_NOSIGPIPE) failed");
if (NULL == rp) {
freeaddrinfo(ai);
janet_panic("could not bind to any sockets");
}
/* listen */
int status = listen(sfd, 1024);
freeaddrinfo(ai);
if (status) {
closesocket(sfd);
janet_panic("could not listen on file descriptor");
}
#else
/* Check all addrinfos in a loop for the first that we can bind to. */
int sfd = 0;
struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1) continue;
/* Set various socket options */
int enable = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) {
close(sfd);
janet_panic("setsockopt(SO_REUSEADDR) failed");
} }
#endif
#ifdef SO_REUSEPORT #ifdef SO_REUSEPORT
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) { if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) {
close(sfd); JSOCKCLOSE(sfd);
janet_panic("setsockopt(SO_REUSEPORT) failed"); janet_panic("setsockopt(SO_REUSEPORT) failed");
} }
#endif #endif
/* Bind */ /* Bind */
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break; if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break;
close(sfd); JSOCKCLOSE(sfd);
} }
if (NULL == rp) { if (NULL == rp) {
freeaddrinfo(ai); freeaddrinfo(ai);
@ -606,16 +542,10 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
int status = listen(sfd, 1024); int status = listen(sfd, 1024);
freeaddrinfo(ai); freeaddrinfo(ai);
if (status) { if (status) {
close(sfd); JSOCKCLOSE(sfd);
janet_panic("could not listen on file descriptor"); janet_panic("could not listen on file descriptor");
} }
/* We need to ignore sigpipe when reading and writing to our connection socket.
* Since a connection could be disconnected at any time, any read or write may fail.
* We don't want to blow up the whole application. */
signal(SIGPIPE, SIG_IGN);
#endif
/* Put sfd on our loop */ /* Put sfd on our loop */
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
lfd.stream = make_stream(sfd, 0); lfd.stream = make_stream(sfd, 0);