More work on windows event loop code.

This commit is contained in:
Calvin Rose 2020-10-06 19:07:29 -05:00
parent 2e944931b3
commit 964a800d51
4 changed files with 27 additions and 21 deletions

View File

@ -3,9 +3,10 @@
(defn writer [c]
(for i 0 3
(def item (string i ":" (hash c)))
(ev/sleep 0.1)
(print "writer giving item " i " to " c "...")
(ev/give c (string "item " i)))
(print "writer giving item " item " to " c "...")
(ev/give c item))
(print "Done!"))
(defn reader [name]

View File

@ -229,7 +229,7 @@ static void add_timeout(JanetTimeout to) {
}
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
if (pollable->_mask & mask) {
janet_panic("cannot listen for duplicate event on pollable");
}
@ -259,6 +259,7 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
state->_next = pollable->state;
pollable->state = state;
/* Emit INIT event for convenience */
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT);
return state;
}
@ -653,8 +654,8 @@ void janet_ev_deinit(void) {
CloseHandle(janet_vm_iocp);
}
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size);
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
/* TODO - associate IO operation with listener state somehow
* maybe we could require encoding the operation in a mask. */
/* on windows, janet_listen does not actually start any listening behavior. */
@ -747,10 +748,10 @@ static int make_epoll_events(int mask) {
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(pollable->state);
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size);
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
struct epoll_event ev;
ev.events = make_epoll_events(state->pollable->_mask);
ev.data.ptr = pollable;
@ -916,8 +917,8 @@ static void janet_push_pollfd(struct pollfd pfd) {
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size);
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
struct pollfd ev;
ev.fd = pollable->handle;
ev.events = make_poll_events(state->pollable->_mask);

View File

@ -167,6 +167,10 @@ typedef struct {
JanetBuffer *buf;
int is_chunk;
int is_recv_from;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
uint8_t chunk_buf[2048];
#endif
} NetStateRead;
JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
@ -181,10 +185,6 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
/* Begin read */
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
}
@ -250,17 +250,21 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead));
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0;
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
#ifdef JANET_WINDOWS
WSARecv((SOCKET) stream->handle,
#endif
janet_await();
}
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead));
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 1;
state->buf = buf;
state->bytes_left = nbytes;
@ -270,7 +274,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *
JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead));
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0;
state->buf = buf;
state->bytes_left = nbytes;
@ -371,7 +375,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite));
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 1;
state->start = 0;
state->src.buf = buf;
@ -382,7 +386,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite));
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 0;
state->start = 0;
state->src.str = str;
@ -480,7 +484,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) {
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept));
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
janet_await();
}
@ -730,7 +734,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Server with handler */
JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer));
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL);
ss->function = fun;
return janet_wrap_abstract(stream);
}

View File

@ -1257,7 +1257,7 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a pollable */
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size);
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user);
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);