1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-13 00:50:26 +00:00

Merge branch 'windows-ev' into ev

This commit is contained in:
Calvin Rose 2020-10-11 09:32:17 -05:00
commit 813e3fdcfd
6 changed files with 195 additions and 44 deletions

12
examples/evsleep.janet Normal file
View File

@ -0,0 +1,12 @@
(defn worker
"Run for a number of iterations."
[name iterations]
(for i 0 iterations
(ev/sleep 1)
(print "worker " name " iteration " i)))
(ev/call worker :a 10)
(ev/sleep 0.2)
(ev/call worker :b 5)
(ev/sleep 0.3)
(ev/call worker :c 12)

View File

@ -3,14 +3,15 @@
(defn writer [c] (defn writer [c]
(for i 0 3 (for i 0 3
(def item (string i ":" (mod (hash c) 999)))
(ev/sleep 0.1) (ev/sleep 0.1)
(print "writer giving item " i " to " c "...") (print "writer giving item " item " to " c "...")
(ev/give c (string "item " i))) (ev/give c item))
(print "Done!")) (print "Done!"))
(defn reader [name] (defn reader [name]
(forever (forever
(def [_ c x] (ev/select ;channels)) (def [_ c x] (ev/rselect ;channels))
(print "reader " name " got " x " from " c))) (print "reader " name " got " x " from " c)))
# Readers # Readers

View File

@ -70,7 +70,7 @@
/* #define JANET_STACK_MAX 16384 */ /* #define JANET_STACK_MAX 16384 */
/* #define JANET_OS_NAME my-custom-os */ /* #define JANET_OS_NAME my-custom-os */
/* #define JANET_ARCH_NAME pdp-8 */ /* #define JANET_ARCH_NAME pdp-8 */
/* #define JANET_EV_EPOLL */ #define JANET_EV_EPOLL
/* Main client settings, does not affect library code */ /* Main client settings, does not affect library code */
/* #define JANET_SIMPLE_GETLINE */ /* #define JANET_SIMPLE_GETLINE */

View File

