From db67538311b660a284ce6b009c4c4e8491891275 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 4 Oct 2020 12:46:15 -0500 Subject: [PATCH 1/3] Work on getting ev support on windows. --- examples/evsleep.janet | 12 +++++ src/core/ev.c | 117 +++++++++++++++++++++++++++++++++++++---- src/core/net.c | 71 ++++++++++++++++++++++--- src/include/janet.h | 19 +++---- 4 files changed, 191 insertions(+), 28 deletions(-) create mode 100644 examples/evsleep.janet diff --git a/examples/evsleep.janet b/examples/evsleep.janet new file mode 100644 index 00000000..9ceeef0c --- /dev/null +++ b/examples/evsleep.janet @@ -0,0 +1,12 @@ +(defn worker + "Run for a number of iterations." + [name iterations] + (for i 0 iterations + (ev/sleep 1) + (print "worker " name " iteration " i))) + +(ev/call worker :a 10) +(ev/sleep 0.2) +(ev/call worker :b 5) +(ev/sleep 0.3) +(ev/call worker :c 12) diff --git a/src/core/ev.c b/src/core/ev.c index e0dabf52..ec79079b 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -33,6 +33,12 @@ /* Includes */ +#ifdef JANET_WINDOWS + +#include + +#else + #include #include #include @@ -50,6 +56,8 @@ #include #endif +#endif + /* General queue */ /* Ring buffer for storing a list of fibers */ @@ -95,22 +103,22 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) { int32_t newhead = q->head + (newcap - q->capacity); size_t seg1 = (size_t)(q->capacity - q->head); if (seg1 > 0) { - memmove(q->data + (newhead * itemsize), - q->data + (q->head * itemsize), + memmove((char *) q->data + (newhead * itemsize), + (char *) q->data + (q->head * itemsize), seg1 * itemsize); } q->head = newhead; } q->capacity = newcap; } - memcpy(q->data + itemsize * q->tail, item, itemsize); + memcpy((char *) q->data + itemsize * q->tail, item, itemsize); q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0; return 0; } static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) { if (q->head == q->tail) return 1; - memcpy(out, q->data + itemsize * q->head, itemsize); + memcpy(out, (char *) q->data + itemsize * q->head, itemsize); q->head = q->head + 1 < q->capacity ? q->head + 1 : 0; return 0; } @@ -165,7 +173,7 @@ static void pop_timeout(size_t index) { if (janet_vm_tq_count <= index) return; janet_vm_tq[index].fiber->timeout_index = -1; janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count]; - janet_vm_tq[index].fiber->timeout_index = index; + janet_vm_tq[index].fiber->timeout_index = (int32_t) index; for (;;) { size_t left = (index << 1) + 1; size_t right = left + 1; @@ -180,8 +188,8 @@ static void pop_timeout(size_t index) { JanetTimeout temp = janet_vm_tq[index]; janet_vm_tq[index] = janet_vm_tq[smallest]; janet_vm_tq[smallest] = temp; - janet_vm_tq[index].fiber->timeout_index = index; - janet_vm_tq[smallest].fiber->timeout_index = smallest; + janet_vm_tq[index].fiber->timeout_index = (int32_t) index; + janet_vm_tq[smallest].fiber->timeout_index = (int32_t) smallest; index = smallest; } } @@ -200,14 +208,14 @@ static void add_timeout(JanetTimeout to) { janet_vm_tq_capacity = newcap; } /* Append */ - janet_vm_tq_count = newcount; + janet_vm_tq_count = (int32_t) newcount; janet_vm_tq[oldcount] = to; /* Heapify */ size_t index = oldcount; if (to.fiber->timeout_index >= 0) { pop_timeout(to.fiber->timeout_index); } - to.fiber->timeout_index = index; + to.fiber->timeout_index = (int32_t) index; while (index > 0) { size_t parent = (index - 1) >> 1; if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break; @@ -277,7 +285,7 @@ static void janet_unlisten_impl(JanetListenerState *state) { } /* Call after creating a pollable */ -void janet_pollable_init(JanetPollable *pollable, JanetPollType handle) { +void janet_pollable_init(JanetPollable *pollable, JanetHandle handle) { pollable->handle = handle; pollable->flags = 0; pollable->state = NULL; @@ -625,7 +633,92 @@ void janet_loop(void) { } } -#ifdef JANET_EV_EPOLL +#ifdef JANET_WINDOWS + +/* Epoll global data */ +JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; + +static JanetTimestamp ts_now(void) { + FILETIME ftime; + GetSystemTimeAsFileTime(&ftime); + return (uint64_t)(ftime.dwLowDateTime) | ((uint64_t)(ftime.dwHighDateTime) << 32); +} + +void janet_ev_init(void) { + janet_ev_init_common(); + janet_vm_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (NULL == janet_vm_iocp) janet_panic("could not create io completion port"); +} + +void janet_ev_deinit(void) { + janet_ev_deinit_common(); + CloseHandle(janet_vm_iocp); +} + +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); + /* TODO - associate IO operation with listener state somehow + * maybe we could require encoding the operation in a mask. */ + /* on windows, janet_listen does not actually start any listening behavior. */ + return state; +} + + +static void janet_unlisten(JanetListenerState *state) { + /* We don't necessarily want to cancel all io on this pollable */ + janet_unlisten_impl(state); +} + + +void janet_loop1_impl(void) { + JanetTimeout to; + memset(&to, 0, sizeof(to)); + int has_timeout = peek_timeout(&to); + ULONG_PTR completionKey = 0; + DWORD num_bytes_transfered = 0; + LPOVERLAPPED overlapped; + + /* Calculate how long to wait before timeout */ + uint64_t waittime; + if (has_timeout) { + JanetTimestamp now = ts_now(); + if (now > to.when) { + waittime = 0; + } else { + waittime = (uint64_t) (to.when - now); + } + } else { + waittime = INFINITE; + } + BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime); + + if (!result) { + /* timeout ? */ + if (has_timeout) { + /* Timer event */ + pop_timeout(0); + /* Cancel waiters for this fiber */ + if (to.is_error) { + janet_cancel(to.fiber, janet_cstringv("timeout")); + } else { + janet_schedule(to.fiber, janet_wrap_nil()); + } + } else { + JANET_EXIT("failed to get iocp GetQueuedCompletionStatus"); + } + } else { + JANET_EXIT("Unexpected event"); + /* Normal event */ + JanetListenerState *state = (JanetListenerState *) completionKey; + state->event = overlapped; + JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); + if (status == JANET_ASYNC_STATUS_DONE) + janet_unlisten(state); + } +} + + +#elif defined(JANET_EV_POLL) /* * Start linux/epoll implementation @@ -734,6 +827,7 @@ void janet_loop1_impl(void) { /* Normal event */ int mask = events[i].events; JanetListenerState *state = pollable->state; + state->event = events + i; while (NULL != state) { JanetListenerState *next_state = state->_next; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; @@ -877,6 +971,7 @@ void janet_loop1_impl(void) { int mask = janet_vm_fds[i].revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; + state->event = pfd; if (mask & POLLOUT) status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); if (mask & POLLIN) diff --git a/src/core/net.c b/src/core/net.c index 26fa663d..4b5e7f07 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -75,7 +75,7 @@ static const JanetAbstractType AddressAT = { }; #ifdef JANET_WINDOWS -#define JSOCKCLOSE(x) closesocket(x) +#define JSOCKCLOSE(x) closesocket((SOCKET) x) #define JSOCKDEFAULT INVALID_SOCKET #define JLASTERR WSAGetLastError() #define JSOCKVALID(x) ((x) != INVALID_SOCKET) @@ -89,7 +89,7 @@ static const JanetAbstractType AddressAT = { static JanetStream *make_stream(SOCKET fd, uint32_t flags) { u_long iMode = 0; JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - janet_pollable_init(stream, fd); + janet_pollable_init(stream, (JanetHandle) fd); ioctlsocket(fd, FIONBIO, &iMode); stream->flags = flags; return stream; @@ -180,6 +180,16 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) case JANET_ASYNC_EVENT_CLOSE: janet_cancel(s->fiber, janet_cstringv("stream closed")); return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_INIT: { + /* Begin read */ + } + break; + case JANET_ASYNC_EVENT_COMPLETE: { + /* Called when read finished */ + } + break; +#else case JANET_ASYNC_EVENT_READ: /* Read in bytes */ { @@ -233,6 +243,7 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) } } break; +#endif } return JANET_ASYNC_STATUS_NOT_DONE; } @@ -298,6 +309,17 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) case JANET_ASYNC_EVENT_CLOSE: janet_cancel(s->fiber, janet_cstringv("stream closed")); return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_INIT: { + /* Begin write */ + } + break; + case JANET_ASYNC_EVENT_COMPLETE: { + + /* Called when write finished */ + } + break; +#else case JANET_ASYNC_EVENT_WRITE: { int32_t start, len; const uint8_t *bytes; @@ -342,6 +364,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) break; } break; +#endif } return JANET_ASYNC_STATUS_NOT_DONE; } @@ -384,6 +407,9 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven case JANET_ASYNC_EVENT_INIT: /* We know the pollable will be a stream */ janet_gcroot(janet_wrap_abstract(s->pollable)); +#ifdef JANET_WINDOWS + /* requires some more setup code */ +#endif break; case JANET_ASYNC_EVENT_MARK: janet_mark(janet_wrap_function(state->function)); @@ -391,6 +417,12 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven case JANET_ASYNC_EVENT_CLOSE: janet_gcunroot(janet_wrap_abstract(s->pollable)); return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_COMPLETE: { + /* Called when ever we get an IOCP event */ + } + break; +#else case JANET_ASYNC_EVENT_READ: { JSock connfd = accept(s->pollable->handle, NULL, NULL); if (JSOCKVALID(connfd)) { @@ -403,6 +435,7 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven } break; } +#endif } return JANET_ASYNC_STATUS_NOT_DONE; } @@ -420,6 +453,16 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event case JANET_ASYNC_EVENT_CLOSE: janet_cancel(s->fiber, janet_cstringv("stream closed")); return JANET_ASYNC_STATUS_DONE; +#ifdef JANET_WINDOWS + case JANET_ASYNC_EVENT_INIT: { + + } + break; + case JANET_ASYNC_EVENT_COMPLETE: { + + } + break; +#else case JANET_ASYNC_EVENT_READ: { JSock connfd = accept(s->pollable->handle, NULL, NULL); if (JSOCKVALID(connfd)) { @@ -431,6 +474,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event } break; } +#endif } return JANET_ASYNC_STATUS_NOT_DONE; } @@ -456,7 +500,8 @@ static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) { /* Needs argc >= offset + 2 */ /* For unix paths, just rertuns a single sockaddr and sets *is_unix to 1, otherwise 0 */ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int socktype, int passive, int *is_unix) { - /* Unix socket support */ + /* Unix socket support - not yet supported on windows. */ +#ifndef JANET_WINDOWS if (janet_keyeq(argv[offset], "unix")) { const char *path = janet_getcstring(argv, offset + 1); struct sockaddr_un *saddr = malloc(sizeof(struct sockaddr_un)); @@ -468,6 +513,7 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int sock *is_unix = 1; return (struct addrinfo *) saddr; } +#endif /* Get host and port */ const char *host = janet_getcstring(argv, offset); const char *port; @@ -501,12 +547,15 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) { int is_unix = 0; int make_arr = (argc >= 3 && janet_truthy(argv[3])); struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0, &is_unix); +#ifndef JANET_WINDOWS + /* no unix domain socket support on windows yet */ if (is_unix) { void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un)); memcpy(abst, ai, sizeof(struct sockaddr_un)); Janet ret = janet_wrap_abstract(abst); return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret; } +#endif if (make_arr) { /* Select all */ JanetArray *arr = janet_array(10); @@ -542,6 +591,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { JSock sock = JSOCKDEFAULT; void *addr = NULL; socklen_t addrlen; +#ifndef JANET_WINDOWS if (is_unix) { sock = socket(AF_UNIX, socktype | JSOCKFLAGS, 0); if (!JSOCKVALID(sock)) { @@ -549,13 +599,15 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } addr = (void *) ai; addrlen = sizeof(struct sockaddr_un); - } else { + } else +#endif + { struct addrinfo *rp = NULL; for (rp = ai; rp != NULL; rp = rp->ai_next) { sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); if (JSOCKVALID(sock)) { addr = rp->ai_addr; - addrlen = rp->ai_addrlen; + addrlen = (socklen_t) rp->ai_addrlen; break; } } @@ -610,6 +662,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix); JSock sfd = JSOCKDEFAULT; +#ifndef JANET_WINDOWS if (is_unix) { sfd = socket(AF_UNIX, socktype | JSOCKFLAGS, 0); if (!JSOCKVALID(sfd)) { @@ -623,7 +676,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { janet_panic(err ? err : "could not bind socket"); } free(ai); - } else { + } else +#endif + { /* Check all addrinfos in a loop for the first that we can bind to. */ struct addrinfo *rp = NULL; for (rp = ai; rp != NULL; rp = rp->ai_next) { @@ -779,9 +834,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) { check_stream_flag(stream, JANET_STREAM_WRITABLE); /* Toggle no delay flag */ int flag = 1; - setsockopt(stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); flag = 0; - setsockopt(stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); + setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); return argv[0]; } diff --git a/src/include/janet.h b/src/include/janet.h index 287e78d4..0ca02b79 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -509,7 +509,8 @@ typedef enum { JANET_ASYNC_EVENT_CLOSE, JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_WRITE, - JANET_ASYNC_EVENT_TIMEOUT + JANET_ASYNC_EVENT_TIMEOUT, + JANET_ASYNC_EVENT_COMPLETE /* Used on windows for IOCP */ } JanetAsyncEvent; #define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) @@ -522,21 +523,19 @@ typedef enum { } JanetAsyncStatus; /* Typedefs */ -#ifdef JANET_WINDOWS -typedef HANDLE JanetPollType; -#else -typedef int JanetPollType; -#endif typedef struct JanetListenerState JanetListenerState; typedef struct JanetPollable JanetPollable; typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event); /* Wrapper around file descriptors and HANDLEs that can be polled. */ struct JanetPollable { - JanetPollType handle; + JanetHandle handle; uint32_t flags; + /* Linked list of all in-flight IO routines for this pollable */ JanetListenerState *state; - /* internal */ + /* internal - used to disallow multiple concurrent reads / writes on the same pollable. + * this constraint may be lifted later but allowing such would require more internal book keeping + * for some implementations. You can read and write at the same time on the same pollable, though. */ int _mask; }; @@ -545,6 +544,8 @@ struct JanetListenerState { JanetListener machine; JanetFiber *fiber; JanetPollable *pollable; + void *event; /* Used to pass data from asynchronous IO event. Contents depend on both + implementation of the event loop and the particular event. */ /* internal */ int _index; /* not used in all implementations */ int _mask; @@ -1246,7 +1247,7 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; JANET_API void janet_loop(void); /* Wrapper around pollables */ -JANET_API void janet_pollable_init(JanetPollable *pollable, JanetPollType handle); +JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle); JANET_API void janet_pollable_mark(JanetPollable *pollable); JANET_API void janet_pollable_deinit(JanetPollable *pollable); From 2e944931b358881915ff0ab1f7087fac1311e9bf Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 4 Oct 2020 15:18:31 -0500 Subject: [PATCH 2/3] Update timestamp function to work on windows. --- src/core/ev.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index ec79079b..dddc6625 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -68,7 +68,7 @@ typedef struct { void *data; } JanetQueue; -#define JANET_MAX_Q_CAPACITY 0x7FFFFFFF +#define JANET_MAX_Q_CAPACITY 0x7FFFFFF static void janet_q_init(JanetQueue *q) { q->data = NULL; @@ -132,7 +132,7 @@ struct JanetTask { }; /* Min priority queue of timestamps for timeouts. */ -typedef uint64_t JanetTimestamp; +typedef int64_t JanetTimestamp; typedef struct JanetTimeout JanetTimeout; struct JanetTimeout { JanetTimestamp when; @@ -639,9 +639,7 @@ void janet_loop(void) { JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL; static JanetTimestamp ts_now(void) { - FILETIME ftime; - GetSystemTimeAsFileTime(&ftime); - return (uint64_t)(ftime.dwLowDateTime) | ((uint64_t)(ftime.dwHighDateTime) << 32); + return (JanetTimestamp) GetTickCount64(); } void janet_ev_init(void) { @@ -671,9 +669,11 @@ static void janet_unlisten(JanetListenerState *state) { void janet_loop1_impl(void) { + /* Check for timeout */ JanetTimeout to; memset(&to, 0, sizeof(to)); int has_timeout = peek_timeout(&to); + ULONG_PTR completionKey = 0; DWORD num_bytes_transfered = 0; LPOVERLAPPED overlapped; From 964a800d51493a8d34d4eb5e7984a8bbe161c829 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Tue, 6 Oct 2020 19:07:29 -0500 Subject: [PATCH 3/3] More work on windows event loop code. --- examples/select.janet | 5 +++-- src/core/ev.c | 15 ++++++++------- src/core/net.c | 26 +++++++++++++++----------- src/include/janet.h | 2 +- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/examples/select.janet b/examples/select.janet index 98e3610d..a5dbc91b 100644 --- a/examples/select.janet +++ b/examples/select.janet @@ -3,9 +3,10 @@ (defn writer [c] (for i 0 3 + (def item (string i ":" (hash c))) (ev/sleep 0.1) - (print "writer giving item " i " to " c "...") - (ev/give c (string "item " i))) + (print "writer giving item " item " to " c "...") + (ev/give c item)) (print "Done!")) (defn reader [name] diff --git a/src/core/ev.c b/src/core/ev.c index dddc6625..4bd2ff6e 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -229,7 +229,7 @@ static void add_timeout(JanetTimeout to) { } /* Create a new event listener */ -static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { +static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { if (pollable->_mask & mask) { janet_panic("cannot listen for duplicate event on pollable"); } @@ -259,6 +259,7 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe state->_next = pollable->state; pollable->state = state; /* Emit INIT event for convenience */ + state->event = user; state->machine(state, JANET_ASYNC_EVENT_INIT); return state; } @@ -653,8 +654,8 @@ void janet_ev_deinit(void) { CloseHandle(janet_vm_iocp); } -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); /* TODO - associate IO operation with listener state somehow * maybe we could require encoding the operation in a mask. */ /* on windows, janet_listen does not actually start any listening behavior. */ @@ -747,10 +748,10 @@ static int make_epoll_events(int mask) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { int is_first = !(pollable->state); int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); struct epoll_event ev; ev.events = make_epoll_events(state->pollable->_mask); ev.data.ptr = pollable; @@ -916,8 +917,8 @@ static void janet_push_pollfd(struct pollfd pfd) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); +JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { + JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); struct pollfd ev; ev.fd = pollable->handle; ev.events = make_poll_events(state->pollable->_mask); diff --git a/src/core/net.c b/src/core/net.c index 4b5e7f07..cb37f889 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -167,6 +167,10 @@ typedef struct { JanetBuffer *buf; int is_chunk; int is_recv_from; +#ifdef JANET_WINDOWS + WSAOVERLAPPED overlapped; + uint8_t chunk_buf[2048]; +#endif } NetStateRead; JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) { @@ -181,10 +185,6 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) janet_cancel(s->fiber, janet_cstringv("stream closed")); return JANET_ASYNC_STATUS_DONE; #ifdef JANET_WINDOWS - case JANET_ASYNC_EVENT_INIT: { - /* Begin read */ - } - break; case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ } @@ -250,17 +250,21 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; state->is_recv_from = 0; +#ifdef JANET_WINDOWS + WSARecv((SOCKET) stream->handle, + +#endif janet_await(); } JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 1; state->buf = buf; state->bytes_left = nbytes; @@ -270,7 +274,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer * JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; @@ -371,7 +375,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); state->is_buffer = 1; state->start = 0; state->src.buf = buf; @@ -382,7 +386,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL); state->is_buffer = 0; state->start = 0; state->src.str = str; @@ -480,7 +484,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { - janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept)); + janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); janet_await(); } @@ -730,7 +734,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Server with handler */ JanetStream *stream = make_stream(sfd, 0); NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer)); + JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL); ss->function = fun; return janet_wrap_abstract(stream); } diff --git a/src/include/janet.h b/src/include/janet.h index 0ca02b79..db4414bf 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1257,7 +1257,7 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); /* Start a state machine listening for events from a pollable */ -JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size); +JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user); /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void);