From 73989f5cc7f2c35f2ad689b669a041488d170c73 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Wed, 29 Apr 2020 21:07:21 -0500 Subject: [PATCH] Consolidate windows and posix socket code. Also remove code that ignored sigpipe and instead try our best to ignore through various platform specific mechanisms. --- src/core/net.c | 226 +++++++++++++++++-------------------------------- 1 file changed, 78 insertions(+), 148 deletions(-) diff --git a/src/core/net.c b/src/core/net.c index 062c769c..1e092a7a 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -52,19 +52,8 @@ #define JANET_STREAM_READABLE 2 #define JANET_STREAM_WRITABLE 4 -typedef struct { -#ifdef JANET_WINDOWS - SOCKET socket; -#else - int fd; -#endif - int flags; -} JanetStream; - static int janet_stream_close(void *p, size_t s); - static int janet_stream_getter(void *p, Janet key, Janet *out); - static const JanetAbstractType StreamAT = { "core/stream", janet_stream_close, @@ -73,30 +62,46 @@ static const JanetAbstractType StreamAT = { JANET_ATEND_GET }; -static int janet_stream_close(void *p, size_t s) { - (void) s; - JanetStream *stream = p; - if (!(stream->flags & JANET_STREAM_CLOSED)) { - stream->flags |= JANET_STREAM_CLOSED; #ifdef JANET_WINDOWS - closesocket(stream->socket); -#else - close(stream->fd); -#endif - } - return 0; -} - -#ifdef JANET_WINDOWS -static JanetStream *make_stream(SOCKET socket, int flags) { +typedef struct { + SOCKET fd; + int flags; +} JanetStream; +#define JSOCKCLOSE(x) closesocket(x) +#define JSOCKDEFAULT INVALID_SOCKET +#define JLASTERR WSALastError() +#define JSOCKVALID(x) ((x) != INVALID_SOCKET) +#define JEINTR WSAEINTR +#define JEWOULDBLOCK WSAEWOULDBLOCK +#define JEAGAIN WSAEWOULDBLOCK +#define JPOLL WSAPoll +#define JPollStruct WSAPOLLFD +#define JSock SOCKET +#define JReadInt long +static JanetStream *make_stream(SOCKET fd, int flags) { u_long iMode = 0; JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - ioctlsocket(socket, FIONBIO, &iMode); - stream->socket = socket; + ioctlsocket(fd, FIONBIO, &iMode); + stream->fd = fd; stream->flags = flags; return stream; } #else +typedef struct { + int fd; + int flags; +} JanetStream; +#define JSOCKCLOSE(x) close(x) +#define JSOCKDEFAULT 0 +#define JLASTERR errno +#define JSOCKVALID(x) ((x) >= 0) +#define JEINTR EINTR +#define JEWOULDBLOCK EWOULDBLOCK +#define JEAGAIN EAGAIN +#define JPOLL poll +#define JPollStruct struct pollfd +#define JSock int +#define JReadInt ssize_t static JanetStream *make_stream(int fd, int flags) { JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); @@ -106,6 +111,21 @@ static JanetStream *make_stream(int fd, int flags) { } #endif +/* We pass this flag to all send calls to prevent sigpipe */ +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL 0 +#endif + +static int janet_stream_close(void *p, size_t s) { + (void) s; + JanetStream *stream = p; + if (!(stream->flags & JANET_STREAM_CLOSED)) { + stream->flags |= JANET_STREAM_CLOSED; + JSOCKCLOSE(stream->fd); + } + return 0; +} + /* * Event loop */ @@ -167,11 +187,7 @@ typedef struct { #define JANET_LOOPFD_MAX 1024 /* Global loop data */ -#ifdef JANET_WINDOWS -JANET_THREAD_LOCAL WSAPOLLFD janet_vm_pollfds[JANET_LOOPFD_MAX]; -#else -JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX]; -#endif +JANET_THREAD_LOCAL JPollStruct janet_vm_pollfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL int janet_vm_loop_count; @@ -211,11 +227,7 @@ static int janet_loop_schedule(JanetLoopFD lfd, short events) { } int index = janet_vm_loop_count++; janet_vm_loopfds[index] = lfd; -#ifdef JANET_WINDOWS - janet_vm_pollfds[index].fd = lfd.stream->socket; -#else janet_vm_pollfds[index].fd = lfd.stream->fd; -#endif janet_vm_pollfds[index].events = events; janet_vm_pollfds[index].revents = 0; return index; @@ -233,11 +245,7 @@ static void janet_loop_rmindex(int index) { static size_t janet_loop_event(size_t index) { JanetLoopFD *jlfd = janet_vm_loopfds + index; JanetStream *stream = jlfd->stream; -#ifdef JANET_WINDOWS - SOCKET socket = stream->socket; -#else - int fd = stream->fd; -#endif + JSock fd = stream->fd; int ret = 1; int should_resume = 0; Janet resumeval = janet_wrap_nil(); @@ -256,25 +264,15 @@ static size_t janet_loop_event(size_t index) { ret = 0; break; } -#ifdef JANET_WINDOWS - long nread; + JReadInt nread; do { - nread = recv(socket, buffer->data + buffer->count, bytes_left, 0); - } while (nread == -1 && WSAGetLastError() == WSAEINTR); - if (WSAGetLastError() == WSAEWOULDBLOCK) { + nread = recv(fd, buffer->data + buffer->count, bytes_left, 0); + } while (nread == -1 && JLASTERR == JEINTR); + if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { ret = 1; break; } -#else - ssize_t nread; - do { - nread = read(fd, buffer->data + buffer->count, bytes_left); - } while (nread == -1 && errno == EINTR); - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ret = 1; - break; - } -#endif + if (nread > 0) { buffer->count += nread; bytes_left -= nread; @@ -294,15 +292,8 @@ static size_t janet_loop_event(size_t index) { break; } case JLE_READ_ACCEPT: { -#ifdef JANET_WINDOWS - SOCKET connfd = accept(socket, NULL, NULL); - if (connfd != INVALID_SOCKET) { -#else - char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ - socklen_t len = 0; - int connfd = accept(fd, (void *) &addr, &len); - if (connfd >= 0) { -#endif + JSock connfd = accept(fd, NULL, NULL); + if (JSOCKVALID(connfd)) { /* Made a new connection socket */ JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); Janet streamv = janet_wrap_abstract(stream); @@ -339,17 +330,10 @@ static size_t janet_loop_event(size_t index) { } if (start < len) { int32_t nbytes = len - start; -#ifdef JANET_WINDOWS - long nwrote; + JReadInt nwrote; do { - nwrote = send(socket, bytes + start, nbytes, 0); - } while (nwrote == -1 && WSAGetLastError() == WSAEINTR); -#else - ssize_t nwrote; - do { - nwrote = write(fd, bytes + start, nbytes); - } while (nwrote == -1 && errno == EINTR); -#endif + nwrote = send(fd, bytes + start, nbytes, MSG_NOSIGNAL); + } while (nwrote == -1 && JLASTERR == JEINTR); if (nwrote > 0) { start += nwrote; } else { @@ -372,7 +356,7 @@ static size_t janet_loop_event(size_t index) { case JLE_CONNECT: { break; } - } + } } /* Resume a fiber for some events */ @@ -403,17 +387,10 @@ static void janet_loop1(void) { /* Poll */ if (janet_vm_loop_count == 0) return; int ready; -#ifdef JANET_WINDOWS do { - ready = WSAPoll(janet_vm_pollfds, janet_vm_loop_count, -1); - } while (ready == -1 && WSAGetLastError() == WSAEINTR); + ready = JPOLL(janet_vm_pollfds, janet_vm_loop_count, -1); + } while (ready == -1 && JLASTERR == JEINTR); if (ready == -1) return; -#else - do { - ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1); - } while (ready == -1 && errno == EAGAIN); - if (ready == -1) return; -#endif /* Handle events */ for (int i = 0; i < janet_vm_loop_count;) { int revents = janet_vm_pollfds[i].revents; @@ -500,10 +477,9 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { struct addrinfo *ai = janet_get_addrinfo(argv, 0); -#ifdef JANET_WINDOWS /* Create socket */ - SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (sock == INVALID_SOCKET) { + JSock sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (!JSOCKVALID(sock)) { freeaddrinfo(ai); janet_panic("could not create socket"); } @@ -512,25 +488,9 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen); freeaddrinfo(ai); if (status == -1) { - closesocket(sock); + JSOCKCLOSE(sock); janet_panic("could not connect to socket"); } -#else - /* Create socket */ - int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (sock < 0) { - freeaddrinfo(ai); - janet_panic("could not create socket"); - } - - /* Connect to socket */ - int status = connect(sock, ai->ai_addr, ai->ai_addrlen); - freeaddrinfo(ai); - if (status < 0) { - close(sock); - janet_panic("could not connect to socket"); - } -#endif /* Wrap socket in abstract type JanetStream */ JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); @@ -545,57 +505,33 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { struct addrinfo *ai = janet_get_addrinfo(argv, 0); -#ifdef JANET_WINDOWS /* Check all addrinfos in a loop for the first that we can bind to. */ - SOCKET sfd = INVALID_SOCKET; + JSock sfd = JSOCKDEFAULT; struct addrinfo *rp = NULL; for (rp = ai; rp != NULL; rp = rp->ai_next) { sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sfd == INVALID_SOCKET) continue; + if (!JSOCKVALID(sfd)) continue; /* Set various socket options */ int enable = 1; if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) { - closesocket(sfd); + JSOCKCLOSE(sfd); janet_panic("setsockopt(SO_REUSEADDR) failed"); } - /* Bind */ - if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; - closesocket(sfd); - } - if (NULL == rp) { - freeaddrinfo(ai); - janet_panic("could not bind to any sockets"); - } - - /* listen */ - int status = listen(sfd, 1024); - freeaddrinfo(ai); - if (status) { - closesocket(sfd); - janet_panic("could not listen on file descriptor"); - } -#else - /* Check all addrinfos in a loop for the first that we can bind to. */ - int sfd = 0; - struct addrinfo *rp = NULL; - for (rp = ai; rp != NULL; rp = rp->ai_next) { - sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); - if (sfd == -1) continue; - /* Set various socket options */ - int enable = 1; - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { - close(sfd); - janet_panic("setsockopt(SO_REUSEADDR) failed"); +#ifdef SO_NOSIGPIPE + if (setsockopt(sfd, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof(int)) < 0) { + JSOCKCLOSE(sfd); + janet_panic("setsockopt(SO_NOSIGPIPE) failed"); } +#endif #ifdef SO_REUSEPORT if (setsockopt(sfd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(int)) < 0) { - close(sfd); + JSOCKCLOSE(sfd); janet_panic("setsockopt(SO_REUSEPORT) failed"); } #endif /* Bind */ - if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) break; - close(sfd); + if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; + JSOCKCLOSE(sfd); } if (NULL == rp) { freeaddrinfo(ai); @@ -606,16 +542,10 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { int status = listen(sfd, 1024); freeaddrinfo(ai); if (status) { - close(sfd); + JSOCKCLOSE(sfd); janet_panic("could not listen on file descriptor"); } - /* We need to ignore sigpipe when reading and writing to our connection socket. - * Since a connection could be disconnected at any time, any read or write may fail. - * We don't want to blow up the whole application. */ - signal(SIGPIPE, SIG_IGN); -#endif - /* Put sfd on our loop */ JanetLoopFD lfd = {0}; lfd.stream = make_stream(sfd, 0);