mirror of
https://github.com/janet-lang/janet
synced 2025-04-05 14:56:55 +00:00
Get IOCP reworked event loop passing tests.
This commit is contained in:
parent
81f35f5dd1
commit
1ee98e1e66
@ -281,7 +281,7 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
|
||||
state->event = user;
|
||||
state->machine(state, JANET_ASYNC_EVENT_INIT);
|
||||
janet_ev_inc_refcount();
|
||||
janet_gcroot(janet_wrap_abstract(state));
|
||||
janet_table_put(&janet_vm.listeners, janet_wrap_abstract(state), janet_wrap_true());
|
||||
return state;
|
||||
bad_listen_write:
|
||||
janet_panic("cannot listen for duplicate write event on stream");
|
||||
@ -297,7 +297,7 @@ void janet_fiber_did_resume(JanetFiber *fiber) {
|
||||
}
|
||||
|
||||
static void janet_unlisten_impl(JanetListenerState *state) {
|
||||
janet_gcunroot(janet_wrap_abstract(state));
|
||||
janet_table_remove(&janet_vm.listeners, janet_wrap_abstract(state));
|
||||
if (state->stream) {
|
||||
janet_ev_dec_refcount();
|
||||
if (state->stream->read_state == state) {
|
||||
@ -557,6 +557,14 @@ void janet_ev_mark(void) {
|
||||
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
|
||||
}
|
||||
}
|
||||
|
||||
/* Listeners */
|
||||
for (size_t i = 0; i < janet_vm.listeners.capacity; i++) {
|
||||
JanetKV *kv = janet_vm.listeners.data + i;
|
||||
if (!janet_checktype(kv->key, JANET_NIL)) {
|
||||
janet_mark(kv->key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
|
||||
@ -582,6 +590,7 @@ void janet_ev_init_common(void) {
|
||||
janet_vm.tq_capacity = 0;
|
||||
janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
|
||||
janet_table_init_raw(&janet_vm.active_tasks, 0);
|
||||
janet_table_init_raw(&janet_vm.listeners, 0);
|
||||
janet_table_init_raw(&janet_vm.signal_handlers, 0);
|
||||
janet_rng_seed(&janet_vm.ev_rng, 0);
|
||||
#ifndef JANET_WINDOWS
|
||||
@ -596,6 +605,7 @@ void janet_ev_deinit_common(void) {
|
||||
janet_free(janet_vm.tq);
|
||||
janet_table_deinit(&janet_vm.threaded_abstracts);
|
||||
janet_table_deinit(&janet_vm.active_tasks);
|
||||
janet_table_deinit(&janet_vm.listeners);
|
||||
janet_table_deinit(&janet_vm.signal_handlers);
|
||||
#ifndef JANET_WINDOWS
|
||||
pthread_attr_destroy(&janet_vm.new_thread_attr);
|
||||
@ -1797,11 +1807,11 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
||||
struct kevent kev[2];
|
||||
|
||||
int length = 0;
|
||||
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) {
|
||||
if (mask & JANET_ASYNC_LISTEN_READ) {
|
||||
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
|
||||
length++;
|
||||
}
|
||||
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) {
|
||||
if (mask & JANET_ASYNC_LISTEN_WRITE) {
|
||||
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
|
||||
length++;
|
||||
}
|
||||
@ -1824,11 +1834,11 @@ static void janet_unlisten(JanetListenerState *state) {
|
||||
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
||||
|
||||
int length = 0;
|
||||
if (stream->_mask & JANET_ASYNC_EVENT_WRITE) {
|
||||
if (stream->read_state == state) {
|
||||
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
||||
length++;
|
||||
}
|
||||
if (stream->_mask & JANET_ASYNC_EVENT_READ) {
|
||||
if (stream->write_state == state) {
|
||||
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
|
||||
length++;
|
||||
}
|
||||
|
@ -158,6 +158,7 @@ struct JanetVM {
|
||||
volatile size_t extra_listeners; /* used in signal handler, must be volatile */
|
||||
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
|
||||
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
|
||||
JanetTable listeners; /* For GC */
|
||||
JanetTable signal_handlers;
|
||||
#ifdef JANET_WINDOWS
|
||||
void **iocp;
|
||||
|
@ -627,6 +627,8 @@ struct JanetListenerState {
|
||||
#ifdef JANET_WINDOWS
|
||||
void *tag; /* Used to associate listeners with an overlapped structure */
|
||||
int bytes; /* Used to track how many bytes were transfered. */
|
||||
#else
|
||||
uint32_t index; /* Used for poll implentation */
|
||||
#endif
|
||||
};
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user