From 964a800d51493a8d34d4eb5e7984a8bbe161c829 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 6 Oct 2020 19:07:29 -0500 Subject: [PATCH] More work on windows event loop code. --- examples/select.janet | 5 +++-- src/core/ev.c | 15 ++++++++------- src/core/net.c | 26 +++++++++++++++----------- src/include/janet.h | 2 +- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/examples/select.janet b/examples/select.janet index 98e3610d..a5dbc91b 100644 --- a/examples/select.janet +++ b/examples/select.janet @@ -3,9 +3,10 @@ (defn writer [c] (for i 0 3 + (def item (string i ":" (hash c))) (ev/sleep 0.1) - (print "writer giving item " i " to " c "...") - (ev/give c (string "item " i))) + (print "writer giving item " item " to " c "...") + (ev/give c item)) (print "Done!")) (defn reader [name] diff --git a/src/core/ev.c b/src/core/ev.c index dddc6625..4bd2ff6e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -229,7 +229,7 @@ static void add_timeout(JanetTimeout to) { } /* Create a new event listener */ -static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { +static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { if (pollable->_mask & mask) { janet_panic("cannot listen for duplicate event on pollable"); } @@ -259,6 +259,7 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe state->_next = pollable->state; pollable->state = state; /* Emit INIT event for convenience */ + state->event = user; state->machine(state, JANET_ASYNC_EVENT_INIT); return state; } @@ -653,8 +654,8 @@ void janet_ev_deinit(void) { CloseHandle(janet_vm_iocp); } -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); /* TODO - associate IO operation with listener state somehow * maybe we could require encoding the operation in a mask. */ /* on windows, janet_listen does not actually start any listening behavior. */ @@ -747,10 +748,10 @@ static int make_epoll_events(int mask) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { int is_first = !(pollable->state); int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); struct epoll_event ev; ev.events = make_epoll_events(state->pollable->_mask); ev.data.ptr = pollable; @@ -916,8 +917,8 @@ static void janet_push_pollfd(struct pollfd pfd) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); struct pollfd ev; ev.fd = pollable->handle; ev.events = make_poll_events(state->pollable->_mask); diff --git a/src/core/net.c b/src/core/net.c index 4b5e7f07..cb37f889 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -167,6 +167,10 @@ typedef struct { JanetBuffer *buf; int is_chunk; int is_recv_from; +#ifdef JANET_WINDOWS + WSAOVERLAPPED overlapped; + uint8_t chunk_buf[2048]; +#endif } NetStateRead; JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) { @@ -181,10 +185,6 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) janet_cancel(s->fiber, janet_cstringv("stream closed")); return JANET_ASYNC_STATUS_DONE; #ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_INIT: { - /* Begin read */ - } - break; case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ } @@ -250,17 +250,21 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; state->is_recv_from = 0; +#ifdef JANET_WINDOWS + WSARecv((SOCKET) stream->handle, + +#endif janet_await(); } JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 1; state->buf = buf; state->bytes_left = nbytes; @@ -270,7 +274,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer * JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; @@ -371,7 +375,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); state->is_buffer = 1; state->start = 0; state->src.buf = buf; @@ -382,7 +386,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); state->is_buffer = 0; state->start = 0; state->src.str = str; @@ -480,7 +484,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { - janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept)); + janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); janet_await(); } @@ -730,7 +734,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Server with handler */ JanetStream *stream = make_stream(sfd, 0); NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer)); + JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL); ss->function = fun; return janet_wrap_abstract(stream); } diff --git a/src/include/janet.h b/src/include/janet.h index 0ca02b79..db4414bf 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1257,7 +1257,7 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); /* Start a state machine listening for events from a pollable */ -JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size); +JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user); /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void);