@ -33,6 +33,12 @@
/* Includes */ /* Includes */
#ifdef JANET_WINDOWS
#include <windows.h>
#else
#include <limits.h> #include <limits.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -50,6 +56,8 @@
#include <sys/timerfd.h> #include <sys/timerfd.h>
#endif #endif
#endif
/* General queue */ /* General queue */
/* Ring buffer for storing a list of fibers */ /* Ring buffer for storing a list of fibers */
@ -60,7 +68,7 @@ typedef struct {
void *data; void *data;
} JanetQueue; } JanetQueue;
#define JANET_MAX_Q_CAPACITY 0x7FFFFFFF #define JANET_MAX_Q_CAPACITY 0x7FFFFFF
static void janet_q_init(JanetQueue *q) { static void janet_q_init(JanetQueue *q) {
q->data = NULL; q->data = NULL;
@ -95,22 +103,22 @@ static int janet_q_push(JanetQueue *q, void *item, size_t itemsize) {
int32_t newhead = q->head + (newcap - q->capacity); int32_t newhead = q->head + (newcap - q->capacity);
size_t seg1 = (size_t)(q->capacity - q->head); size_t seg1 = (size_t)(q->capacity - q->head);
if (seg1 > 0) { if (seg1 > 0) {
memmove(q->data + (newhead * itemsize), memmove((char *) q->data + (newhead * itemsize),
q->data + (q->head * itemsize), (char *) q->data + (q->head * itemsize),
seg1 * itemsize); seg1 * itemsize);
} }
q->head = newhead; q->head = newhead;
} }
q->capacity = newcap; q->capacity = newcap;
} }
memcpy(q->data + itemsize * q->tail, item, itemsize); memcpy((char *) q->data + itemsize * q->tail, item, itemsize);
q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0; q->tail = q->tail + 1 < q->capacity ? q->tail + 1 : 0;
return 0; return 0;
} }
static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) { static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
if (q->head == q->tail) return 1; if (q->head == q->tail) return 1;
memcpy(out, q->data + itemsize * q->head, itemsize); memcpy(out, (char *) q->data + itemsize * q->head, itemsize);
q->head = q->head + 1 < q->capacity ? q->head + 1 : 0; q->head = q->head + 1 < q->capacity ? q->head + 1 : 0;
return 0; return 0;
} }
@ -124,7 +132,7 @@ struct JanetTask {
}; };
/* Min priority queue of timestamps for timeouts. */ /* Min priority queue of timestamps for timeouts. */
typedef uint64_t JanetTimestamp; typedef int64_t JanetTimestamp;
typedef struct JanetTimeout JanetTimeout; typedef struct JanetTimeout JanetTimeout;
struct JanetTimeout { struct JanetTimeout {
JanetTimestamp when; JanetTimestamp when;
@ -197,7 +205,7 @@ static void add_timeout(JanetTimeout to) {
janet_vm_tq_capacity = newcap; janet_vm_tq_capacity = newcap;
} }
/* Append */ /* Append */
janet_vm_tq_count = newcount; janet_vm_tq_count = (int32_t) newcount;
janet_vm_tq[oldcount] = to; janet_vm_tq[oldcount] = to;
/* Heapify */ /* Heapify */
size_t index = oldcount; size_t index = oldcount;
@ -214,7 +222,7 @@ static void add_timeout(JanetTimeout to) {
} }
/* Create a new event listener */ /* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
if (pollable->_mask & mask) { if (pollable->_mask & mask) {
janet_panic("cannot listen for duplicate event on pollable"); janet_panic("cannot listen for duplicate event on pollable");
} }
@ -244,6 +252,7 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
state->_next = pollable->state; state->_next = pollable->state;
pollable->state = state; pollable->state = state;
/* Emit INIT event for convenience */ /* Emit INIT event for convenience */
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT); state->machine(state, JANET_ASYNC_EVENT_INIT);
return state; return state;
} }
@ -270,7 +279,7 @@ static void janet_unlisten_impl(JanetListenerState *state) {
} }
/* Call after creating a pollable */ /* Call after creating a pollable */
void janet_pollable_init(JanetPollable *pollable, JanetPollType handle) { void janet_pollable_init(JanetPollable *pollable, JanetHandle handle) {
pollable->handle = handle; pollable->handle = handle;
pollable->flags = 0; pollable->flags = 0;
pollable->state = NULL; pollable->state = NULL;
@ -697,8 +706,75 @@ void janet_loop(void) {
janet_loop1(); janet_loop1();
} }
} }
;
#ifdef JANET_EV_EPOLL #ifdef JANET_WINDOWS
/* Epoll global data */
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
static JanetTimestamp ts_now(void) {
return (JanetTimestamp) GetTickCount64();
}
void janet_ev_init(void) {
janet_ev_init_common();
janet_vm_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == janet_vm_iocp) janet_panic("could not create io completion port");
}
void janet_ev_deinit(void) {
janet_ev_deinit_common();
CloseHandle(janet_vm_iocp);
}
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
/* TODO - associate IO operation with listener state somehow
* maybe we could require encoding the operation in a mask. */
/* on windows, janet_listen does not actually start any listening behavior. */
return state;
}
static void janet_unlisten(JanetListenerState *state) {
/* We don't necessarily want to cancel all io on this pollable */
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimeout timeout) {
ULONG_PTR completionKey = 0;
DWORD num_bytes_transfered = 0;
LPOVERLAPPED overlapped;
/* Calculate how long to wait before timeout */
uint64_t waittime;
if (has_timeout) {
JanetTimestamp now = ts_now();
if (now > to.when) {
waittime = 0;
} else {
waittime = (uint64_t) (to.when - now);
}
} else {
waittime = INFINITE;
}
BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
if (!result) {
if (!has_timeout) {
JANET_EXIT("failed to get iocp GetQueuedCompletionStatus");
}
} else {
/* Normal event */
JanetListenerState *state = (JanetListenerState *) completionKey;
state->event = overlapped;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state);
}
}
#elif defined(JANET_EV_POLL)
/* /*
* Start linux/epoll implementation * Start linux/epoll implementation
@ -727,10 +803,10 @@ static int make_epoll_events(int mask) {
} }
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(pollable->state); int is_first = !(pollable->state);
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(state->pollable->_mask); ev.events = make_epoll_events(state->pollable->_mask);
ev.data.ptr = pollable; ev.data.ptr = pollable;
@ -792,6 +868,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (NULL != pollable) { /* If NULL, is a timeout */ if (NULL != pollable) { /* If NULL, is a timeout */
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = pollable->state; JanetListenerState *state = pollable->state;
state->event = events + i;
while (NULL != state) { while (NULL != state) {
JanetListenerState *next_state = state->_next; JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
@ -880,8 +957,8 @@ static void janet_push_pollfd(struct pollfd pfd) {
} }
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size); JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
struct pollfd ev; struct pollfd ev;
ev.fd = pollable->handle; ev.fd = pollable->handle;
ev.events = make_poll_events(state->pollable->_mask); ev.events = make_poll_events(state->pollable->_mask);
@ -928,6 +1005,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
int mask = janet_vm_fds[i].revents; int mask = janet_vm_fds[i].revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
state->event = pfd;
if (mask & POLLOUT) if (mask & POLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & POLLIN) if (mask & POLLIN)

View File

@ -75,7 +75,7 @@ static const JanetAbstractType AddressAT = {
}; };
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#define JSOCKCLOSE(x) closesocket(x) #define JSOCKCLOSE(x) closesocket((SOCKET) x)
#define JSOCKDEFAULT INVALID_SOCKET #define JSOCKDEFAULT INVALID_SOCKET
#define JLASTERR WSAGetLastError() #define JLASTERR WSAGetLastError()
#define JSOCKVALID(x) ((x) != INVALID_SOCKET) #define JSOCKVALID(x) ((x) != INVALID_SOCKET)
@ -89,7 +89,7 @@ static const JanetAbstractType AddressAT = {
static JanetStream *make_stream(SOCKET fd, uint32_t flags) { static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
u_long iMode = 0; u_long iMode = 0;
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
janet_pollable_init(stream, fd); janet_pollable_init(stream, (JanetHandle) fd);
ioctlsocket(fd, FIONBIO, &iMode); ioctlsocket(fd, FIONBIO, &iMode);
stream->flags = flags; stream->flags = flags;
return stream; return stream;
@ -167,6 +167,10 @@ typedef struct {
JanetBuffer *buf; JanetBuffer *buf;
int is_chunk; int is_chunk;
int is_recv_from; int is_recv_from;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
uint8_t chunk_buf[2048];
#endif
} NetStateRead; } NetStateRead;
JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) { JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
@ -180,6 +184,12 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed")); janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
}
break;
#else
case JANET_ASYNC_EVENT_READ: case JANET_ASYNC_EVENT_READ:
/* Read in bytes */ /* Read in bytes */
{ {
@ -233,23 +243,28 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
} }
} }
break; break;
#endif
} }
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0; state->is_chunk = 0;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
state->is_recv_from = 0; state->is_recv_from = 0;
#ifdef JANET_WINDOWS
WSARecv((SOCKET) stream->handle,
#endif
janet_await(); janet_await();
} }
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 1; state->is_chunk = 1;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
@ -259,7 +274,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *
JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0; state->is_chunk = 0;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
@ -298,6 +313,17 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed")); janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
/* Begin write */
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
}
break;
#else
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
@ -342,13 +368,14 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
break; break;
} }
break; break;
#endif
} }
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) { JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 1; state->is_buffer = 1;
state->start = 0; state->start = 0;
state->src.buf = buf; state->src.buf = buf;
@ -359,7 +386,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) { JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 0; state->is_buffer = 0;
state->start = 0; state->start = 0;
state->src.str = str; state->src.str = str;
@ -384,6 +411,9 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
case JANET_ASYNC_EVENT_INIT: case JANET_ASYNC_EVENT_INIT:
/* We know the pollable will be a stream */ /* We know the pollable will be a stream */
janet_gcroot(janet_wrap_abstract(s->pollable)); janet_gcroot(janet_wrap_abstract(s->pollable));
#ifdef JANET_WINDOWS
/* requires some more setup code */
#endif
break; break;
case JANET_ASYNC_EVENT_MARK: case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_function(state->function)); janet_mark(janet_wrap_function(state->function));
@ -391,6 +421,12 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_gcunroot(janet_wrap_abstract(s->pollable)); janet_gcunroot(janet_wrap_abstract(s->pollable));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when ever we get an IOCP event */
}
break;
#else
case JANET_ASYNC_EVENT_READ: { case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL); JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) { if (JSOCKVALID(connfd)) {
@ -403,6 +439,7 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
} }
break; break;
} }
#endif
} }
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
@ -420,6 +457,16 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed")); janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
}
break;
#else
case JANET_ASYNC_EVENT_READ: { case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL); JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) { if (JSOCKVALID(connfd)) {
@ -431,12 +478,13 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
} }
break; break;
} }
#endif
} }
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) {
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept)); janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
janet_await(); janet_await();
} }
@ -456,7 +504,8 @@ static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) {
/* Needs argc >= offset + 2 */ /* Needs argc >= offset + 2 */
/* For unix paths, just rertuns a single sockaddr and sets *is_unix to 1, otherwise 0 */ /* For unix paths, just rertuns a single sockaddr and sets *is_unix to 1, otherwise 0 */
static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int socktype, int passive, int *is_unix) { static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int socktype, int passive, int *is_unix) {
/* Unix socket support */ /* Unix socket support - not yet supported on windows. */
#ifndef JANET_WINDOWS
if (janet_keyeq(argv[offset], "unix")) { if (janet_keyeq(argv[offset], "unix")) {
const char *path = janet_getcstring(argv, offset + 1); const char *path = janet_getcstring(argv, offset + 1);
struct sockaddr_un *saddr = malloc(sizeof(struct sockaddr_un)); struct sockaddr_un *saddr = malloc(sizeof(struct sockaddr_un));
@ -468,6 +517,7 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int sock
*is_unix = 1; *is_unix = 1;
return (struct addrinfo *) saddr; return (struct addrinfo *) saddr;
} }
#endif
/* Get host and port */ /* Get host and port */
const char *host = janet_getcstring(argv, offset); const char *host = janet_getcstring(argv, offset);
const char *port; const char *port;
@ -501,12 +551,15 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
int is_unix = 0; int is_unix = 0;
int make_arr = (argc >= 3 && janet_truthy(argv[3])); int make_arr = (argc >= 3 && janet_truthy(argv[3]));
struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0, &is_unix); struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0, &is_unix);
#ifndef JANET_WINDOWS
/* no unix domain socket support on windows yet */
if (is_unix) { if (is_unix) {
void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un)); void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un));
memcpy(abst, ai, sizeof(struct sockaddr_un)); memcpy(abst, ai, sizeof(struct sockaddr_un));
Janet ret = janet_wrap_abstract(abst); Janet ret = janet_wrap_abstract(abst);
return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret; return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret;
} }
#endif
if (make_arr) { if (make_arr) {
/* Select all */ /* Select all */
JanetArray *arr = janet_array(10); JanetArray *arr = janet_array(10);
@ -542,6 +595,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
JSock sock = JSOCKDEFAULT; JSock sock = JSOCKDEFAULT;
void *addr = NULL; void *addr = NULL;
socklen_t addrlen; socklen_t addrlen;
#ifndef JANET_WINDOWS
if (is_unix) { if (is_unix) {
sock = socket(AF_UNIX, socktype | JSOCKFLAGS, 0); sock = socket(AF_UNIX, socktype | JSOCKFLAGS, 0);
if (!JSOCKVALID(sock)) { if (!JSOCKVALID(sock)) {
@ -549,13 +603,15 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
} }
addr = (void *) ai; addr = (void *) ai;
addrlen = sizeof(struct sockaddr_un); addrlen = sizeof(struct sockaddr_un);
} else { } else
#endif
{
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) { for (rp = ai; rp != NULL; rp = rp->ai_next) {
sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
if (JSOCKVALID(sock)) { if (JSOCKVALID(sock)) {
addr = rp->ai_addr; addr = rp->ai_addr;
addrlen = rp->ai_addrlen; addrlen = (socklen_t) rp->ai_addrlen;
break; break;
} }
} }
@ -610,6 +666,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix); struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix);
JSock sfd = JSOCKDEFAULT; JSock sfd = JSOCKDEFAULT;
#ifndef JANET_WINDOWS
if (is_unix) { if (is_unix) {
sfd = socket(AF_UNIX, socktype | JSOCKFLAGS, 0); sfd = socket(AF_UNIX, socktype | JSOCKFLAGS, 0);
if (!JSOCKVALID(sfd)) { if (!JSOCKVALID(sfd)) {
@ -623,7 +680,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
janet_panic(err ? err : "could not bind socket"); janet_panic(err ? err : "could not bind socket");
} }
free(ai); free(ai);
} else { } else
#endif
{
/* Check all addrinfos in a loop for the first that we can bind to. */ /* Check all addrinfos in a loop for the first that we can bind to. */
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) { for (rp = ai; rp != NULL; rp = rp->ai_next) {
@ -675,7 +734,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Server with handler */ /* Server with handler */
JanetStream *stream = make_stream(sfd, 0); JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer)); JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL);
ss->function = fun; ss->function = fun;
return janet_wrap_abstract(stream); return janet_wrap_abstract(stream);
} }
@ -779,9 +838,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
check_stream_flag(stream, JANET_STREAM_WRITABLE); check_stream_flag(stream, JANET_STREAM_WRITABLE);
/* Toggle no delay flag */ /* Toggle no delay flag */
int flag = 1; int flag = 1;
setsockopt(stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
flag = 0; flag = 0;
setsockopt(stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
return argv[0]; return argv[0];
} }

