1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-17 14:14:49 +00:00

Refine interface for janet's new event loop.

Infer the current root fiber and force user to
allocate state for async events.
This commit is contained in:
Calvin Rose 2023-10-08 18:25:46 -05:00
parent a775a89e01
commit 4139e426fe
4 changed files with 89 additions and 76 deletions

View File

@ -2,6 +2,8 @@
All notable changes to this project will be documented in this file.
## Unreleased - ???
- Change C API for event loop interaction - get rid of JanetListener and instead use `janet_async_start` and `janet_async_end`.
- Rework event loop to make fewer system calls on kqueue and epoll.
- Expose atomic refcount abstraction in janet.h
- Add `array/weak` for weak references in arrays
- Add support for weak tables via `table/weak`, `table/weak-keys`, and `table/weak-values`.

View File

@ -255,6 +255,7 @@ static void add_timeout(JanetTimeout to) {
void janet_async_end(JanetFiber *fiber) {
if (fiber->ev_callback) {
fiber->ev_callback(fiber, JANET_ASYNC_EVENT_DEINIT);
janet_gcunroot(janet_wrap_abstract(fiber->ev_stream));
fiber->ev_callback = NULL;
if (fiber->ev_state) {
@ -267,7 +268,16 @@ void janet_async_end(JanetFiber *fiber) {
}
}
void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, size_t data_size) {
void janet_async_in_flight(JanetFiber *fiber) {
#ifdef JANET_WINDOWS
fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT;
#else
(void) fiber;
#endif
}
void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) {
JanetFiber *fiber = janet_vm.root_fiber;
janet_assert(!fiber->ev_callback, "double async on fiber");
if (mode & JANET_ASYNC_LISTEN_READ) stream->read_fiber = fiber;
if (mode & JANET_ASYNC_LISTEN_WRITE) stream->write_fiber = fiber;
@ -275,14 +285,9 @@ void *janet_async_start(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode m
fiber->ev_stream = stream;
janet_ev_inc_refcount();
janet_gcroot(janet_wrap_abstract(stream));
if (data_size) {
void *data = janet_malloc(data_size);
fiber->ev_state = data;
return data;
} else {
fiber->ev_state = NULL;
return NULL;
}
fiber->ev_state = state;
callback(fiber, JANET_ASYNC_EVENT_INIT);
janet_await();
}
void janet_fiber_did_resume(JanetFiber *fiber) {
@ -2199,7 +2204,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER: {
case JANET_ASYNC_EVENT_INIT: {
int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left;
memset(&(state->overlapped), 0, sizeof(OVERLAPPED));
int status;
@ -2237,7 +2242,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
return;
}
}
fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT;
janet_async_in_flight(fiber);
}
break;
#else
@ -2253,7 +2258,7 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
read_more:
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_INIT:
case JANET_ASYNC_EVENT_READ: {
JanetBuffer *buffer = state->buf;
int32_t bytes_left = state->bytes_left;
@ -2332,9 +2337,8 @@ void ev_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
}
}
static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
JanetFiber *f = janet_vm.root_fiber;
StateRead *state = (StateRead *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, sizeof(StateRead));
static JANET_NO_RETURN void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
StateRead *state = janet_malloc(sizeof(StateRead));
state->is_chunk = is_chunked;
state->buf = buf;
state->bytes_left = nbytes;
@ -2345,23 +2349,23 @@ static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t
#else
state->flags = flags;
#endif
ev_callback_read(f, JANET_ASYNC_EVENT_USER);
janet_async_start(stream, JANET_ASYNC_LISTEN_READ, ev_callback_read, state);
}
void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
JANET_NO_RETURN void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0);
}
void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
JANET_NO_RETURN void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0);
}
#ifdef JANET_NET
void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
JANET_NO_RETURN void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags);
}
void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
JANET_NO_RETURN void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags);
}
void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
JANET_NO_RETURN void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags);
}
#endif
@ -2431,7 +2435,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
return;
}
break;
case JANET_ASYNC_EVENT_USER: {
case JANET_ASYNC_EVENT_INIT: {
/* Begin write */
int32_t len;
const uint8_t *bytes;
@ -2461,7 +2465,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
if (status) {
if (WSA_IO_PENDING == WSAGetLastError()) {
fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT;
janet_async_in_flight(fiber);
} else {
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
@ -2486,7 +2490,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
status = WriteFile(stream->handle, bytes, len, NULL, &state->overlapped);
if (!status) {
if (ERROR_IO_PENDING == GetLastError()) {
fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT;
janet_async_in_flight(fiber);
} else {
janet_cancel(fiber, janet_ev_lasterr());
janet_async_end(fiber);
@ -2505,7 +2509,7 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
janet_cancel(fiber, janet_cstringv("stream hup"));
janet_async_end(fiber);
break;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_INIT:
case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len;
const uint8_t *bytes;
@ -2570,10 +2574,8 @@ void ev_callback_write(JanetFiber *fiber, JanetAsyncEvent event) {
}
}
static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
JanetFiber *f = janet_vm.root_fiber;
StateWrite *state = (StateWrite *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE,
ev_callback_write, sizeof(StateWrite));
static JANET_NO_RETURN void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
StateWrite *state = janet_malloc(sizeof(StateWrite));
state->is_buffer = is_buffer;
state->src.buf = buf;
state->dest_abst = dest_abst;
@ -2584,31 +2586,31 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab
state->flags = flags;
state->start = 0;
#endif
ev_callback_write(f, JANET_ASYNC_EVENT_USER);
janet_async_start(stream, JANET_ASYNC_LISTEN_WRITE, ev_callback_write, state);
}
void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {
JANET_NO_RETURN void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0);
}
void janet_ev_write_string(JanetStream *stream, JanetString str) {
JANET_NO_RETURN void janet_ev_write_string(JanetStream *stream, JanetString str) {
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0);
}
#ifdef JANET_NET
void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) {
JANET_NO_RETURN void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) {
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags);
}
void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) {
JANET_NO_RETURN void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) {
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags);
}
void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) {
JANET_NO_RETURN void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) {
janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags);
}
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
JANET_NO_RETURN void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
}
#endif
@ -2999,7 +3001,6 @@ JANET_CORE_FN(janet_cfun_stream_read,
if (to != INFINITY) janet_addtimeout(to);
janet_ev_read(stream, buffer, n);
}
janet_await();
}
JANET_CORE_FN(janet_cfun_stream_chunk,
@ -3014,7 +3015,6 @@ JANET_CORE_FN(janet_cfun_stream_chunk,
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_readchunk(stream, buffer, n);
janet_await();
}
JANET_CORE_FN(janet_cfun_stream_write,
@ -3034,7 +3034,6 @@ JANET_CORE_FN(janet_cfun_stream_write,
if (to != INFINITY) janet_addtimeout(to);
janet_ev_write_string(stream, bytes.bytes);
}
janet_await();
}
static int mutexgc(void *p, size_t size) {

View File

@ -124,6 +124,13 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) {
switch (event) {
default:
break;
#ifndef JANET_WINDOWS
/* Wait until we have an actually event before checking.
* Windows doesn't support async connect with this, just try immediately.*/
case JANET_ASYNC_EVENT_INIT:
#endif
case JANET_ASYNC_EVENT_DEINIT:
return;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(fiber, janet_cstringv("stream closed"));
janet_async_end(fiber);
@ -154,12 +161,9 @@ void net_callback_connect(JanetFiber *fiber, JanetAsyncEvent event) {
}
static void net_sched_connect(JanetStream *stream) {
JanetFiber *f = janet_vm.root_fiber;
NetStateConnect *state = (NetStateConnect *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, sizeof(NetStateConnect));
NetStateConnect *state = janet_malloc(sizeof(NetStateConnect));
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_callback_connect(f, JANET_ASYNC_EVENT_USER);
#endif
janet_async_start(stream, JANET_ASYNC_LISTEN_WRITE, net_callback_connect, state);
}
/* State machine for accepting connections. */
@ -229,14 +233,16 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) {
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
Janet err;
JanetFiber *f = janet_vm.root_fiber;
NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept));
NetStateAccept *state = janet_malloc(sizeof(NetStateAccept));
memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
memset(&state->buf, 0, 1024);
state->function = fun;
state->lstream = stream;
if (net_sched_accept_impl(state, f, &err)) janet_panicv(err);
janet_await();
if (net_sched_accept_impl(state, janet_root_fiber(), &err)) {
janet_free(state);
janet_panicv(err);
}
janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state);
}
static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet *err) {
@ -253,7 +259,7 @@ static int net_sched_accept_impl(NetStateAccept *state, JanetFiber *fiber, Janet
int code = WSAGetLastError();
if (code == WSA_IO_PENDING) {
/* indicates io is happening async */
fiber->flags |= JANET_FIBER_EV_FLAG_IN_FLIGHT;
janet_async_in_flight(fiber);
return 0;
}
*err = janet_ev_lasterr();
@ -282,7 +288,7 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) {
janet_schedule(fiber, janet_wrap_nil());
janet_async_end(fiber);
return;
case JANET_ASYNC_EVENT_USER:
case JANET_ASYNC_EVENT_INIT:
case JANET_ASYNC_EVENT_READ: {
#if defined(JANET_LINUX)
JSock connfd = accept4(stream->handle, NULL, NULL, SOCK_CLOEXEC);
@ -310,11 +316,10 @@ void net_callback_accept(JanetFiber *fiber, JanetAsyncEvent event) {
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
JanetFiber *f = janet_vm.root_fiber;
NetStateAccept *state = (NetStateAccept *) janet_async_start(f, stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, sizeof(NetStateAccept));
NetStateAccept *state = janet_malloc(sizeof(NetStateAccept));
memset(state, 0, sizeof(NetStateAccept));
state->function = fun;
net_callback_accept(f, JANET_ASYNC_EVENT_USER);
janet_await();
janet_async_start(stream, JANET_ASYNC_LISTEN_READ, net_callback_accept, state);
}
#endif
@ -851,7 +856,6 @@ JANET_CORE_FN(cfun_stream_read,
if (to != INFINITY) janet_addtimeout(to);
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
}
janet_await();
}
JANET_CORE_FN(cfun_stream_chunk,
@ -866,7 +870,6 @@ JANET_CORE_FN(cfun_stream_chunk,
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL);
janet_await();
}
JANET_CORE_FN(cfun_stream_recv_from,
@ -881,7 +884,6 @@ JANET_CORE_FN(cfun_stream_recv_from,
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL);
janet_await();
}
JANET_CORE_FN(cfun_stream_write,
@ -901,7 +903,6 @@ JANET_CORE_FN(cfun_stream_write,
if (to != INFINITY) janet_addtimeout(to);
janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL);
}
janet_await();
}
JANET_CORE_FN(cfun_stream_send_to,
@ -922,7 +923,6 @@ JANET_CORE_FN(cfun_stream_send_to,
if (to != INFINITY) janet_addtimeout(to);
janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL);
}
janet_await();
}
JANET_CORE_FN(cfun_stream_flush,

View File

@ -596,8 +596,7 @@ typedef enum {
JANET_ASYNC_EVENT_READ = 6,
JANET_ASYNC_EVENT_WRITE = 7,
JANET_ASYNC_EVENT_COMPLETE = 8, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_FAILED = 9, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER = 10
JANET_ASYNC_EVENT_FAILED = 9 /* Used on windows for IOCP */
} JanetAsyncEvent;
typedef enum {
@ -606,9 +605,7 @@ typedef enum {
JANET_ASYNC_LISTEN_BOTH
} JanetAsyncMode;
/* Typedefs */
typedef struct JanetStream JanetStream;
typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event);
/* Wrapper around file descriptors and HANDLEs that can be polled. */
struct JanetStream {
@ -620,9 +617,24 @@ struct JanetStream {
const void *methods; /* Methods for this stream */
};
typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event);
/* Start listening for events from a stream on the current root fiber. After
* calling this, users should call janet_await() before returning from the
* current C Function. This also will call janet_await.
* mode is which events to listen for, and callback is the function pointer to
* call when ever an event is sent from the event loop. state is an optional (can be NULL)
* pointer to data allocated with janet_malloc. This pointer will be passed to callback as
* fiber->ev_state. It will also be freed for you by the runtime when the event loop determines
* it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct. */
JANET_API JANET_NO_RETURN void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state);
/* Do not send any more events to the given callback. Call this after scheduling fiber to be resume
* or canceled. */
JANET_API void janet_async_end(JanetFiber *fiber);
JANET_API void *janet_async_start(JanetFiber *fiber, JanetStream *stream,
JanetAsyncMode mode, JanetEVCallback callback, size_t data_size);
/* Needed for windows to mark a fiber as waiting for an IOCP completion event. Noop on other platforms. */
JANET_API void janet_async_in_flight(JanetFiber *fiber);
#endif
@ -1488,22 +1500,22 @@ JANET_API void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGeneric
JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value);
/* Read async from a stream */
JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_NO_RETURN JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_NO_RETURN JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
#ifdef JANET_NET
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_NO_RETURN JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_NO_RETURN JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_NO_RETURN JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
#endif
/* Write async to a stream */
JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf);
JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str);
JANET_NO_RETURN JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf);
JANET_NO_RETURN JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str);
#ifdef JANET_NET
JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags);
JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags);
JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags);
JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags);
JANET_NO_RETURN JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags);
JANET_NO_RETURN JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags);
JANET_NO_RETURN JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags);
JANET_NO_RETURN JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags);
#endif
#endif