1
0
mirror of https://github.com/janet-lang/janet synced 2024-09-27 14:48:13 +00:00

Partial work updating epoll reimplentation.

This commit is contained in:
Calvin Rose 2023-09-25 18:52:15 -07:00
parent 1ee98e1e66
commit fb8c529f2e
2 changed files with 26 additions and 16 deletions

View File

@ -260,7 +260,8 @@ static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = { static const JanetAbstractType janet_listener_AT = {
"core/ev-listener", "core/ev-listener",
janet_listener_gc, janet_listener_gc,
janet_listener_mark janet_listener_mark,
JANET_ATEND_GCMARK
}; };
/* Create a new event listener */ /* Create a new event listener */
@ -559,7 +560,7 @@ void janet_ev_mark(void) {
} }
/* Listeners */ /* Listeners */
for (size_t i = 0; i < janet_vm.listeners.capacity; i++) { for (int32_t i = 0; i < janet_vm.listeners.capacity; i++) {
JanetKV *kv = janet_vm.listeners.data + i; JanetKV *kv = janet_vm.listeners.data + i;
if (!janet_checktype(kv->key, JANET_NIL)) { if (!janet_checktype(kv->key, JANET_NIL)) {
janet_mark(kv->key); janet_mark(kv->key);
@ -1586,9 +1587,9 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp; JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ); status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) { status2 == JANET_ASYNC_STATUS_DONE) {
@ -1601,11 +1602,13 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(stream->state); int is_first = !stream->read_state && !stream->write_state;
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(state->stream->_mask); ev.events = 0;
if (stream->read_state) ev.events |= EPOLLIN;
if (stream->write_state) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@ -1619,7 +1622,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
* event to a file. So we just post a custom event to do the read/write * event to a file. So we just post a custom event to do the read/write
* asap. */ * asap. */
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE); state->index = 1;
JanetEVGenericMessage msg = {0}; JanetEVGenericMessage msg = {0};
msg.argp = state; msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
@ -1629,6 +1632,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
janet_panicv(janet_ev_lasterr()); janet_panicv(janet_ev_lasterr());
} }
} }
state->index = 0;
return state; return state;
} }
@ -1637,11 +1641,15 @@ static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (!(stream->handle != -1)) { if (!(stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { if (!state->index) {
int is_last = (state->_next == NULL && stream->state == state); int is_read = stream->read_state != state && stream->read_state;
int is_write = stream->write_state != state && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask); ev.events = 0;
if (is_read) ev.events |= EPOLLIN;
if (is_write) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@ -1690,10 +1698,12 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = stream->state; JanetListenerState *state = stream->read_state;
while (NULL != state) { JanetListenerState *states[2] = {stream->read_state, stream->write_state};
for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
@ -1709,9 +1719,9 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state); janet_unlisten(state);
state = next_state; }
} }
janet_stream_checktoclose(stream); janet_stream_checktoclose(stream);
} }

View File

@ -628,7 +628,7 @@ struct JanetListenerState {
void *tag; /* Used to associate listeners with an overlapped structure */ void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */ int bytes; /* Used to track how many bytes were transfered. */
#else #else
uint32_t index; /* Used for poll implentation */ uint32_t index; /* Used for poll/epoll implentation */
#endif #endif
}; };
#endif #endif