View File

@ -509,7 +509,8 @@ typedef enum {
JANET_ASYNC_EVENT_CLOSE, JANET_ASYNC_EVENT_CLOSE,
JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE, JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_TIMEOUT JANET_ASYNC_EVENT_TIMEOUT,
JANET_ASYNC_EVENT_COMPLETE /* Used on windows for IOCP */
} JanetAsyncEvent; } JanetAsyncEvent;
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) #define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
@ -522,21 +523,19 @@ typedef enum {
} JanetAsyncStatus; } JanetAsyncStatus;
/* Typedefs */ /* Typedefs */
#ifdef JANET_WINDOWS
typedef HANDLE JanetPollType;
#else
typedef int JanetPollType;
#endif
typedef struct JanetListenerState JanetListenerState; typedef struct JanetListenerState JanetListenerState;
typedef struct JanetPollable JanetPollable; typedef struct JanetPollable JanetPollable;
typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event); typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event);
/* Wrapper around file descriptors and HANDLEs that can be polled. */ /* Wrapper around file descriptors and HANDLEs that can be polled. */
struct JanetPollable { struct JanetPollable {
JanetPollType handle; JanetHandle handle;
uint32_t flags; uint32_t flags;
/* Linked list of all in-flight IO routines for this pollable */
JanetListenerState *state; JanetListenerState *state;
/* internal */ /* internal - used to disallow multiple concurrent reads / writes on the same pollable.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same pollable, though. */
int _mask; int _mask;
}; };
@ -545,6 +544,8 @@ struct JanetListenerState {
JanetListener machine; JanetListener machine;
JanetFiber *fiber; JanetFiber *fiber;
JanetPollable *pollable; JanetPollable *pollable;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
/* internal */ /* internal */
int _index; /* not used in all implementations */ int _index; /* not used in all implementations */
int _mask; int _mask;
@ -1245,7 +1246,7 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
JANET_API void janet_loop(void); JANET_API void janet_loop(void);
/* Wrapper around pollables */ /* Wrapper around pollables */
JANET_API void janet_pollable_init(JanetPollable *pollable, JanetPollType handle); JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle);
JANET_API void janet_pollable_mark(JanetPollable *pollable); JANET_API void janet_pollable_mark(JanetPollable *pollable);
JANET_API void janet_pollable_deinit(JanetPollable *pollable); JANET_API void janet_pollable_deinit(JanetPollable *pollable);
@ -1255,7 +1256,7 @@ JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a pollable */ /* Start a state machine listening for events from a pollable */
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size); JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user);
/* Shorthand for yielding to event loop in C */ /* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void); JANET_NO_RETURN JANET_API void janet_await(void);