From 7bfb17c209f7699cd36dc4c7a0bf05891f4ba452 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 7 Oct 2023 08:01:35 -0700 Subject: [PATCH] Lots of work to make iocp work again. Big issue with IOCP vs. poll variants is that the overlapped structures have a longer lifetime than intermediate state needed for epoll. One cannot free overlapped structures after closing a handle/socket, like one can do with any intermediate state when using readiness-based IO. --- src/core/ev.c | 156 +++++++++++++++++++++++--------------------- src/core/fiber.c | 2 + src/core/gc.c | 12 ++-- src/core/marsh.c | 2 + src/core/net.c | 64 +++++++++--------- src/include/janet.h | 7 +- test/helper.janet | 3 - 7 files changed, 132 insertions(+), 114 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 2ae217e3..39bff114 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -258,15 +258,17 @@ void janet_async_end(JanetFiber *fiber) { janet_gcunroot(janet_wrap_abstract(fiber->ev_stream)); fiber->ev_callback = NULL; if (fiber->ev_state) { - janet_free(fiber->ev_state); + if (!fiber->ev_in_flight) { + janet_free(fiber->ev_state); + janet_ev_dec_refcount(); + } fiber->ev_state = NULL; } - janet_ev_dec_refcount(); } } void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) { - janet_async_end(fiber); /* Clear existing callback */ + janet_assert(!fiber->ev_callback, "double async on fiber"); if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber; if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber; fiber->ev_callback = callback; @@ -278,6 +280,7 @@ void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode m fiber->ev_state = data; return data; } else { + fiber->ev_state = NULL; return NULL; } } @@ -1437,20 +1440,10 @@ void janet_ev_deinit(void) { CloseHandle(janet_vm.iocp); } -JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { - /* Add the handle to the io completion port if not already added */ - JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); - if (!(stream->flags & JANET_STREAM_REGISTERED)) { - if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) { - janet_panicf("failed to listen for events: %V", janet_ev_lasterr()); - } - stream->flags |= JANET_STREAM_REGISTERED; +void janet_register_stream(JanetStream *stream) { + if (NULL == CreateIoCompletionPort(stream->handle, janet_vm.iocp, (ULONG_PTR) stream, 0)) { + janet_panicf("failed to listen for events: %V", janet_ev_lasterr()); } - return state; -} - -static void janet_unlisten(JanetListenerState *state) { - janet_unlisten_impl(state); } void janet_loop1_impl(int has_timeout, JanetTimestamp to) { @@ -1483,19 +1476,19 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { } else { /* Normal event */ JanetStream *stream = (JanetStream *) completionKey; - JanetListenerState *state = NULL; - if (stream->read_state && stream->read_state->tag == overlapped) { - state = stream->read_state; - } else if (stream->write_state && stream->write_state->tag == overlapped) { - state = stream->write_state; + JanetFiber *fiber = NULL; + if (stream->read_fiber && stream->read_fiber->ev_state == overlapped) { + fiber = stream->read_fiber; + } else if (stream->write_fiber && stream->write_fiber->ev_state == overlapped) { + fiber = stream->write_fiber; } - if (state != NULL) { - state->event = overlapped; - state->bytes = num_bytes_transfered; - JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); - if (status == JANET_ASYNC_STATUS_DONE) { - janet_unlisten(state); - } + if (fiber != NULL) { + fiber->ev_in_flight = 0; + fiber->ev_bytes = num_bytes_transfered; + fiber->ev_callback(fiber, result ? JANET_ASYNC_EVENT_COMPLETE : JANET_ASYNC_EVENT_FAILED); + } else { + janet_free((void *) overlapped); + janet_ev_dec_refcount(); } janet_stream_checktoclose(stream); } @@ -2134,11 +2127,6 @@ typedef enum { } JanetReadMode; typedef struct { - int32_t bytes_left; - int32_t bytes_read; - JanetBuffer *buf; - int is_chunk; - JanetReadMode mode; #ifdef JANET_WINDOWS OVERLAPPED overlapped; DWORD flags; @@ -2151,6 +2139,11 @@ typedef struct { #else int flags; #endif + int32_t bytes_left; + int32_t bytes_read; + JanetBuffer *buf; + int is_chunk; + JanetReadMode mode; } StateRead; void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { @@ -2167,18 +2160,20 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { janet_async_end(fiber); break; #ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_FAILED: case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ - state->bytes_read += s->bytes; + state->bytes_read += fiber->ev_bytes; if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + return; } - janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); - state->bytes_left -= s->bytes; + janet_buffer_push_bytes(state->buf, state->chunk_buf, fiber->ev_bytes); + state->bytes_left -= fiber->ev_bytes; - if (state->bytes_left == 0 || !state->is_chunk || s->bytes == 0) { + if (state->bytes_left == 0 || !state->is_chunk || fiber->ev_bytes == 0) { Janet resume_val; #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { @@ -2190,15 +2185,15 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { { resume_val = janet_wrap_buffer(state->buf); } - janet_schedule(s->fiber, resume_val); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, resume_val); + janet_async_end(fiber); + return; } } /* fallthrough */ case JANET_ASYNC_EVENT_USER: { int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left; - s->tag = &state->overlapped; memset(&(state->overlapped), 0, sizeof(OVERLAPPED)); int status; #ifdef JANET_NET @@ -2206,33 +2201,36 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { state->wbuf.len = (ULONG) chunk_size; state->wbuf.buf = (char *) state->chunk_buf; state->fromlen = sizeof(state->from); - status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1, + status = WSARecvFrom((SOCKET) stream->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); if (status && (WSA_IO_PENDING != WSAGetLastError())) { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_ev_lasterr()); + janet_async_end(fiber); + return; } } else #endif { - /* Some handles (not all) read from the offset in lopOverlapped + /* Some handles (not all) read from the offset in lpOverlapped * if its not set before calling `ReadFile` these streams will always read from offset 0 */ state->overlapped.Offset = (DWORD) state->bytes_read; - status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); + status = ReadFile(stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); if (!status && (ERROR_IO_PENDING != GetLastError())) { if (GetLastError() == ERROR_BROKEN_PIPE) { if (state->bytes_read) { - janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); + janet_schedule(fiber, janet_wrap_buffer(state->buf)); } else { - janet_schedule(s->fiber, janet_wrap_nil()); + janet_schedule(fiber, janet_wrap_nil()); } } else { - janet_cancel(s->fiber, janet_ev_lasterr()); + janet_cancel(fiber, janet_ev_lasterr()); } - return JANET_ASYNC_STATUS_DONE; + janet_async_end(fiber); + return; } } + fiber->ev_in_flight = 1; } break; #else @@ -2372,13 +2370,6 @@ typedef enum { } JanetWriteMode; typedef struct { - union { - JanetBuffer *buf; - const uint8_t *str; - } src; - int is_buffer; - JanetWriteMode mode; - void *dest_abst; #ifdef JANET_WINDOWS OVERLAPPED overlapped; DWORD flags; @@ -2389,6 +2380,13 @@ typedef struct { int flags; int32_t start; #endif + union { + JanetBuffer *buf; + const uint8_t *str; + } src; + int is_buffer; + JanetWriteMode mode; + void *dest_abst; } StateWrite; void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { @@ -2411,15 +2409,18 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { janet_async_end(fiber); break; #ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_FAILED: case JANET_ASYNC_EVENT_COMPLETE: { /* Called when write finished */ - if (s->bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) { - janet_cancel(s->fiber, janet_cstringv("disconnect")); - return JANET_ASYNC_STATUS_DONE; + if (fiber->ev_bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) { + janet_cancel(fiber, janet_cstringv("disconnect")); + janet_async_end(fiber); + return; } - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + return; } break; case JANET_ASYNC_EVENT_USER: { @@ -2439,21 +2440,25 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { bytes = state->src.str; len = janet_string_length(bytes); } - s->tag = &state->overlapped; memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); int status; #ifdef JANET_NET if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { - SOCKET sock = (SOCKET) s->stream->handle; + SOCKET sock = (SOCKET) stream->handle; state->wbuf.buf = (char *) bytes; state->wbuf.len = len; const struct sockaddr *to = state->dest_abst; int tolen = (int) janet_abstract_size((void *) to); status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); - if (status && (WSA_IO_PENDING != WSAGetLastError())) { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + if (status) { + if (WSA_IO_PENDING == WSAGetLastError()) { + fiber->ev_in_flight = 1; + } else { + janet_cancel(fiber, janet_ev_lasterr()); + janet_async_end(fiber); + return; + } } } else #endif @@ -2470,10 +2475,15 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { */ state->overlapped.Offset = (DWORD) 0xFFFFFFFF; state->overlapped.OffsetHigh = (DWORD) 0xFFFFFFFF; - status = WriteFile(s->stream->handle, bytes, len, NULL, &state->overlapped); - if (!status && (ERROR_IO_PENDING != GetLastError())) { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + status = WriteFile(stream->handle, bytes, len, NULL, &state->overlapped); + if (!status) { + if (ERROR_IO_PENDING == GetLastError()) { + fiber->ev_in_flight = 1; + } else { + janet_cancel(fiber, janet_ev_lasterr()); + janet_async_end(fiber); + return; + } } } } @@ -2564,8 +2574,8 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab state->flags = (DWORD) flags; #else state->flags = flags; -#endif state->start = 0; +#endif ev_callback_write(f, JANET_ASYNC_EVENT_USER); } diff --git a/src/core/fiber.c b/src/core/fiber.c index d5f51599..fe558fb0 100644 --- a/src/core/fiber.c +++ b/src/core/fiber.c @@ -43,7 +43,9 @@ static void fiber_reset(JanetFiber *fiber) { fiber->ev_callback = NULL; fiber->ev_state = NULL; fiber->ev_stream = NULL; + fiber->ev_bytes = 0; fiber->supervisor_channel = NULL; + fiber->ev_in_flight = 0; #endif janet_fiber_set_status(fiber, JANET_STATUS_NEW); } diff --git a/src/core/gc.c b/src/core/gc.c index f9807282..7be0c4cc 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -327,12 +327,16 @@ static void janet_deinit_block(JanetGCObject *mem) { janet_free(((JanetTable *) mem)->data); break; case JANET_MEMORY_FIBER: + { + JanetFiber *f = (JanetFiber *)mem; #ifdef JANET_EV - if (((JanetFiber *)mem)->ev_state) { - janet_free(((JanetFiber *)mem)->ev_state); - } + if (f->ev_state && !f->ev_in_flight) { + janet_ev_dec_refcount(); + janet_free(f->ev_state); + } #endif - janet_free(((JanetFiber *)mem)->data); + janet_free(f->data); + } break; case JANET_MEMORY_BUFFER: janet_buffer_deinit((JanetBuffer *) mem); diff --git a/src/core/marsh.c b/src/core/marsh.c index 89181e7d..4332c6f7 100644 --- a/src/core/marsh.c +++ b/src/core/marsh.c @@ -1051,8 +1051,10 @@ static const uint8_t *unmarshal_one_fiber( fiber->sched_id = 0; fiber->supervisor_channel = NULL; fiber->ev_state = NULL; + fiber->ev_bytes = 0; fiber->ev_callback = NULL; fiber->ev_stream = NULL; + fiber->ev_in_flight = 0; #endif /* Push fiber to seen stack */ diff --git a/src/core/net.c b/src/core/net.c index 7621a2ed..4a3c7473 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -127,12 +127,6 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) { janet_cancel(fiber, janet_cstringv("stream closed")); janet_async_end(fiber); return; - case JANET_ASYNC_EVENT_HUP: - case JANET_ASYNC_EVENT_ERR: - case JANET_ASYNC_EVENT_COMPLETE: - case JANET_ASYNC_EVENT_WRITE: - case JANET_ASYNC_EVENT_USER: - break; } #ifdef JANET_WINDOWS int res = 0; @@ -163,7 +157,7 @@ static void net_sched_connect(JanetStream *stream) { NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect)); state->did_connect = 0; #ifdef JANET_WINDOWS - net_callback_connect(s, JANET_ASYNC_EVENT_USER); + net_callback_connect(f, JANET_ASYNC_EVENT_USER); #endif } @@ -172,7 +166,6 @@ static void net_sched_connect(JanetStream *stream) { #ifdef JANET_WINDOWS typedef struct { - JanetListenerState head; WSAOVERLAPPED overlapped; JanetFunction *function; JanetStream *lstream; @@ -180,10 +173,10 @@ typedef struct { char buf[1024]; } NetStateAccept; -static int net_sched_accept_impl(NetStateAccept *state, Janet *err); +static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err); -JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { - NetStateAccept *state = (NetStateAccept *)s; +void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) { + NetStateAccept *state = (NetStateAccept *)fiber->ev_state; switch (event) { default: break; @@ -194,55 +187,58 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event break; } case JANET_ASYNC_EVENT_CLOSE: - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, janet_wrap_nil()); + janet_async_end(fiber); + return; case JANET_ASYNC_EVENT_COMPLETE: { if (state->astream->flags & JANET_STREAM_CLOSED) { - janet_cancel(s->fiber, janet_cstringv("failed to accept connection")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("failed to accept connection")); + janet_async_end(fiber); + return; } SOCKET lsock = (SOCKET) state->lstream->handle; if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *) &lsock, sizeof(lsock))) { - janet_cancel(s->fiber, janet_cstringv("failed to accept connection")); - return JANET_ASYNC_STATUS_DONE; + janet_cancel(fiber, janet_cstringv("failed to accept connection")); + janet_async_end(fiber); + return; } Janet streamv = janet_wrap_abstract(state->astream); if (state->function) { /* Schedule worker */ - JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv); - fiber->supervisor_channel = s->fiber->supervisor_channel; - janet_schedule(fiber, janet_wrap_nil()); + JanetFiber *sub_fiber = janet_fiber(state->function, 64, 1, &streamv); + sub_fiber->supervisor_channel = fiber->supervisor_channel; + janet_schedule(sub_fiber, janet_wrap_nil()); /* Now listen again for next connection */ Janet err; - if (net_sched_accept_impl(state, &err)) { - janet_cancel(s->fiber, err); - return JANET_ASYNC_STATUS_DONE; + if (net_sched_accept_impl(state, fiber, &err)) { + janet_cancel(fiber, err); + janet_async_end(fiber); + return; } } else { - janet_schedule(s->fiber, streamv); - return JANET_ASYNC_STATUS_DONE; + janet_schedule(fiber, streamv); + janet_async_end(fiber); + return; } } } - return JANET_ASYNC_STATUS_NOT_DONE; } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { Janet err; - JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); - NetStateAccept *state = (NetStateAccept *)s; + JanetFiber *f = janet_vm.root_fiber; + NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept)); memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED)); memset(&state->buf, 0, 1024); state->function = fun; state->lstream = stream; - s->tag = &state->overlapped; - if (net_sched_accept_impl(state, &err)) janet_panicv(err); + if (net_sched_accept_impl(state, f, &err)) janet_panicv(err); janet_await(); } -static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { +static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err) { SOCKET lsock = (SOCKET) state->lstream->handle; SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (asock == INVALID_SOCKET) { @@ -254,7 +250,11 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { int socksize = sizeof(SOCKADDR_STORAGE) + 16; if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) { int code = WSAGetLastError(); - if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */ + if (code == WSA_IO_PENDING) { + /* indicates io is happening async */ + fiber->ev_in_flight = 1; + return 0; + } *err = janet_ev_lasterr(); return 1; } diff --git a/src/include/janet.h b/src/include/janet.h index 96765279..0e2f5790 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -596,7 +596,8 @@ typedef enum { JANET_ASYNC_EVENT_READ = 6, JANET_ASYNC_EVENT_WRITE = 7, JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */ - JANET_ASYNC_EVENT_USER = 9 + JANET_ASYNC_EVENT_FAILED = 9, /* Used on windows for IOCP */ + JANET_ASYNC_EVENT_USER = 10 } JanetAsyncEvent; typedef enum { @@ -924,8 +925,10 @@ struct JanetFiber { uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ JanetEVCallback ev_callback; /* Call this before starting scheduled fibers */ JanetStream *ev_stream; /* which stream we are waiting on */ - void *ev_state; /* Extra data for ev callback state */ + void *ev_state; /* Extra data for ev callback state. On windows, first element must be OVERLAPPED. */ void *supervisor_channel; /* Channel to push self to when complete */ + uint32_t ev_bytes; /* Number of bytes for completion event */ + int ev_in_flight; /* If overlapped operation is in flight */ #endif }; diff --git a/test/helper.janet b/test/helper.janet index dc6ada96..01c760d3 100644 --- a/test/helper.janet +++ b/test/helper.janet @@ -42,9 +42,6 @@ (set suite-name (cond (number? x) (string x) - (string? x) (string/slice x - (length "test/suite-") - (- (inc (length ".janet")))) (string x))) (set start-time (os/clock)) (eprint "Starting suite " suite-name "..."))