Lots of work to make iocp work again.

Big issue with IOCP vs. poll variants is that the overlapped
structures have a longer lifetime than intermediate state needed
for epoll. One cannot free overlapped structures after closing a
handle/socket, like one can do with any intermediate state when using
readiness-based IO.
This commit is contained in:
Calvin Rose 2023-10-07 08:01:35 -07:00
parent e7e4341e70
commit 7bfb17c209
7 changed files with 132 additions and 114 deletions

View File

@ -258,15 +258,17 @@ void janet_async_end(JanetFiber *fiber) {
janet_gcunroot(janet_wrap_abstract(fiber->ev_stream));
fiber->ev_callback = NULL;
if (fiber->ev_state) {
janet_free(fiber->ev_state);
if (!fiber->ev_in_flight) {
janet_free(fiber->ev_state);
janet_ev_dec_refcount();
}
fiber->ev_state = NULL;
}
janet_ev_dec_refcount();
}
}
void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) {
janet_async_end(fiber); /* Clear existing callback */
janet_assert(!fiber->ev_callback, "double async on fiber");
if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber;
if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber;
fiber->ev_callback = callback;
@ -278,6 +280,7 @@ void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode m
fiber->ev_state = data;
return data;
} else {
fiber->ev_state = NULL;
return NULL;
}
}
@ -1437,20 +1440,10 @@ void janet_ev_deinit(void) {
CloseHandle(janet_vm.iocp);
}
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(stream->flags & JANET_STREAM_REGISTERED)) {
if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) {
janet_panicf("failed to listen for events: %V", janet_ev_lasterr());
}
stream->flags |= JANET_STREAM_REGISTERED;
void janet_register_stream(JanetStream *stream) {
if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) {
janet_panicf("failed to listen for events: %V", janet_ev_lasterr());
}
return state;
}
static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@ -1483,19 +1476,19 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else {
/* Normal event */
JanetStream *stream = (JanetStream *) completionKey;
JanetListenerState *state = NULL;
if (stream->read_state && stream->read_state->tag == overlapped) {
state = stream->read_state;
} else if (stream->write_state && stream->write_state->tag == overlapped) {
state = stream->write_state;
JanetFiber *fiber = NULL;
if (stream->read_fiber && stream->read_fiber->ev_state == overlapped) {
fiber = stream->read_fiber;
} else if (stream->write_fiber && stream->write_fiber->ev_state == overlapped) {
fiber = stream->write_fiber;
}
if (state != NULL) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
if (fiber != NULL) {
fiber->ev_in_flight = 0;
fiber->ev_bytes = num_bytes_transfered;
fiber->ev_callback(fiber, result ? JANET_ASYNC_EVENT_COMPLETE : JANET_ASYNC_EVENT_FAILED);
} else {
janet_free((void *) overlapped);
janet_ev_dec_refcount();
}
janet_stream_checktoclose(stream);
}
@ -2134,11 +2127,6 @@ typedef enum {
} JanetReadMode;
typedef struct {
int32_t bytes_left;
int32_t bytes_read;
JanetBuffer *buf;
int is_chunk;
JanetReadMode mode;
#ifdef JANET_WINDOWS
OVERLAPPED overlapped;
DWORD flags;
@ -2151,6 +2139,11 @@ typedef struct {
#else
int flags;
#endif
int32_t bytes_left;
int32_t bytes_read;
JanetBuffer *buf;
int is_chunk;
JanetReadMode mode;
} StateRead;
void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
@ -2167,18 +2160,20 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
janet_async_end(fiber);
break;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_FAILED:
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
state->bytes_read += s->bytes;
state->bytes_read += fiber->ev_bytes;
if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
return;
}
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= s->bytes;
janet_buffer_push_bytes(state->buf, state->chunk_buf, fiber->ev_bytes);
state->bytes_left -= fiber->ev_bytes;
if (state->bytes_left == 0 || !state->is_chunk || s->bytes == 0) {
if (state->bytes_left == 0 || !state->is_chunk || fiber->ev_bytes == 0) {
Janet resume_val;
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
@ -2190,15 +2185,15 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
{
resume_val = janet_wrap_buffer(state->buf);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, resume_val);
janet_async_end(fiber);
return;
}
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER: {
int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(OVERLAPPED));
int status;
#ifdef JANET_NET
@ -2206,33 +2201,36 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = (char *) state->chunk_buf;
state->fromlen = sizeof(state->from);
status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1,
status = WSARecvFrom((SOCKET) stream->handle, &state->wbuf, 1,
NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
return;
}
} else
#endif
{
/* Some handles (not all) read from the offset in lopOverlapped
/* Some handles (not all) read from the offset in lpOverlapped
* if its not set before calling `ReadFile` these streams will always read from offset 0 */
state->overlapped.Offset = (DWORD) state->bytes_read;
status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped);
status = ReadFile(stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped);
if (!status && (ERROR_IO_PENDING != GetLastError())) {
if (GetLastError() == ERROR_BROKEN_PIPE) {
if (state->bytes_read) {
janet_schedule(s->fiber, janet_wrap_buffer(state->buf));
janet_schedule(fiber, janet_wrap_buffer(state->buf));
} else {
janet_schedule(s->fiber, janet_wrap_nil());
janet_schedule(fiber, janet_wrap_nil());
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
janet_cancel(fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
janet_async_end(fiber);
return;
}
}
fiber->ev_in_flight = 1;
}
break;
#else
@ -2372,13 +2370,6 @@ typedef enum {
} JanetWriteMode;
typedef struct {
union {
JanetBuffer *buf;
const uint8_t *str;
} src;
int is_buffer;
JanetWriteMode mode;
void *dest_abst;
#ifdef JANET_WINDOWS
OVERLAPPED overlapped;
DWORD flags;
@ -2389,6 +2380,13 @@ typedef struct {
int flags;
int32_t start;
#endif
union {
JanetBuffer *buf;
const uint8_t *str;
} src;
int is_buffer;
JanetWriteMode mode;
void *dest_abst;
} StateWrite;
void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
@ -2411,15 +2409,18 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
janet_async_end(fiber);
break;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_FAILED:
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
if (s->bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
if (fiber->ev_bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) {
janet_cancel(fiber, janet_cstringv("disconnect"));
janet_async_end(fiber);
return;
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
return;
}
break;
case JANET_ASYNC_EVENT_USER: {
@ -2439,21 +2440,25 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
bytes = state->src.str;
len = janet_string_length(bytes);
}
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status;
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
SOCKET sock = (SOCKET) s->stream->handle;
SOCKET sock = (SOCKET) stream->handle;
state->wbuf.buf = (char *) bytes;
state->wbuf.len = len;
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
if (status) {
if (WSA_IO_PENDING == WSAGetLastError()) {
fiber->ev_in_flight = 1;
} else {
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
return;
}
}
} else
#endif
@ -2470,10 +2475,15 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
*/
state->overlapped.Offset = (DWORD) 0xFFFFFFFF;
state->overlapped.OffsetHigh = (DWORD) 0xFFFFFFFF;
status = WriteFile(s->stream->handle, bytes, len, NULL, &state->overlapped);
if (!status && (ERROR_IO_PENDING != GetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE;
status = WriteFile(stream->handle, bytes, len, NULL, &state->overlapped);
if (!status) {
if (ERROR_IO_PENDING == GetLastError()) {
fiber->ev_in_flight = 1;
} else {
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
return;
}
}
}
}
@ -2564,8 +2574,8 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab
state->flags = (DWORD) flags;
#else
state->flags = flags;
#endif
state->start = 0;
#endif
ev_callback_write(f, JANET_ASYNC_EVENT_USER);
}

View File

@ -43,7 +43,9 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->ev_callback = NULL;
fiber->ev_state = NULL;
fiber->ev_stream = NULL;
fiber->ev_bytes = 0;
fiber->supervisor_channel = NULL;
fiber->ev_in_flight = 0;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
}

View File

@ -327,12 +327,16 @@ static void janet_deinit_block(JanetGCObject *mem) {
janet_free(((JanetTable *) mem)->data);
break;
case JANET_MEMORY_FIBER:
{
JanetFiber *f = (JanetFiber *)mem;
#ifdef JANET_EV
if (((JanetFiber *)mem)->ev_state) {
janet_free(((JanetFiber *)mem)->ev_state);
}
if (f->ev_state && !f->ev_in_flight) {
janet_ev_dec_refcount();
janet_free(f->ev_state);
}
#endif
janet_free(((JanetFiber *)mem)->data);
janet_free(f->data);
}
break;
case JANET_MEMORY_BUFFER:
janet_buffer_deinit((JanetBuffer *) mem);

View File

@ -1051,8 +1051,10 @@ static const uint8_t *unmarshal_one_fiber(
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
fiber->ev_state = NULL;
fiber->ev_bytes = 0;
fiber->ev_callback = NULL;
fiber->ev_stream = NULL;
fiber->ev_in_flight = 0;
#endif
/* Push fiber to seen stack */

View File

@ -127,12 +127,6 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) {
janet_cancel(fiber, janet_cstringv("stream closed"));
janet_async_end(fiber);
return;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:
case JANET_ASYNC_EVENT_WRITE:
case JANET_ASYNC_EVENT_USER:
break;
}
#ifdef JANET_WINDOWS
int res = 0;
@ -163,7 +157,7 @@ static void net_sched_connect(JanetStream *stream) {
NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect));
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_callback_connect(s, JANET_ASYNC_EVENT_USER);
net_callback_connect(f, JANET_ASYNC_EVENT_USER);
#endif
}
@ -172,7 +166,6 @@ static void net_sched_connect(JanetStream *stream) {
#ifdef JANET_WINDOWS
typedef struct {
JanetListenerState head;
WSAOVERLAPPED overlapped;
JanetFunction *function;
JanetStream *lstream;
@ -180,10 +173,10 @@ typedef struct {
char buf[1024];
} NetStateAccept;
static int net_sched_accept_impl(NetStateAccept *state, Janet *err);
static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err);
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)fiber->ev_state;
switch (event) {
default:
break;
@ -194,55 +187,58 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
return;
case JANET_ASYNC_EVENT_COMPLETE: {
if (state->astream->flags & JANET_STREAM_CLOSED) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("failed to accept connection"));
janet_async_end(fiber);
return;
}
SOCKET lsock = (SOCKET) state->lstream->handle;
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *) &lsock, sizeof(lsock))) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
janet_cancel(fiber, janet_cstringv("failed to accept connection"));
janet_async_end(fiber);
return;
}
Janet streamv = janet_wrap_abstract(state->astream);
if (state->function) {
/* Schedule worker */
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
fiber->supervisor_channel = s->fiber->supervisor_channel;
janet_schedule(fiber, janet_wrap_nil());
JanetFiber *sub_fiber = janet_fiber(state->function, 64, 1, &streamv);
sub_fiber->supervisor_channel = fiber->supervisor_channel;
janet_schedule(sub_fiber, janet_wrap_nil());
/* Now listen again for next connection */
Janet err;
if (net_sched_accept_impl(state, &err)) {
janet_cancel(s->fiber, err);
return JANET_ASYNC_STATUS_DONE;
if (net_sched_accept_impl(state, fiber, &err)) {
janet_cancel(fiber, err);
janet_async_end(fiber);
return;
}
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
janet_schedule(fiber, streamv);
janet_async_end(fiber);
return;
}
}
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
Janet err;
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
NetStateAccept *state = (NetStateAccept *)s;
JanetFiber *f = janet_vm.root_fiber;
NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept));
memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
memset(&state->buf, 0, 1024);
state->function = fun;
state->lstream = stream;
s->tag = &state->overlapped;
if (net_sched_accept_impl(state, &err)) janet_panicv(err);
if (net_sched_accept_impl(state, f, &err)) janet_panicv(err);
janet_await();
}
static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err) {
SOCKET lsock = (SOCKET) state->lstream->handle;
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (asock == INVALID_SOCKET) {
@ -254,7 +250,11 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
int socksize = sizeof(SOCKADDR_STORAGE) + 16;
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
int code = WSAGetLastError();
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
if (code == WSA_IO_PENDING) {
/* indicates io is happening async */
fiber->ev_in_flight = 1;
return 0;
}
*err = janet_ev_lasterr();
return 1;
}

View File

@ -596,7 +596,8 @@ typedef enum {
JANET_ASYNC_EVENT_READ = 6,
JANET_ASYNC_EVENT_WRITE = 7,
JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER = 9
JANET_ASYNC_EVENT_FAILED = 9, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER = 10
} JanetAsyncEvent;
typedef enum {
@ -924,8 +925,10 @@ struct JanetFiber {
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetEVCallback ev_callback; /* Call this before starting scheduled fibers */
JanetStream *ev_stream; /* which stream we are waiting on */
void *ev_state; /* Extra data for ev callback state */
void *ev_state; /* Extra data for ev callback state. On windows, first element must be OVERLAPPED. */
void *supervisor_channel; /* Channel to push self to when complete */
uint32_t ev_bytes; /* Number of bytes for completion event */
int ev_in_flight; /* If overlapped operation is in flight */
#endif
};

View File

@ -42,9 +42,6 @@
(set suite-name
(cond
(number? x) (string x)
(string? x) (string/slice x
(length "test/suite-")
(- (inc (length ".janet"))))
(string x)))
(set start-time (os/clock))
(eprint "Starting suite " suite-name "..."))