1
0
mirror of https://github.com/janet-lang/janet synced 2024-06-18 11:19:56 +00:00

Redo state management for Janet listeners.

Make more use of the built in GC code for abstracts to
be sure things are more correct. Issue before was streams could
be freed before IOCP events arrived.
This commit is contained in:
Calvin Rose 2023-09-25 00:43:36 -07:00
parent 1b402347cd
commit 81f35f5dd1
7 changed files with 117 additions and 129 deletions

View File

@ -254,84 +254,89 @@ static void add_timeout(JanetTimeout to) {
}
}
static void janet_cleanup_canceled_states(JanetStream *stream) {
JanetListenerState *other_state = stream->state;
while (other_state) {
JanetListenerState *next_state = other_state->_next;
if (other_state->fiber->sched_id != other_state->_sched_id) {
janet_unlisten(other_state);
}
other_state = next_state;
}
}
static int janet_listener_gc(void *p, size_t s);
static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = {
"core/ev-listener",
janet_listener_gc,
janet_listener_mark
};
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream");
}
if (stream->_mask & mask) {
janet_cleanup_canceled_states(stream);
if (stream->_mask & mask) {
janet_panic("cannot listen for duplicate event on stream");
}
}
if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState);
JanetListenerState *state = janet_malloc(size);
if (NULL == state) {
JANET_OUT_OF_MEMORY;
}
if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read;
if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write;
janet_assert(size >= sizeof(JanetListenerState), "bad size");
JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
state->machine = behavior;
state->fiber = janet_vm.root_fiber;
state->_sched_id = janet_vm.root_fiber->sched_id;
janet_vm.root_fiber->waiting = state;
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
state->stream = stream;
state->_mask = mask;
stream->_mask |= mask;
state->_next = stream->state;
stream->state = state;
/* Keep track of a listener for GC purposes */
int resize = janet_vm.listener_cap == janet_vm.listener_count;
if (resize) {
size_t newcap = janet_vm.listener_count ? janet_vm.listener_cap * 2 : 16;
janet_vm.listeners = janet_realloc(janet_vm.listeners, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm.listeners) {
JANET_OUT_OF_MEMORY;
}
janet_vm.listener_cap = newcap;
}
size_t index = janet_vm.listener_count++;
janet_vm.listeners[index] = state;
state->_index = index;
/* Emit INIT event for convenience */
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT);
janet_ev_inc_refcount();
janet_gcroot(janet_wrap_abstract(state));
return state;
bad_listen_write:
janet_panic("cannot listen for duplicate write event on stream");
bad_listen_read:
janet_panic("cannot listen for duplicate read event on stream");
}
void janet_fiber_did_resume(JanetFiber *fiber) {
if (fiber->waiting) {
janet_unlisten(fiber->waiting);
fiber->waiting = NULL;
}
}
/* Indicate we are no longer listening for an event. This
* frees the memory of the state machine as well. */
static void janet_unlisten_impl(JanetListenerState *state) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
/* Remove state machine from poll list */
JanetListenerState **iter = &(state->stream->state);
while (*iter && *iter != state)
iter = &((*iter)->_next);
janet_assert(*iter, "failed to remove listener");
*iter = state->_next;
/* Remove mask */
state->stream->_mask &= ~(state->_mask);
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
janet_vm.listeners[index]->_index = index;
//janet_free(state);
janet_gcunroot(janet_wrap_abstract(state));
if (state->stream) {
janet_ev_dec_refcount();
if (state->stream->read_state == state) {
state->stream->read_state = NULL;
}
if (state->stream->write_state == state) {
state->stream->write_state = NULL;
}
state->stream = NULL;
}
}
static int janet_listener_gc(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_ev_dec_refcount();
}
if (state->machine) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
}
return 0;
}
static int janet_listener_mark(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_mark(janet_wrap_abstract(state->stream));
}
if (state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
state->machine(state, JANET_ASYNC_EVENT_MARK);
return 0;
}
static void janet_stream_checktoclose(JanetStream *stream) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) {
janet_stream_close(stream);
}
}
@ -349,22 +354,15 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle;
stream->flags = flags;
stream->state = NULL;
stream->_mask = 0;
stream->read_state = NULL;
stream->write_state = NULL;
if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods;
return stream;
}
void janet_stream_close(JanetStream *stream) {
static void janet_stream_close_impl(JanetStream *stream) {
stream->flags |= JANET_STREAM_CLOSED;
JanetListenerState *state = stream->state;
while (state) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
JanetListenerState *next_state = state->_next;
janet_unlisten(state);
state = next_state;
}
#ifdef JANET_WINDOWS
if (stream->handle != INVALID_HANDLE_VALUE) {
#ifdef JANET_NET
@ -385,11 +383,23 @@ void janet_stream_close(JanetStream *stream) {
#endif
}
void janet_stream_close(JanetStream *stream) {
if (stream->read_state) {
stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->read_state);
}
if (stream->write_state) {
stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->write_state);
}
janet_stream_close_impl(stream);
}
/* Called to clean up a stream */
static int janet_stream_gc(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *)p;
janet_stream_close(stream);
janet_stream_close_impl(stream);
return 0;
}
@ -397,13 +407,11 @@ static int janet_stream_gc(void *p, size_t s) {
static int janet_stream_mark(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *) p;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
state = state->_next;
if (NULL != stream->read_state) {
janet_mark(janet_wrap_abstract(stream->read_state));
}
if (NULL != stream->write_state) {
janet_mark(janet_wrap_abstract(stream->write_state));
}
return 0;
}
@ -456,8 +464,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
}
JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
/* Can't share listening state and such across threads */
p->_mask = 0;
p->state = NULL;
p->read_state = NULL;
p->write_state = NULL;
p->flags = (uint32_t) janet_unmarshal_int(ctx);
p->methods = janet_unmarshal_ptr(ctx);
#ifdef JANET_WINDOWS
@ -549,16 +557,6 @@ void janet_ev_mark(void) {
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
}
}
/* Pending listeners */
for (size_t i = 0; i < janet_vm.listener_count; i++) {
JanetListenerState *state = janet_vm.listeners[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
}
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
@ -579,9 +577,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int thre
/* Common init code */
void janet_ev_init_common(void) {
janet_q_init(&janet_vm.spawn);
janet_vm.listener_count = 0;
janet_vm.listener_cap = 0;
janet_vm.listeners = NULL;
janet_vm.tq = NULL;
janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0;
@ -599,8 +594,6 @@ void janet_ev_init_common(void) {
void janet_ev_deinit_common(void) {
janet_q_deinit(&janet_vm.spawn);
janet_free(janet_vm.tq);
janet_free(janet_vm.listeners);
janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers);
@ -1327,8 +1320,7 @@ const JanetAbstractType janet_channel_type = {
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
int janet_loop_done(void) {
return !(janet_vm.listener_count ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
return !((janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count ||
janet_vm.extra_listeners);
}
@ -1392,7 +1384,7 @@ JanetFiber *janet_loop1(void) {
}
/* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
if (janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
@ -1411,7 +1403,7 @@ JanetFiber *janet_loop1(void) {
break;
}
/* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) {
if (janet_vm.tq_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when);
}
}
@ -1541,19 +1533,19 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else {
/* Normal event */
JanetStream *stream = (JanetStream *) completionKey;
janet_assert(!(stream->flags & JANET_STREAM_CLOSED), "got closed stream event");
JanetListenerState *state = stream->state;
while (state != NULL) {
if (state->tag == overlapped) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
break;
} else {
state = state->_next;
janet_assert(stream->handle != INVALID_HANDLE_VALUE, "got closed stream event");
JanetListenerState *state = NULL;
if (stream->read_state && stream->read_state->tag == overlapped) {
state = stream->read_state;
} else if (stream->write_state && stream->write_state->tag == overlapped) {
state = stream->write_state;
}
if (state != NULL) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
}
janet_stream_checktoclose(stream);
@ -1633,7 +1625,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
if (!(stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);

View File

@ -40,6 +40,7 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->sched_id = 0;
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);

View File

@ -268,6 +268,9 @@ recur:
if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel);
}
if (fiber->waiting) {
janet_mark_abstract(fiber->waiting);
}
#endif
/* Explicit tail recursion */

View File

@ -155,9 +155,6 @@ struct JanetVM {
JanetQueue spawn;
JanetTimeout *tq;
JanetRNG ev_rng;
JanetListenerState **listeners;
size_t listener_count;
size_t listener_cap;
volatile size_t extra_listeners; /* used in signal handler, must be volatile */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */

View File

@ -49,7 +49,7 @@
#ifndef JANET_EXIT
#include <stdio.h>
#define JANET_EXIT(m) do { \
fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\
fprintf(stderr, "janet interpreter runtime error at line %d in file %s: %s\n",\
__LINE__,\
__FILE__,\
(m));\

View File

@ -1446,6 +1446,10 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
JanetFiberStatus old_status = janet_fiber_status(fiber);
#ifdef JANET_EV
janet_fiber_did_resume(fiber);
#endif
/* Clear last value */
fiber->last_value = janet_wrap_nil();

View File

@ -591,7 +591,6 @@ typedef enum {
JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_CANCEL,
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER
} JanetAsyncEvent;
@ -613,13 +612,9 @@ typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEv
struct JanetStream {
JanetHandle handle;
uint32_t flags;
/* Linked list of all in-flight IO routines for this stream */
JanetListenerState *state;
JanetListenerState *read_state;
JanetListenerState *write_state;
const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same stream, though. */
int _mask;
};
/* Interface for state machine based event loop */
@ -633,11 +628,6 @@ struct JanetListenerState {
void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */
#endif
/* internal */
size_t _index;
int _mask;
uint32_t _sched_id;
JanetListenerState *_next;
};
#endif
@ -928,6 +918,7 @@ struct JanetFiber {
* in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetListenerState *waiting;
void *supervisor_channel; /* Channel to push self to when complete */
#endif
};