1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-25 22:56:52 +00:00

Patches to kqueue implementation.

This commit is contained in:
Calvin Rose 2023-09-25 19:07:18 -07:00
parent fb8c529f2e
commit a6ffafb1a2

View File

@ -1642,8 +1642,8 @@ static void janet_unlisten(JanetListenerState *state) {
if (!(stream->handle != -1)) { if (!(stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
if (!state->index) { if (!state->index) {
int is_read = stream->read_state != state && stream->read_state; int is_read = (stream->read_state != state) && stream->read_state;
int is_write = stream->write_state != state && stream->write_state; int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write; int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev; struct epoll_event ev;
@ -1698,7 +1698,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = stream->read_state;
JanetListenerState *states[2] = {stream->read_state, stream->write_state}; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
for (int j = 0; j < 2; j++) { for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j]; JanetListenerState *state = states[j];
@ -1837,8 +1836,10 @@ static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (stream->handle != -1) { if (stream->handle != -1) {
/* Use flag to indicate state is not registered in kqueue */ /* Use flag to indicate state is not registered in kqueue */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { if (!state->index) {
int is_last = (state->_next == NULL && stream->state == state); int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD; int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2]; struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
@ -1896,14 +1897,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_ev_handle_selfpipe(); janet_ev_handle_selfpipe();
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
JanetListenerState *state = stream->state; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
while (NULL != state) { for (int j = 0; j < 2; j++) {
JanetListenerState *next_state = state->_next; JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetAsyncStatus statuses[4]; JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE; statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (!(events[i].flags & EV_ERROR)) { if (!(events[i].flags & EV_ERROR)) {
if (events[i].filter == EVFILT_WRITE) if (events[i].filter == EVFILT_WRITE)
statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE); statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
@ -1920,7 +1921,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
statuses[3] == JANET_ASYNC_STATUS_DONE) { statuses[3] == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state, 0);
} }
state = next_state;
} }
janet_stream_checktoclose(stream); janet_stream_checktoclose(stream);
} }