diff --git a/src/core/ev.c b/src/core/ev.c index fa9712c8..2b90bbdb 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -281,7 +281,7 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener state->event = user; state->machine(state, JANET_ASYNC_EVENT_INIT); janet_ev_inc_refcount(); - janet_gcroot(janet_wrap_abstract(state)); + janet_table_put(&janet_vm.listeners, janet_wrap_abstract(state), janet_wrap_true()); return state; bad_listen_write: janet_panic("cannot listen for duplicate write event on stream"); @@ -297,7 +297,7 @@ void janet_fiber_did_resume(JanetFiber *fiber) { } static void janet_unlisten_impl(JanetListenerState *state) { - janet_gcunroot(janet_wrap_abstract(state)); + janet_table_remove(&janet_vm.listeners, janet_wrap_abstract(state)); if (state->stream) { janet_ev_dec_refcount(); if (state->stream->read_state == state) { @@ -557,6 +557,14 @@ void janet_ev_mark(void) { janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber)); } } + + /* Listeners */ + for (size_t i = 0; i < janet_vm.listeners.capacity; i++) { + JanetKV *kv = janet_vm.listeners.data + i; + if (!janet_checktype(kv->key, JANET_NIL)) { + janet_mark(kv->key); + } + } } static int janet_channel_push(JanetChannel *channel, Janet x, int mode); @@ -582,6 +590,7 @@ void janet_ev_init_common(void) { janet_vm.tq_capacity = 0; janet_table_init_raw(&janet_vm.threaded_abstracts, 0); janet_table_init_raw(&janet_vm.active_tasks, 0); + janet_table_init_raw(&janet_vm.listeners, 0); janet_table_init_raw(&janet_vm.signal_handlers, 0); janet_rng_seed(&janet_vm.ev_rng, 0); #ifndef JANET_WINDOWS @@ -596,6 +605,7 @@ void janet_ev_deinit_common(void) { janet_free(janet_vm.tq); janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.active_tasks); + janet_table_deinit(&janet_vm.listeners); janet_table_deinit(&janet_vm.signal_handlers); #ifndef JANET_WINDOWS pthread_attr_destroy(&janet_vm.new_thread_attr); @@ -1797,11 +1807,11 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in struct kevent kev[2]; int length = 0; - if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) { + if (mask & JANET_ASYNC_LISTEN_READ) { EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream); length++; } - if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) { + if (mask & JANET_ASYNC_LISTEN_WRITE) { EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream); length++; } @@ -1824,11 +1834,11 @@ static void janet_unlisten(JanetListenerState *state) { EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); int length = 0; - if (stream->_mask & JANET_ASYNC_EVENT_WRITE) { + if (stream->read_state == state) { EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream); length++; } - if (stream->_mask & JANET_ASYNC_EVENT_READ) { + if (stream->write_state == state) { EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream); length++; } diff --git a/src/core/state.h b/src/core/state.h index 115ac109..fb0161da 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -158,6 +158,7 @@ struct JanetVM { volatile size_t extra_listeners; /* used in signal handler, must be volatile */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */ + JanetTable listeners; /* For GC */ JanetTable signal_handlers; #ifdef JANET_WINDOWS void **iocp; diff --git a/src/include/janet.h b/src/include/janet.h index 4de24b1f..c34a89c7 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -627,6 +627,8 @@ struct JanetListenerState { #ifdef JANET_WINDOWS void *tag; /* Used to associate listeners with an overlapped structure */ int bytes; /* Used to track how many bytes were transfered. */ +#else + uint32_t index; /* Used for poll implentation */ #endif }; #endif