1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-29 03:19:54 +00:00

Port net code to windows.

Use winsock2 and WSAPoll. Not the most high performance
solution but should work well.
This commit is contained in:
Calvin Rose 2020-04-18 19:14:38 -04:00
parent 0745c15d7b
commit 4a693222b4
4 changed files with 145 additions and 11 deletions

View File

@ -29,6 +29,10 @@
#define _POSIX_C_SOURCE 200112L #define _POSIX_C_SOURCE 200112L
#endif #endif
#if defined(WIN32) || defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#endif
/* Needed for realpath on linux */ /* Needed for realpath on linux */
#if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__)) #if !defined(_XOPEN_SOURCE) && (defined(__linux__) || defined(__EMSCRIPTEN__))
#define _XOPEN_SOURCE 500 #define _XOPEN_SOURCE 500

View File

@ -26,6 +26,14 @@
#include "util.h" #include "util.h"
#endif #endif
#ifdef JANET_WINDOWS
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "Advapi32.lib")
#else
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
@ -34,6 +42,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <poll.h> #include <poll.h>
#include <netdb.h> #include <netdb.h>
#endif
/* /*
* Streams * Streams
@ -44,7 +53,11 @@
#define JANET_STREAM_WRITABLE 4 #define JANET_STREAM_WRITABLE 4
typedef struct { typedef struct {
#ifdef JANET_WINDOWS
SOCKET socket;
#else
int fd; int fd;
#endif
int flags; int flags;
} JanetStream; } JanetStream;
@ -65,11 +78,25 @@ static int janet_stream_close(void *p, size_t s) {
JanetStream *stream = p; JanetStream *stream = p;
if (!(stream->flags & JANET_STREAM_CLOSED)) { if (!(stream->flags & JANET_STREAM_CLOSED)) {
stream->flags |= JANET_STREAM_CLOSED; stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS
closesocket(stream->socket);
#else
close(stream->fd); close(stream->fd);
#endif
} }
return 0; return 0;
} }
#ifdef JANET_WINDOWS
static JanetStream *make_stream(SOCKET socket, int flags) {
u_long iMode = 0;
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
ioctlsocket(socket, FIONBIO, &iMode);
stream->socket = socket;
stream->flags = flags;
return stream;
}
#else
static JanetStream *make_stream(int fd, int flags) { static JanetStream *make_stream(int fd, int flags) {
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
@ -77,6 +104,7 @@ static JanetStream *make_stream(int fd, int flags) {
stream->flags = flags; stream->flags = flags;
return stream; return stream;
} }
#endif
/* /*
* Event loop * Event loop
@ -139,7 +167,11 @@ typedef struct {
#define JANET_LOOPFD_MAX 1024 #define JANET_LOOPFD_MAX 1024
/* Global loop data */ /* Global loop data */
#ifdef JANET_WINDOWS
JANET_THREAD_LOCAL WSAPOLLFD janet_vm_pollfds[JANET_LOOPFD_MAX];
#else
JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL struct pollfd janet_vm_pollfds[JANET_LOOPFD_MAX];
#endif
JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX]; JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX];
JANET_THREAD_LOCAL int janet_vm_loop_count; JANET_THREAD_LOCAL int janet_vm_loop_count;
@ -179,7 +211,11 @@ static int janet_loop_schedule(JanetLoopFD lfd, short events) {
} }
int index = janet_vm_loop_count++; int index = janet_vm_loop_count++;
janet_vm_loopfds[index] = lfd; janet_vm_loopfds[index] = lfd;
#ifdef JANET_WINDOWS
janet_vm_pollfds[index].fd = lfd.stream->socket;
#else
janet_vm_pollfds[index].fd = lfd.stream->fd; janet_vm_pollfds[index].fd = lfd.stream->fd;
#endif
janet_vm_pollfds[index].events = events; janet_vm_pollfds[index].events = events;
janet_vm_pollfds[index].revents = 0; janet_vm_pollfds[index].revents = 0;
return index; return index;
@ -197,7 +233,11 @@ static void janet_loop_rmindex(int index) {
static size_t janet_loop_event(size_t index) { static size_t janet_loop_event(size_t index) {
JanetLoopFD *jlfd = janet_vm_loopfds + index; JanetLoopFD *jlfd = janet_vm_loopfds + index;
JanetStream *stream = jlfd->stream; JanetStream *stream = jlfd->stream;
#ifdef JANET_WINDOWS
SOCKET socket = stream->socket;
#else
int fd = stream->fd; int fd = stream->fd;
#endif
int ret = 1; int ret = 1;
int should_resume = 0; int should_resume = 0;
Janet resumeval = janet_wrap_nil(); Janet resumeval = janet_wrap_nil();
@ -211,20 +251,30 @@ static size_t janet_loop_event(size_t index) {
JanetBuffer *buffer = jlfd->data.read_chunk.buf; JanetBuffer *buffer = jlfd->data.read_chunk.buf;
int32_t bytes_left = jlfd->data.read_chunk.bytes_left; int32_t bytes_left = jlfd->data.read_chunk.bytes_left;
janet_buffer_extra(buffer, bytes_left); janet_buffer_extra(buffer, bytes_left);
ssize_t nread;
errno = 0;
if (!(stream->flags & JANET_STREAM_READABLE)) { if (!(stream->flags & JANET_STREAM_READABLE)) {
should_resume = 1; should_resume = 1;
ret = 0; ret = 0;
break; break;
} }
#ifdef JANET_WINDOWS
long nread;
do {
nread = recv(socket, buffer->data + buffer->count, bytes_left, 0);
} while (nread == -1 && WSAGetLastError() == WSAEINTR);
if (WSAGetLastError() == WSAEWOULDBLOCK) {
ret = 1;
break;
}
#else
ssize_t nread;
do { do {
nread = read(fd, buffer->data + buffer->count, bytes_left); nread = read(fd, buffer->data + buffer->count, bytes_left);
} while (errno == EINTR); } while (nread == -1 && errno == EINTR);
if (errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EAGAIN || errno == EWOULDBLOCK) {
ret = 1; ret = 1;
break; break;
} }
#endif
if (nread > 0) { if (nread > 0) {
buffer->count += nread; buffer->count += nread;
bytes_left -= nread; bytes_left -= nread;
@ -244,10 +294,15 @@ static size_t janet_loop_event(size_t index) {
break; break;
} }
case JLE_READ_ACCEPT: { case JLE_READ_ACCEPT: {
#ifdef JANET_WINDOWS
SOCKET connfd = accept(socket, NULL, NULL);
if (connfd != INVALID_SOCKET) {
#else
char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ char addr[256] = {0}; /* Just make sure it is large enough for largest address type */
socklen_t len = 0; socklen_t len = 0;
int connfd = accept(fd, (void *) &addr, &len); int connfd = accept(fd, (void *) &addr, &len);
if (connfd >= 0) { if (connfd >= 0) {
#endif
/* Made a new connection socket */ /* Made a new connection socket */
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream); Janet streamv = janet_wrap_abstract(stream);
@ -284,11 +339,17 @@ static size_t janet_loop_event(size_t index) {
} }
if (start < len) { if (start < len) {
int32_t nbytes = len - start; int32_t nbytes = len - start;
#ifdef JANET_WINDOWS
long nwrote;
do {
nwrote = send(socket, bytes + start, nbytes, 0);
} while (nwrote == -1 && WSAGetLastError() == WSAEINTR);
#else
ssize_t nwrote; ssize_t nwrote;
errno = 0;
do { do {
nwrote = write(fd, bytes + start, nbytes); nwrote = write(fd, bytes + start, nbytes);
} while (errno == EINTR); } while (nwrote == -1 && errno == EINTR);
#endif
if (nwrote > 0) { if (nwrote > 0) {
start += nwrote; start += nwrote;
} else { } else {
@ -309,7 +370,6 @@ static size_t janet_loop_event(size_t index) {
break; break;
} }
case JLE_CONNECT: { case JLE_CONNECT: {
break; break;
} }
} }
@ -326,7 +386,7 @@ static size_t janet_loop_event(size_t index) {
} }
/* Remove this handler from the handler pool. */ /* Remove this handler from the handler pool. */
if (should_resume) janet_loop_rmindex(index); if (should_resume) janet_loop_rmindex((int) index);
return ret; return ret;
} }
@ -343,15 +403,24 @@ static void janet_loop1(void) {
/* Poll */ /* Poll */
if (janet_vm_loop_count == 0) return; if (janet_vm_loop_count == 0) return;
int ready; int ready;
#ifdef JANET_WINDOWS
do {
ready = WSAPoll(janet_vm_pollfds, janet_vm_loop_count, -1);
} while (ready == -1 && WSAGetLastError() == WSAEINTR);
if (ready == -1) return;
#else
do { do {
ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1); ready = poll(janet_vm_pollfds, janet_vm_loop_count, -1);
} while (ready == -1 && errno == EAGAIN); } while (ready == -1 && errno == EAGAIN);
if (ready == -1) return; if (ready == -1) return;
#endif
/* Handle events */ /* Handle events */
for (int i = 0; i < janet_vm_loop_count;) { for (int i = 0; i < janet_vm_loop_count;) {
if (janet_vm_pollfds[i].events & janet_vm_pollfds[i].revents) { int revents = janet_vm_pollfds[i].revents;
janet_vm_pollfds[i].revents = 0;
if ((janet_vm_pollfds[i].events | POLLHUP | POLLER) & revents) {
size_t delta = janet_loop_event(i); size_t delta = janet_loop_event(i);
i += delta; i += (int) delta;
} else { } else {
i++; i++;
} }
@ -431,6 +500,22 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
struct addrinfo *ai = janet_get_addrinfo(argv, 0); struct addrinfo *ai = janet_get_addrinfo(argv, 0);
#ifdef JANET_WINDOWS
/* Create socket */
SOCKET sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (sock == INVALID_SOCKET) {
freeaddrinfo(ai);
janet_panic("could not create socket");
}
/* Connect to socket */
int status = connect(sock, ai->ai_addr, (int) ai->ai_addrlen);
freeaddrinfo(ai);
if (status == -1) {
closesocket(sock);
janet_panic("could not connect to socket");
}
#else
/* Create socket */ /* Create socket */
int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
if (sock < 0) { if (sock < 0) {
@ -445,6 +530,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
close(sock); close(sock);
janet_panic("could not connect to socket"); janet_panic("could not connect to socket");
} }
#endif
/* Wrap socket in abstract type JanetStream */ /* Wrap socket in abstract type JanetStream */
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
@ -459,6 +545,36 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
struct addrinfo *ai = janet_get_addrinfo(argv, 0); struct addrinfo *ai = janet_get_addrinfo(argv, 0);
#ifdef JANET_WINDOWS
/* Check all addrinfos in a loop for the first that we can bind to. */
SOCKET sfd = INVALID_SOCKET;
struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == INVALID_SOCKET) continue;
/* Set various socket options */
int enable = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(int)) < 0) {
closesocket(sfd);
janet_panic("setsockopt(SO_REUSEADDR) failed");
}
/* Bind */
if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break;
closesocket(sfd);
}
if (NULL == rp) {
freeaddrinfo(ai);
janet_panic("could not bind to any sockets");
}
/* listen */
int status = listen(sfd, 1024);
freeaddrinfo(ai);
if (status) {
closesocket(sfd);
janet_panic("could not listen on file descriptor");
}
#else
/* Check all addrinfos in a loop for the first that we can bind to. */ /* Check all addrinfos in a loop for the first that we can bind to. */
int sfd = 0; int sfd = 0;
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
@ -498,6 +614,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
* Since a connection could be disconnected at any time, any read or write may fail. * Since a connection could be disconnected at any time, any read or write may fail.
* We don't want to blow up the whole application. */ * We don't want to blow up the whole application. */
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
#endif
/* Put sfd on our loop */ /* Put sfd on our loop */
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
@ -572,6 +689,15 @@ static const JanetReg net_cfuns[] = {
void janet_lib_net(JanetTable *env) { void janet_lib_net(JanetTable *env) {
janet_vm_loop_count = 0; janet_vm_loop_count = 0;
#ifdef JANET_WINDOWS
WSADATA wsaData;
janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock");
#endif
janet_core_cfuns(env, NULL, net_cfuns); janet_core_cfuns(env, NULL, net_cfuns);
} }
void janet_net_deinit(void) {
#ifdef JANET_WINDOWS
WSACleanup();
#endif
}

View File

@ -35,7 +35,7 @@
#ifndef janet_exit #ifndef janet_exit
#include <stdio.h> #include <stdio.h>
#define janet_exit(m) do { \ #define janet_exit(m) do { \
printf("C runtime error at line %d in file %s: %s\n",\ fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\
__LINE__,\ __LINE__,\
__FILE__,\ __FILE__,\
(m));\ (m));\
@ -50,7 +50,7 @@
/* What to do when out of memory */ /* What to do when out of memory */
#ifndef JANET_OUT_OF_MEMORY #ifndef JANET_OUT_OF_MEMORY
#include <stdio.h> #include <stdio.h>
#define JANET_OUT_OF_MEMORY do { printf("janet out of memory\n"); exit(1); } while (0) #define JANET_OUT_OF_MEMORY do { fprintf(stderr, "janet out of memory\n"); exit(1); } while (0)
#endif #endif
/* Omit docstrings in some builds */ /* Omit docstrings in some builds */
@ -128,6 +128,7 @@ void janet_lib_thread(JanetTable *env);
#endif #endif
#ifdef JANET_NET #ifdef JANET_NET
void janet_lib_net(JanetTable *env); void janet_lib_net(JanetTable *env);
void janet_net_deinit(void);
void janet_net_markloop(void); void janet_net_markloop(void);
#endif #endif

View File

@ -1444,4 +1444,7 @@ void janet_deinit(void) {
#ifdef JANET_THREADS #ifdef JANET_THREADS
janet_threads_deinit(); janet_threads_deinit();
#endif #endif
#ifdef JANET_NET
janet_net_deinit();
#endif
} }