mirror of
https://github.com/janet-lang/janet
synced 2025-02-02 10:19:10 +00:00
Make poll work by going back to array of listeners for gc keeping.
This commit is contained in:
parent
a96971c8a7
commit
d12464fc0e
@ -275,6 +275,7 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
|
|||||||
JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
|
JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
|
||||||
state->machine = behavior;
|
state->machine = behavior;
|
||||||
state->fiber = janet_vm.root_fiber;
|
state->fiber = janet_vm.root_fiber;
|
||||||
|
state->flags = 0;
|
||||||
janet_vm.root_fiber->waiting = state;
|
janet_vm.root_fiber->waiting = state;
|
||||||
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
|
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
|
||||||
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
|
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
|
||||||
@ -282,7 +283,8 @@ static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener
|
|||||||
state->event = user;
|
state->event = user;
|
||||||
state->machine(state, JANET_ASYNC_EVENT_INIT);
|
state->machine(state, JANET_ASYNC_EVENT_INIT);
|
||||||
janet_ev_inc_refcount();
|
janet_ev_inc_refcount();
|
||||||
janet_table_put(&janet_vm.listeners, janet_wrap_abstract(state), janet_wrap_true());
|
state->index = janet_vm.listeners->count;
|
||||||
|
janet_array_push(janet_vm.listeners, janet_wrap_abstract(state));
|
||||||
return state;
|
return state;
|
||||||
bad_listen_write:
|
bad_listen_write:
|
||||||
janet_panic("cannot listen for duplicate write event on stream");
|
janet_panic("cannot listen for duplicate write event on stream");
|
||||||
@ -298,8 +300,14 @@ void janet_fiber_did_resume(JanetFiber *fiber) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void janet_unlisten_impl(JanetListenerState *state) {
|
static void janet_unlisten_impl(JanetListenerState *state) {
|
||||||
janet_table_remove(&janet_vm.listeners, janet_wrap_abstract(state));
|
/* Move last listener to position of this listener - O(1) removal and keep things densely packed. */
|
||||||
if (state->stream) {
|
if (state->stream) {
|
||||||
|
Janet popped = janet_array_pop(janet_vm.listeners);
|
||||||
|
janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check");
|
||||||
|
JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped);
|
||||||
|
janet_vm.listeners->data[state->index] = popped;
|
||||||
|
popped_state->index = state->index;
|
||||||
|
state->index = UINT32_MAX; /* just in case */
|
||||||
janet_ev_dec_refcount();
|
janet_ev_dec_refcount();
|
||||||
if (state->stream->read_state == state) {
|
if (state->stream->read_state == state) {
|
||||||
state->stream->read_state = NULL;
|
state->stream->read_state = NULL;
|
||||||
@ -558,14 +566,6 @@ void janet_ev_mark(void) {
|
|||||||
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
|
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Listeners */
|
|
||||||
for (int32_t i = 0; i < janet_vm.listeners.capacity; i++) {
|
|
||||||
JanetKV *kv = janet_vm.listeners.data + i;
|
|
||||||
if (!janet_checktype(kv->key, JANET_NIL)) {
|
|
||||||
janet_mark(kv->key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
|
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
|
||||||
@ -591,9 +591,10 @@ void janet_ev_init_common(void) {
|
|||||||
janet_vm.tq_capacity = 0;
|
janet_vm.tq_capacity = 0;
|
||||||
janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
|
janet_table_init_raw(&janet_vm.threaded_abstracts, 0);
|
||||||
janet_table_init_raw(&janet_vm.active_tasks, 0);
|
janet_table_init_raw(&janet_vm.active_tasks, 0);
|
||||||
janet_table_init_raw(&janet_vm.listeners, 0);
|
|
||||||
janet_table_init_raw(&janet_vm.signal_handlers, 0);
|
janet_table_init_raw(&janet_vm.signal_handlers, 0);
|
||||||
janet_rng_seed(&janet_vm.ev_rng, 0);
|
janet_rng_seed(&janet_vm.ev_rng, 0);
|
||||||
|
janet_vm.listeners = janet_array(0);
|
||||||
|
janet_gcroot(janet_wrap_array(janet_vm.listeners));
|
||||||
#ifndef JANET_WINDOWS
|
#ifndef JANET_WINDOWS
|
||||||
pthread_attr_init(&janet_vm.new_thread_attr);
|
pthread_attr_init(&janet_vm.new_thread_attr);
|
||||||
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
|
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
|
||||||
@ -606,8 +607,8 @@ void janet_ev_deinit_common(void) {
|
|||||||
janet_free(janet_vm.tq);
|
janet_free(janet_vm.tq);
|
||||||
janet_table_deinit(&janet_vm.threaded_abstracts);
|
janet_table_deinit(&janet_vm.threaded_abstracts);
|
||||||
janet_table_deinit(&janet_vm.active_tasks);
|
janet_table_deinit(&janet_vm.active_tasks);
|
||||||
janet_table_deinit(&janet_vm.listeners);
|
|
||||||
janet_table_deinit(&janet_vm.signal_handlers);
|
janet_table_deinit(&janet_vm.signal_handlers);
|
||||||
|
janet_vm.listeners = NULL;
|
||||||
#ifndef JANET_WINDOWS
|
#ifndef JANET_WINDOWS
|
||||||
pthread_attr_destroy(&janet_vm.new_thread_attr);
|
pthread_attr_destroy(&janet_vm.new_thread_attr);
|
||||||
#endif
|
#endif
|
||||||
@ -1596,7 +1597,6 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
|||||||
int is_first = !stream->read_state && !stream->write_state;
|
int is_first = !stream->read_state && !stream->write_state;
|
||||||
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
|
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
|
||||||
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
|
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
|
||||||
state->index = 0;
|
|
||||||
struct epoll_event ev;
|
struct epoll_event ev;
|
||||||
ev.events = 0;
|
ev.events = 0;
|
||||||
if (stream->read_state) ev.events |= EPOLLIN;
|
if (stream->read_state) ev.events |= EPOLLIN;
|
||||||
@ -1614,7 +1614,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
|||||||
* event to a file. So we just post a custom event to do the read/write
|
* event to a file. So we just post a custom event to do the read/write
|
||||||
* asap. */
|
* asap. */
|
||||||
/* Use flag to indicate state is not registered in epoll */
|
/* Use flag to indicate state is not registered in epoll */
|
||||||
state->index = 1;
|
state->flags = 1;
|
||||||
JanetEVGenericMessage msg = {0};
|
JanetEVGenericMessage msg = {0};
|
||||||
msg.argp = state;
|
msg.argp = state;
|
||||||
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
|
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
|
||||||
@ -1632,7 +1632,7 @@ static void janet_unlisten(JanetListenerState *state) {
|
|||||||
JanetStream *stream = state->stream;
|
JanetStream *stream = state->stream;
|
||||||
if (stream && (stream->handle != -1)) {
|
if (stream && (stream->handle != -1)) {
|
||||||
/* Use flag to indicate state is not registered in epoll */
|
/* Use flag to indicate state is not registered in epoll */
|
||||||
if (!state->index) {
|
if (!state->flags) {
|
||||||
int is_read = (stream->read_state != state) && stream->read_state;
|
int is_read = (stream->read_state != state) && stream->read_state;
|
||||||
int is_write = (stream->write_state != state) && stream->write_state;
|
int is_write = (stream->write_state != state) && stream->write_state;
|
||||||
int is_last = !is_read && !is_write;
|
int is_last = !is_read && !is_write;
|
||||||
@ -1816,37 +1816,33 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
|||||||
length++;
|
length++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (length > 0) {
|
janet_assert(length, "expected to add kqueue events");
|
||||||
add_kqueue_events(kev, length);
|
add_kqueue_events(kev, length);
|
||||||
}
|
|
||||||
|
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_unlisten(JanetListenerState *state) {
|
static void janet_unlisten(JanetListenerState *state) {
|
||||||
JanetStream *stream = state->stream;
|
JanetStream *stream = state->stream;
|
||||||
if (stream->handle != -1) {
|
if (stream && (stream->handle != -1)) {
|
||||||
/* Use flag to indicate state is not registered in kqueue */
|
int is_read = (stream->read_state != state) && stream->read_state;
|
||||||
if (!state->index) {
|
int is_write = (stream->write_state != state) && stream->write_state;
|
||||||
int is_read = (stream->read_state != state) && stream->read_state;
|
int is_last = !is_read && !is_write;
|
||||||
int is_write = (stream->write_state != state) && stream->write_state;
|
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
|
||||||
int is_last = !is_read && !is_write;
|
struct kevent kev[2];
|
||||||
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
|
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
||||||
struct kevent kev[2];
|
|
||||||
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
|
||||||
|
|
||||||
int length = 0;
|
int length = 0;
|
||||||
if (stream->read_state == state) {
|
if (stream->read_state == state) {
|
||||||
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
|
||||||
length++;
|
length++;
|
||||||
}
|
|
||||||
if (stream->write_state == state) {
|
|
||||||
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
|
|
||||||
length++;
|
|
||||||
}
|
|
||||||
|
|
||||||
add_kqueue_events(kev, length);
|
|
||||||
}
|
}
|
||||||
|
if (stream->write_state == state) {
|
||||||
|
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
|
||||||
|
length++;
|
||||||
|
}
|
||||||
|
|
||||||
|
add_kqueue_events(kev, length);
|
||||||
}
|
}
|
||||||
janet_unlisten_impl(state);
|
janet_unlisten_impl(state);
|
||||||
}
|
}
|
||||||
@ -1953,9 +1949,9 @@ static JanetTimestamp ts_now(void) {
|
|||||||
|
|
||||||
/* Wait for the next event */
|
/* Wait for the next event */
|
||||||
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
|
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
|
||||||
size_t oldsize = janet_vm.listeners.count_cap;
|
size_t oldsize = janet_vm.listeners->capacity;
|
||||||
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
|
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
|
||||||
size_t newsize = janet_vm.listener_cap;
|
size_t newsize = janet_vm.listeners->capacity;
|
||||||
if (newsize > oldsize) {
|
if (newsize > oldsize) {
|
||||||
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
|
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
|
||||||
if (NULL == janet_vm.fds) {
|
if (NULL == janet_vm.fds) {
|
||||||
@ -1965,15 +1961,17 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
|||||||
struct pollfd ev;
|
struct pollfd ev;
|
||||||
ev.fd = stream->handle;
|
ev.fd = stream->handle;
|
||||||
ev.events = 0;
|
ev.events = 0;
|
||||||
if (stream->read_state) events |= POLLIN;
|
if (stream->read_state) ev.events |= POLLIN;
|
||||||
if (stream->write_state) events |= POLLOUT;
|
if (stream->write_state) ev.events |= POLLOUT;
|
||||||
ev.revents = 0;
|
ev.revents = 0;
|
||||||
janet_vm.fds[state->_index + 1] = ev;
|
janet_vm.fds[state->index + 1] = ev;
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_unlisten(JanetListenerState *state) {
|
static void janet_unlisten(JanetListenerState *state) {
|
||||||
janet_vm.fds[state->_index + 1] = janet_vm.fds[janet_vm.listener_count];
|
if (state->stream) {
|
||||||
|
janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count];
|
||||||
|
}
|
||||||
janet_unlisten_impl(state);
|
janet_unlisten_impl(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1986,7 +1984,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
|||||||
JanetTimestamp now = ts_now();
|
JanetTimestamp now = ts_now();
|
||||||
to = now > timeout ? 0 : (int)(timeout - now);
|
to = now > timeout ? 0 : (int)(timeout - now);
|
||||||
}
|
}
|
||||||
ready = poll(janet_vm.fds, janet_vm.listener_count + 1, to);
|
ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to);
|
||||||
} while (ready == -1 && errno == EINTR);
|
} while (ready == -1 && errno == EINTR);
|
||||||
if (ready == -1) {
|
if (ready == -1) {
|
||||||
JANET_EXIT("failed to poll events");
|
JANET_EXIT("failed to poll events");
|
||||||
@ -1999,10 +1997,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Step state machines */
|
/* Step state machines */
|
||||||
for (size_t i = 0; i < janet_vm.listener_count; i++) {
|
for (int32_t i = 0; i < janet_vm.listeners->count; i++) {
|
||||||
struct pollfd *pfd = janet_vm.fds + i + 1;
|
struct pollfd *pfd = janet_vm.fds + i + 1;
|
||||||
/* Skip fds where nothing interesting happened */
|
/* Skip fds where nothing interesting happened */
|
||||||
JanetListenerState *state = janet_vm.listeners[i];
|
JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]);
|
||||||
/* Normal event */
|
/* Normal event */
|
||||||
int mask = pfd->revents;
|
int mask = pfd->revents;
|
||||||
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
|
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
|
||||||
|
@ -158,7 +158,7 @@ struct JanetVM {
|
|||||||
volatile size_t extra_listeners; /* used in signal handler, must be volatile */
|
volatile size_t extra_listeners; /* used in signal handler, must be volatile */
|
||||||
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
|
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
|
||||||
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
|
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
|
||||||
JanetTable listeners; /* For GC */
|
JanetArray *listeners; /* For GC */
|
||||||
JanetTable signal_handlers;
|
JanetTable signal_handlers;
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
void **iocp;
|
void **iocp;
|
||||||
|
@ -624,11 +624,11 @@ struct JanetListenerState {
|
|||||||
JanetStream *stream;
|
JanetStream *stream;
|
||||||
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
|
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
|
||||||
implementation of the event loop and the particular event. */
|
implementation of the event loop and the particular event. */
|
||||||
|
uint32_t index; /* Used for GC and poll implentation */
|
||||||
|
uint32_t flags;
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
void *tag; /* Used to associate listeners with an overlapped structure */
|
void *tag; /* Used to associate listeners with an overlapped structure */
|
||||||
int bytes; /* Used to track how many bytes were transfered. */
|
int bytes; /* Used to track how many bytes were transfered. */
|
||||||
#else
|
|
||||||
uint32_t index; /* Used for poll/epoll implentation */
|
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
Loading…
Reference in New Issue
Block a user