1
0
mirror of https://github.com/janet-lang/janet synced 2024-06-18 11:19:56 +00:00

Get socket reads and writes working with IOCP.

This commit is contained in:
Calvin Rose 2020-11-08 10:38:28 -06:00
parent 1092013c2b
commit 07910272e2
4 changed files with 154 additions and 23 deletions

14
examples/echoserve.janet Normal file
View File

@ -0,0 +1,14 @@
(defn handler
"Simple handler for connections."
[stream]
(defer (:close stream)
(def id (gensym))
(def b @"")
(print "Connection " id "!")
(while (:read stream 1024 b)
(printf " %v -> %v" id b)
(:write stream b)
(buffer/clear b))
(printf "Done %v!" id)))
(net/server "127.0.0.1" "8000" handler)

View File

@ -728,16 +728,19 @@ void janet_ev_deinit(void) {
}
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
/* TODO - associate IO operation with listener state somehow
* maybe we could require encoding the operation in a mask. */
/* on windows, janet_listen does not actually start any listening behavior. */
if (!(pollable->flags & JANET_POLL_FLAG_IOCP)) {
if (NULL == CreateIoCompletionPort(pollable->handle, janet_vm_iocp, (ULONG_PTR) pollable, 0)) {
janet_panic("failed to listen for events");
}
pollable->flags |= JANET_POLL_FLAG_IOCP;
}
return state;
}
static void janet_unlisten(JanetListenerState *state) {
/* We don't necessarily want to cancel all io on this pollable */
janet_unlisten_impl(state);
}
@ -766,11 +769,20 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
}
} else {
/* Normal event */
JanetListenerState *state = (JanetListenerState *) completionKey;
state->event = overlapped;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state);
JanetPollable *pollable = (JanetPollable *) completionKey;
JanetListenerState *state = pollable->state;
while (state != NULL) {
if (state->tag == overlapped) {
state->event = overlapped;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
break;
} else {
state = state->_next;
}
}
}
}

View File

@ -76,6 +76,7 @@ static const JanetAbstractType AddressAT = {
};
#ifdef JANET_WINDOWS
#define JANET_NET_CHUNKSIZE 4096
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
#define JSOCKDEFAULT INVALID_SOCKET
#define JLASTERR WSAGetLastError()
@ -170,7 +171,13 @@ typedef struct {
int is_recv_from;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
uint8_t chunk_buf[2048];
WSABUF wbuf;
DWORD flags;
DWORD received;
int32_t chunk_size;
struct sockaddr from;
int fromlen;
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
#endif
} NetStateRead;
@ -188,6 +195,50 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
if (state->received == 0 && !state->is_recv_from) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
janet_buffer_push_bytes(state->buf, state->chunk_buf, state->received);
state->bytes_left -= state->received;
if (state->bytes_left <= 0 || !state->is_chunk) {
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, state->fromlen);
memcpy(abst, &state->from, state->fromlen);
resume_val = janet_wrap_abstract(abst);
} else {
resume_val = janet_wrap_buffer(state->buf);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER:
{
state->flags = 0;
/* If there are bytes left, do another read. Currently only read 2k at a time. */
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf;
state->chunk_size = chunk_size;
state->received = 0;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status;
if (state->is_recv_from) {
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else {
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->overlapped, NULL);
}
if (status) {
janet_cancel(s->fiber, janet_cstringv("failed to read from stream"));
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
@ -233,7 +284,6 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
/* Resume if done */
if (!state->is_chunk || bytes_left == 0) {
JanetSignal sig = JANET_SIGNAL_OK;
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, socklen);
@ -242,7 +292,7 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
} else {
resume_val = janet_wrap_buffer(buffer);
}
janet_schedule_signal(s->fiber, resume_val, sig);
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
@ -259,11 +309,8 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
#ifdef JANET_WINDOWS
WSARecv((SOCKET) stream->handle,
#endif
janet_await();
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
@ -273,6 +320,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@ -283,6 +331,7 @@ JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuff
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 1;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@ -299,6 +348,12 @@ typedef struct {
int32_t start;
int is_buffer;
void *dest_abst;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
WSABUF wbuf;
DWORD sent;
DWORD flags;
#endif
} NetStateWrite;
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
@ -318,13 +373,56 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
/* Begin write */
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
if (state->sent == 0 && !state->dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
case JANET_ASYNC_EVENT_USER: {
/* Begin write */
int32_t start, len;
const uint8_t *bytes;
start = state->start;
if (state->is_buffer) {
/* If buffer, convert to string. */
/* TODO - be more efficient about this */
JanetBuffer *buffer = state->src.buf;
JanetString str = janet_string(buffer->data, buffer->count);
bytes = str;
len = buffer->count;
state->is_buffer = 0;
state->src.str = str;
} else {
bytes = state->src.str;
len = janet_string_length(bytes);
}
state->wbuf.buf = (char *) bytes;
state->wbuf.len = len;
state->sent = 0;
state->flags = 0;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
/* Called when write finished */
int status;
SOCKET sock = (SOCKET) s->pollable->handle;
if (state->dest_abst) {
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, &state->sent, state->flags, to, tolen, &state->overlapped, NULL);
} else {
status = WSASend(sock, &state->wbuf, 1, &state->sent, state->flags, &state->overlapped, NULL);
}
if (status) {
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
@ -360,7 +458,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
return JANET_ASYNC_STATUS_DONE;
}
/* Unless using datagrams, empty message is a disocnnect */
/* Unless using datagrams, empty message is a disconnect */
if (nwrote == 0 && !dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
@ -392,6 +490,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
state->start = 0;
state->src.buf = buf;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@ -403,6 +502,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
state->start = 0;
state->src.str = str;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}

View File

@ -501,6 +501,7 @@ typedef void *JanetAbstract;
#ifdef JANET_EV
#define JANET_POLL_FLAG_CLOSED 0x1
#define JANET_POLL_FLAG_SOCKET 0x2
#define JANET_POLL_FLAG_IOCP 0x4
typedef enum {
JANET_ASYNC_EVENT_INIT,
@ -510,7 +511,8 @@ typedef enum {
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_TIMEOUT,
JANET_ASYNC_EVENT_COMPLETE /* Used on windows for IOCP */
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER
} JanetAsyncEvent;
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
@ -546,6 +548,9 @@ struct JanetListenerState {
JanetPollable *pollable;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
#ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */
#endif
/* internal */
int _index; /* not used in all implementations */
int _mask;