1
0
mirror of https://github.com/janet-lang/janet synced 2024-06-18 11:19:56 +00:00

Work on debugging issue with server spawning.

This commit is contained in:
Calvin Rose 2023-09-24 15:32:34 -07:00
parent dccb60ba35
commit 1b402347cd
7 changed files with 62 additions and 74 deletions

View File

@ -3766,7 +3766,7 @@
[host port &opt handler type]
(def s (net/listen host port type))
(if handler
(ev/call (fn [] (net/accept-loop s handler))))
(ev/go (fn [] (net/accept-loop s handler))))
s))
###

View File

@ -180,7 +180,7 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
}
/* Forward declaration */
static void janet_unlisten(JanetListenerState *state, int is_gc);
static void janet_unlisten(JanetListenerState *state);
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
@ -254,16 +254,27 @@ static void add_timeout(JanetTimeout to) {
}
}
static void janet_cleanup_canceled_states(JanetStream *stream) {
JanetListenerState *other_state = stream->state;
while (other_state) {
JanetListenerState *next_state = other_state->_next;
if (other_state->fiber->sched_id != other_state->_sched_id) {
janet_unlisten(other_state);
}
other_state = next_state;
}
}
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream");
}
if (stream->_mask & mask) {
janet_panic("cannot listen for duplicate event on stream");
}
if (janet_vm.root_fiber->waiting != NULL) {
janet_panic("current fiber is already waiting for event");
janet_cleanup_canceled_states(stream);
if (stream->_mask & mask) {
janet_panic("cannot listen for duplicate event on stream");
}
}
if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState);
@ -273,7 +284,7 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
}
state->machine = behavior;
state->fiber = janet_vm.root_fiber;
janet_vm.root_fiber->waiting = state;
state->_sched_id = janet_vm.root_fiber->sched_id;
state->stream = stream;
state->_mask = mask;
stream->_mask |= mask;
@ -302,7 +313,7 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
/* Indicate we are no longer listening for an event. This
* frees the memory of the state machine as well. */
static void janet_unlisten_impl(JanetListenerState *state, int is_gc) {
static void janet_unlisten_impl(JanetListenerState *state) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
/* Remove state machine from poll list */
JanetListenerState **iter = &(state->stream->state);
@ -312,18 +323,11 @@ static void janet_unlisten_impl(JanetListenerState *state, int is_gc) {
*iter = state->_next;
/* Remove mask */
state->stream->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */
if (!is_gc) {
JanetFiber *fiber = state->fiber;
if (NULL != fiber && fiber->waiting == state) {
fiber->waiting = NULL;
}
}
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
janet_vm.listeners[index]->_index = index;
janet_free(state);
//janet_free(state);
}
static void janet_stream_checktoclose(JanetStream *stream) {
@ -352,45 +356,40 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
return stream;
}
/* Close a stream */
static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
if (stream->flags & JANET_STREAM_CLOSED) return;
void janet_stream_close(JanetStream *stream) {
stream->flags |= JANET_STREAM_CLOSED;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (!is_gc) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
}
while (state) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
JanetListenerState *next_state = state->_next;
janet_unlisten(state, is_gc);
janet_unlisten(state);
state = next_state;
}
stream->state = NULL;
#ifdef JANET_WINDOWS
if (stream->handle != INVALID_HANDLE_VALUE) {
#ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle);
} else
if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle);
} else
#endif
{
CloseHandle(stream->handle);
{
CloseHandle(stream->handle);
}
stream->handle = INVALID_HANDLE_VALUE;
}
stream->handle = INVALID_HANDLE_VALUE;
#else
close(stream->handle);
stream->handle = -1;
if (stream->handle != -1) {
close(stream->handle);
stream->handle = -1;
}
#endif
}
void janet_stream_close(JanetStream *stream) {
janet_stream_close_impl(stream, 0);
}
/* Called to clean up a stream */
static int janet_stream_gc(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *)p;
janet_stream_close_impl(stream, 1);
janet_stream_close(stream);
return 0;
}
@ -522,14 +521,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
janet_schedule_signal(fiber, value, JANET_SIGNAL_OK);
}
void janet_fiber_did_resume(JanetFiber *fiber) {
/* Cancel any pending fibers */
if (fiber->waiting) {
fiber->waiting->machine(fiber->waiting, JANET_ASYNC_EVENT_CANCEL);
janet_unlisten(fiber->waiting, 0);
}
}
/* Mark all pending tasks */
void janet_ev_mark(void) {
@ -639,9 +630,9 @@ void janet_addtimeout(double sec) {
void janet_ev_inc_refcount(void) {
#ifdef JANET_WINDOWS
#ifdef JANET_64
InterlockedIncrement64(&janet_vm.extra_listeners);
InterlockedIncrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else
InterlockedIncrement(&janet_vm.extra_listeners);
InterlockedIncrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif
#else
__atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED);
@ -651,9 +642,9 @@ void janet_ev_inc_refcount(void) {
void janet_ev_dec_refcount(void) {
#ifdef JANET_WINDOWS
#ifdef JANET_64
InterlockedDecrement64(&janet_vm.extra_listeners);
InterlockedDecrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else
InterlockedDecrement(&janet_vm.extra_listeners);
InterlockedDecrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif
#else
__atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED);
@ -1512,8 +1503,8 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
janet_unlisten_impl(state, is_gc);
static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@ -1550,6 +1541,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else {
/* Normal event */
JanetStream *stream = (JanetStream *) completionKey;
janet_assert(!(stream->flags & JANET_STREAM_CLOSED), "got closed stream event");
JanetListenerState *state = stream->state;
while (state != NULL) {
if (state->tag == overlapped) {
@ -1557,7 +1549,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
janet_unlisten(state);
}
break;
} else {
@ -1598,7 +1590,7 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
janet_unlisten(state);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
@ -1631,7 +1623,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state, 0);
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
}
@ -1639,7 +1631,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
}
/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state, int is_gc) {
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
/* Use flag to indicate state is not registered in epoll */
@ -1659,7 +1651,7 @@ static void janet_unlisten(JanetListenerState *state, int is_gc) {
}
}
/* Destroy state machine and free memory */
janet_unlisten_impl(state, is_gc);
janet_unlisten_impl(state);
}
#define JANET_EPOLL_MAX_EVENTS 64
@ -1716,7 +1708,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
janet_unlisten(state);
state = next_state;
}
janet_stream_checktoclose(stream);
@ -1829,9 +1821,9 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
if (stream->handle != -1) {
/* Use flag to indicate state is not registered in kqueue */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
@ -1852,7 +1844,7 @@ static void janet_unlisten(JanetListenerState *state, int is_gc) {
add_kqueue_events(kev, length);
}
}
janet_unlisten_impl(state, is_gc);
janet_unlisten_impl(state);
}
#define JANET_KQUEUE_MAX_EVENTS 64
@ -1913,9 +1905,9 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
statuses[1] == JANET_ASYNC_STATUS_DONE ||
statuses[2] == JANET_ASYNC_STATUS_DONE ||
statuses[3] == JANET_ASYNC_STATUS_DONE)
statuses[3] == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
}
state = next_state;
}
janet_stream_checktoclose(stream);
@ -1984,9 +1976,9 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
static void janet_unlisten(JanetListenerState *state) {
janet_vm.fds[state->_index + 1] = janet_vm.fds[janet_vm.listener_count];
janet_unlisten_impl(state, is_gc);
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
@ -2035,7 +2027,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
janet_unlisten(state);
}
janet_stream_checktoclose(stream);
}

View File

@ -39,7 +39,6 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->env = NULL;
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
#endif
@ -85,7 +84,6 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
#endif
return fiber;

View File

@ -1048,7 +1048,6 @@ static const uint8_t *unmarshal_one_fiber(
fiber->env = NULL;
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
#endif

View File

@ -123,6 +123,9 @@ JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent even
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:

View File

@ -1446,10 +1446,6 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
JanetFiberStatus old_status = janet_fiber_status(fiber);
#ifdef JANET_EV
janet_fiber_did_resume(fiber);
#endif
/* Clear last value */
fiber->last_value = janet_wrap_nil();

View File

@ -636,6 +636,7 @@ struct JanetListenerState {
/* internal */
size_t _index;
int _mask;
uint32_t _sched_id;
JanetListenerState *_next;
};
#endif
@ -926,7 +927,6 @@ struct JanetFiber {
* that is, fibers that are scheduled on the event loop and behave much like threads
* in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
void *supervisor_channel; /* Channel to push self to when complete */
#endif