From 4139e426fe67a87e9cd636668d50bd8def5a6dcc Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 8 Oct 2023 18:25:46 -0500 Subject: [PATCH] Refine interface for janet's new event loop. Infer the current root fiber and force user to allocate state for async events. --- CHANGELOG.md | 2 ++ src/core/ev.c | 77 ++++++++++++++++++++++----------------------- src/core/net.c | 40 +++++++++++------------ src/include/janet.h | 46 +++++++++++++++++---------- 4 files changed, 89 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69546d05..de900ee1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? +- Change C API for event loop interaction - get rid of JanetListener and instead use `janet_async_start` and `janet_async_end`. +- Rework event loop to make fewer system calls on kqueue and epoll. - Expose atomic refcount abstraction in janet.h - Add `array/weak` for weak references in arrays - Add support for weak tables via `table/weak`, `table/weak-keys`, and `table/weak-values`. diff --git a/src/core/ev.c b/src/core/ev.c index 1cf10e75..2523809a 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -255,6 +255,7 @@ static void add_timeout(JanetTimeout to) { void janet_async_end(JanetFiber *fiber) { if (fiber->ev_callback) { + fiber->ev_callback(fiber, JANET_ASYNC_EVENT_DEINIT); janet_gcunroot(janet_wrap_abstract(fiber->ev_stream)); fiber->ev_callback = NULL; if (fiber->ev_state) { @@ -267,7 +268,16 @@ void janet_async_end(JanetFiber *fiber) { } } -void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) { +void janet_async_in_flight(JanetFiber *fiber) { +#ifdef JANET_WINDOWS + fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT; +#else + (void) fiber; +#endif +} + +void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) { + JanetFiber *fiber = janet_vm.root_fiber; janet_assert(!fiber->ev_callback, "double async on fiber"); if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber; if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber; @@ -275,14 +285,9 @@ void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode m fiber->ev_stream = stream; janet_ev_inc_refcount(); janet_gcroot(janet_wrap_abstract(stream)); - if (data_size) { - void *data = janet_malloc(data_size); - fiber->ev_state = data; - return data; - } else { - fiber->ev_state = NULL; - return NULL; - } + fiber->ev_state = state; + callback(fiber, JANET_ASYNC_EVENT_INIT); + janet_await(); } void janet_fiber_did_resume(JanetFiber *fiber) { @@ -2199,7 +2204,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { } /* fallthrough */ - case JANET_ASYNC_EVENT_USER: { + case JANET_ASYNC_EVENT_INIT: { int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left; memset(&(state->overlapped), 0, sizeof(OVERLAPPED)); int status; @@ -2237,7 +2242,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { return; } } - fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT; + janet_async_in_flight(fiber); } break; #else @@ -2253,7 +2258,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { read_more: case JANET_ASYNC_EVENT_HUP: - case JANET_ASYNC_EVENT_USER: + case JANET_ASYNC_EVENT_INIT: case JANET_ASYNC_EVENT_READ: { JanetBuffer *buffer = state->buf; int32_t bytes_left = state->bytes_left; @@ -2332,9 +2337,8 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { } } -static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { - JanetFiber *f = janet_vm.root_fiber; - StateRead *state = (StateRead *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, sizeof(StateRead)); +static JANET_NO_RETURN void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { + StateRead *state = janet_malloc(sizeof(StateRead)); state->is_chunk = is_chunked; state->buf = buf; state->bytes_left = nbytes; @@ -2345,23 +2349,23 @@ static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t #else state->flags = flags; #endif - ev_callback_read(f, JANET_ASYNC_EVENT_USER); + janet_async_start(stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, state); } -void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { +JANET_NO_RETURN void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0); } -void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { +JANET_NO_RETURN void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0); } #ifdef JANET_NET -void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +JANET_NO_RETURN void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags); } -void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +JANET_NO_RETURN void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags); } -void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +JANET_NO_RETURN void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags); } #endif @@ -2431,7 +2435,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { return; } break; - case JANET_ASYNC_EVENT_USER: { + case JANET_ASYNC_EVENT_INIT: { /* Begin write */ int32_t len; const uint8_t *bytes; @@ -2461,7 +2465,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL); if (status) { if (WSA_IO_PENDING == WSAGetLastError()) { - fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT; + janet_async_in_flight(fiber); } else { janet_cancel(fiber, janet_ev_lasterr()); janet_async_end(fiber); @@ -2486,7 +2490,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { status = WriteFile(stream->handle, bytes, len, NULL, &state->overlapped); if (!status) { if (ERROR_IO_PENDING == GetLastError()) { - fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT; + janet_async_in_flight(fiber); } else { janet_cancel(fiber, janet_ev_lasterr()); janet_async_end(fiber); @@ -2505,7 +2509,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { janet_cancel(fiber, janet_cstringv("stream hup")); janet_async_end(fiber); break; - case JANET_ASYNC_EVENT_USER: + case JANET_ASYNC_EVENT_INIT: case JANET_ASYNC_EVENT_WRITE: { int32_t start, len; const uint8_t *bytes; @@ -2570,10 +2574,8 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) { } } -static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { - JanetFiber *f = janet_vm.root_fiber; - StateWrite *state = (StateWrite *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, - ev_callback_write, sizeof(StateWrite)); +static JANET_NO_RETURN void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { + StateWrite *state = janet_malloc(sizeof(StateWrite)); state->is_buffer = is_buffer; state->src.buf = buf; state->dest_abst = dest_abst; @@ -2584,31 +2586,31 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab state->flags = flags; state->start = 0; #endif - ev_callback_write(f, JANET_ASYNC_EVENT_USER); + janet_async_start(stream, JANET_ASYNC_LISTEN_WRITE, ev_callback_write, state); } -void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) { +JANET_NO_RETURN void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) { janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0); } -void janet_ev_write_string(JanetStream *stream, JanetString str) { +JANET_NO_RETURN void janet_ev_write_string(JanetStream *stream, JanetString str) { janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0); } #ifdef JANET_NET -void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) { +JANET_NO_RETURN void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) { janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags); } -void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) { +JANET_NO_RETURN void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) { janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags); } -void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) { +JANET_NO_RETURN void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) { janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags); } -void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { +JANET_NO_RETURN void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); } #endif @@ -2999,7 +3001,6 @@ JANET_CORE_FN(janet_cfun_stream_read, if (to != INFINITY) janet_addtimeout(to); janet_ev_read(stream, buffer, n); } - janet_await(); } JANET_CORE_FN(janet_cfun_stream_chunk, @@ -3014,7 +3015,6 @@ JANET_CORE_FN(janet_cfun_stream_chunk, double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); janet_ev_readchunk(stream, buffer, n); - janet_await(); } JANET_CORE_FN(janet_cfun_stream_write, @@ -3034,7 +3034,6 @@ JANET_CORE_FN(janet_cfun_stream_write, if (to != INFINITY) janet_addtimeout(to); janet_ev_write_string(stream, bytes.bytes); } - janet_await(); } static int mutexgc(void *p, size_t size) { diff --git a/src/core/net.c b/src/core/net.c index 9ab3199b..531cfcee 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -124,6 +124,13 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) { switch (event) { default: break; +#ifndef JANET_WINDOWS + /* Wait until we have an actually event before checking. + * Windows doesn't support async connect with this, just try immediately.*/ + case JANET_ASYNC_EVENT_INIT: +#endif + case JANET_ASYNC_EVENT_DEINIT: + return; case JANET_ASYNC_EVENT_CLOSE: janet_cancel(fiber, janet_cstringv("stream closed")); janet_async_end(fiber); @@ -154,12 +161,9 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) { } static void net_sched_connect(JanetStream *stream) { - JanetFiber *f = janet_vm.root_fiber; - NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect)); + NetStateConnect *state = janet_malloc(sizeof(NetStateConnect)); state->did_connect = 0; -#ifdef JANET_WINDOWS - net_callback_connect(f, JANET_ASYNC_EVENT_USER); -#endif + janet_async_start(stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, state); } /* State machine for accepting connections. */ @@ -229,14 +233,16 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) { JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { Janet err; - JanetFiber *f = janet_vm.root_fiber; - NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept)); + NetStateAccept *state = janet_malloc(sizeof(NetStateAccept)); memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED)); memset(&state->buf, 0, 1024); state->function = fun; state->lstream = stream; - if (net_sched_accept_impl(state, f, &err)) janet_panicv(err); - janet_await(); + if (net_sched_accept_impl(state, janet_root_fiber(), &err)) { + janet_free(state); + janet_panicv(err); + } + janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state); } static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err) { @@ -253,7 +259,7 @@ static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet int code = WSAGetLastError(); if (code == WSA_IO_PENDING) { /* indicates io is happening async */ - fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT; + janet_async_in_flight(fiber); return 0; } *err = janet_ev_lasterr(); @@ -282,7 +288,7 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) { janet_schedule(fiber, janet_wrap_nil()); janet_async_end(fiber); return; - case JANET_ASYNC_EVENT_USER: + case JANET_ASYNC_EVENT_INIT: case JANET_ASYNC_EVENT_READ: { #if defined(JANET_LINUX) JSock connfd = accept4(stream->handle, NULL, NULL, SOCK_CLOEXEC); @@ -310,11 +316,10 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) { } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) { - JanetFiber *f = janet_vm.root_fiber; - NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept)); + NetStateAccept *state = janet_malloc(sizeof(NetStateAccept)); + memset(state, 0, sizeof(NetStateAccept)); state->function = fun; - net_callback_accept(f, JANET_ASYNC_EVENT_USER); - janet_await(); + janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state); } #endif @@ -851,7 +856,6 @@ JANET_CORE_FN(cfun_stream_read, if (to != INFINITY) janet_addtimeout(to); janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL); } - janet_await(); } JANET_CORE_FN(cfun_stream_chunk, @@ -866,7 +870,6 @@ JANET_CORE_FN(cfun_stream_chunk, double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL); - janet_await(); } JANET_CORE_FN(cfun_stream_recv_from, @@ -881,7 +884,6 @@ JANET_CORE_FN(cfun_stream_recv_from, double to = janet_optnumber(argv, argc, 3, INFINITY); if (to != INFINITY) janet_addtimeout(to); janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL); - janet_await(); } JANET_CORE_FN(cfun_stream_write, @@ -901,7 +903,6 @@ JANET_CORE_FN(cfun_stream_write, if (to != INFINITY) janet_addtimeout(to); janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL); } - janet_await(); } JANET_CORE_FN(cfun_stream_send_to, @@ -922,7 +923,6 @@ JANET_CORE_FN(cfun_stream_send_to, if (to != INFINITY) janet_addtimeout(to); janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL); } - janet_await(); } JANET_CORE_FN(cfun_stream_flush, diff --git a/src/include/janet.h b/src/include/janet.h index d365721d..ea7d34db 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -596,8 +596,7 @@ typedef enum { JANET_ASYNC_EVENT_READ = 6, JANET_ASYNC_EVENT_WRITE = 7, JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */ - JANET_ASYNC_EVENT_FAILED = 9, /* Used on windows for IOCP */ - JANET_ASYNC_EVENT_USER = 10 + JANET_ASYNC_EVENT_FAILED = 9 /* Used on windows for IOCP */ } JanetAsyncEvent; typedef enum { @@ -606,9 +605,7 @@ typedef enum { JANET_ASYNC_LISTEN_BOTH } JanetAsyncMode; -/* Typedefs */ typedef struct JanetStream JanetStream; -typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event); /* Wrapper around file descriptors and HANDLEs that can be polled. */ struct JanetStream { @@ -620,9 +617,24 @@ struct JanetStream { const void *methods; /* Methods for this stream */ }; +typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event); + +/* Start listening for events from a stream on the current root fiber. After + * calling this, users should call janet_await() before returning from the + * current C Function. This also will call janet_await. + * mode is which events to listen for, and callback is the function pointer to + * call when ever an event is sent from the event loop. state is an optional (can be NULL) + * pointer to data allocated with janet_malloc. This pointer will be passed to callback as + * fiber->ev_state. It will also be freed for you by the runtime when the event loop determines + * it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct. */ +JANET_API JANET_NO_RETURN void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state); + +/* Do not send any more events to the given callback. Call this after scheduling fiber to be resume + * or canceled. */ JANET_API void janet_async_end(JanetFiber *fiber); -JANET_API void *janet_async_start(JanetFiber *fiber, JanetStream *stream, - JanetAsyncMode mode, JanetEVCallback callback, size_t data_size); + +/* Needed for windows to mark a fiber as waiting for an IOCP completion event. Noop on other platforms. */ +JANET_API void janet_async_in_flight(JanetFiber *fiber); #endif @@ -1488,22 +1500,22 @@ JANET_API void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGeneric JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value); /* Read async from a stream */ -JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); -JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); +JANET_NO_RETURN JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); +JANET_NO_RETURN JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); #ifdef JANET_NET -JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); -JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); -JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_NO_RETURN JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_NO_RETURN JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_NO_RETURN JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); #endif /* Write async to a stream */ -JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf); -JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str); +JANET_NO_RETURN JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf); +JANET_NO_RETURN JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str); #ifdef JANET_NET -JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags); -JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags); -JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags); -JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags); +JANET_NO_RETURN JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags); +JANET_NO_RETURN JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags); +JANET_NO_RETURN JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags); +JANET_NO_RETURN JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags); #endif #endif