mirror of
https://github.com/janet-lang/janet
synced 2026-04-08 07:51:26 +00:00
Compare commits
19 Commits
ev
...
consolidat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d199c817dc | ||
|
|
dc51bd09f7 | ||
|
|
139e3fab25 | ||
|
|
7a98f9aa02 | ||
|
|
b53dd67e74 | ||
|
|
e546731093 | ||
|
|
d50c4ef6da | ||
|
|
7d0b1955a2 | ||
|
|
16cf7681f0 | ||
|
|
12f09ad2d7 | ||
|
|
b3e88a8d80 | ||
|
|
761273bcc4 | ||
|
|
1a75f68cb2 | ||
|
|
1b0edf54f1 | ||
|
|
caa6576719 | ||
|
|
93bd2c11fa | ||
|
|
2be09790a9 | ||
|
|
bf6eae711a | ||
|
|
69b68c0091 |
@@ -2,7 +2,13 @@
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## Unreleased - ???
|
||||
- Add `janet_thread_current(void)` to C API
|
||||
- Add integer parsing forms to pegs. This makes parsing many binary protocols easier.
|
||||
- Lots of updates to networking code - now can use epoll (or poll) on linux and IOCP on windows.
|
||||
- Add `ev/` module. This exposes a fiber scheduler, queues, timeouts, and other functionality to users
|
||||
for single threaded cooperative scheduling and asynchornous IO.
|
||||
- Add `net/accept-loop` and `net/listen`. These functions break down `net/server` into it's essential parts
|
||||
and are more flexible. They also allow furter improvements to these utility functions.
|
||||
|
||||
## 1.12.2 - 2020-09-20
|
||||
- Add janet\_try and janet\_restore to C API.
|
||||
|
||||
@@ -2733,15 +2733,24 @@
|
||||
(if (dyn sym)
|
||||
form))
|
||||
|
||||
(guarddef ev/go
|
||||
(defmacro ev/spawn
|
||||
"Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))."
|
||||
[& body]
|
||||
~(,ev/call (fn [] ,;body))))
|
||||
|
||||
(guarddef net/listen
|
||||
(defn net/server
|
||||
"Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream."
|
||||
[host port &opt handler type]
|
||||
(def s (net/listen host port type))
|
||||
(if handler
|
||||
(ev/go (fn [] (net/accept-loop s handler))))
|
||||
(ev/call (fn [] (net/accept-loop s handler))))
|
||||
s))
|
||||
|
||||
(guarddef ev/close
|
||||
(defn net/close "Alias for ev/close." [stream] (ev/close stream)))
|
||||
|
||||
(undef guarddef)
|
||||
|
||||
###
|
||||
@@ -2751,7 +2760,7 @@
|
||||
###
|
||||
|
||||
(defn- no-side-effects
|
||||
"Check if form may have side effects. If returns true, then the src
|
||||
"Check if form may have side effects. If rturns true, then the src
|
||||
must not have side effects, such as calling a C function."
|
||||
[src]
|
||||
(cond
|
||||
|
||||
848
src/core/ev.c
848
src/core/ev.c
File diff suppressed because it is too large
Load Diff
@@ -404,6 +404,7 @@ void janet_collect(void) {
|
||||
#ifdef JANET_EV
|
||||
janet_ev_mark();
|
||||
#endif
|
||||
janet_mark_fiber(janet_vm_root_fiber);
|
||||
for (i = 0; i < orig_rootcount; i++)
|
||||
janet_mark(janet_vm_roots[i]);
|
||||
while (orig_rootcount < janet_vm_root_count) {
|
||||
|
||||
582
src/core/net.c
582
src/core/net.c
@@ -28,6 +28,7 @@
|
||||
|
||||
#ifdef JANET_NET
|
||||
|
||||
#include <math.h>
|
||||
#ifdef JANET_WINDOWS
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
@@ -47,127 +48,38 @@
|
||||
#include <netinet/tcp.h>
|
||||
#include <netdb.h>
|
||||
#include <fcntl.h>
|
||||
#include <math.h>
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Streams - simple abstract type that wraps a pollable + extra flags
|
||||
*/
|
||||
|
||||
#define JANET_STREAM_READABLE 0x200
|
||||
#define JANET_STREAM_WRITABLE 0x400
|
||||
#define JANET_STREAM_ACCEPTABLE 0x800
|
||||
#define JANET_STREAM_UDPSERVER 0x1000
|
||||
|
||||
static int janet_stream_close(void *p, size_t s);
|
||||
static int janet_stream_mark(void *p, size_t s);
|
||||
static int janet_stream_getter(void *p, Janet key, Janet *out);
|
||||
static const JanetAbstractType StreamAT = {
|
||||
"core/stream",
|
||||
janet_stream_close,
|
||||
janet_stream_mark,
|
||||
janet_stream_getter,
|
||||
JANET_ATEND_GET
|
||||
};
|
||||
|
||||
typedef JanetPollable JanetStream;
|
||||
|
||||
static const JanetAbstractType AddressAT = {
|
||||
const JanetAbstractType janet_address_type = {
|
||||
"core/socket-address",
|
||||
JANET_ATEND_NAME
|
||||
};
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
#define JANET_NET_CHUNKSIZE 4096
|
||||
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
|
||||
#define JSOCKDEFAULT INVALID_SOCKET
|
||||
#define JLASTERR WSAGetLastError()
|
||||
#define JSOCKVALID(x) ((x) != INVALID_SOCKET)
|
||||
#define JEINTR WSAEINTR
|
||||
#define JEWOULDBLOCK WSAEWOULDBLOCK
|
||||
#define JEAGAIN WSAEWOULDBLOCK
|
||||
#define JPOLL WSAPoll
|
||||
#define JSock SOCKET
|
||||
#define JReadInt long
|
||||
#define JSOCKFLAGS 0
|
||||
static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
|
||||
u_long iMode = 0;
|
||||
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
|
||||
janet_pollable_init(stream, (JanetHandle) fd);
|
||||
ioctlsocket(fd, FIONBIO, &iMode);
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
int code = WSAGetLastError();
|
||||
char msgbuf[256];
|
||||
msgbuf[0] = '\0';
|
||||
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
|
||||
NULL,
|
||||
code,
|
||||
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
|
||||
msgbuf,
|
||||
sizeof (msgbuf),
|
||||
NULL);
|
||||
if (!*msgbuf) sprintf(msgbuf, "%d", code);
|
||||
char *c = msgbuf;
|
||||
while (*c) {
|
||||
if (*c == '\n' || *c == '\r') {
|
||||
*c = '\0';
|
||||
break;
|
||||
}
|
||||
c++;
|
||||
}
|
||||
return janet_cstringv(msgbuf);
|
||||
}
|
||||
#else
|
||||
#define JSOCKCLOSE(x) close(x)
|
||||
#define JSOCKDEFAULT 0
|
||||
#define JLASTERR errno
|
||||
#define JSOCKVALID(x) ((x) >= 0)
|
||||
#define JEINTR EINTR
|
||||
#define JEWOULDBLOCK EWOULDBLOCK
|
||||
#define JEAGAIN EAGAIN
|
||||
#define JPOLL poll
|
||||
#define JSock int
|
||||
#define JReadInt ssize_t
|
||||
#ifdef SOCK_CLOEXEC
|
||||
#define JSOCKFLAGS SOCK_CLOEXEC
|
||||
#else
|
||||
#define JSOCKFLAGS 0
|
||||
#endif
|
||||
static JanetStream *make_stream(int fd, uint32_t flags) {
|
||||
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
|
||||
janet_pollable_init(stream, fd);
|
||||
#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC)
|
||||
int extra = O_CLOEXEC;
|
||||
#else
|
||||
int extra = 0;
|
||||
#endif
|
||||
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra);
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
return janet_cstringv(strerror(errno));
|
||||
}
|
||||
#endif
|
||||
|
||||
static JanetStream *make_stream(JSock handle, uint32_t flags);
|
||||
|
||||
/* We pass this flag to all send calls to prevent sigpipe */
|
||||
#ifndef MSG_NOSIGNAL
|
||||
#define MSG_NOSIGNAL 0
|
||||
#endif
|
||||
|
||||
static int janet_stream_close(void *p, size_t s) {
|
||||
(void) s;
|
||||
JanetStream *stream = p;
|
||||
if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
JSOCKCLOSE(stream->handle);
|
||||
janet_pollable_deinit(stream);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void nosigpipe(JSock s) {
|
||||
#ifdef SO_NOSIGPIPE
|
||||
int enable = 1;
|
||||
@@ -180,358 +92,6 @@ static void nosigpipe(JSock s) {
|
||||
#endif
|
||||
}
|
||||
|
||||
static int janet_stream_mark(void *p, size_t s) {
|
||||
(void) s;
|
||||
janet_pollable_mark((JanetPollable *) p);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for read
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
int32_t bytes_left;
|
||||
JanetBuffer *buf;
|
||||
int is_chunk;
|
||||
int is_recv_from;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
int32_t chunk_size;
|
||||
struct sockaddr from;
|
||||
int fromlen;
|
||||
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
|
||||
#endif
|
||||
} NetStateRead;
|
||||
|
||||
JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateRead *state = (NetStateRead *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_buffer(state->buf));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when read finished */
|
||||
if (s->bytes == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
|
||||
state->bytes_left -= s->bytes;
|
||||
|
||||
if (state->bytes_left <= 0 || !state->is_chunk) {
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, state->fromlen);
|
||||
memcpy(abst, &state->from, state->fromlen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
resume_val = janet_wrap_buffer(state->buf);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
case JANET_ASYNC_EVENT_USER:
|
||||
{
|
||||
state->flags = 0;
|
||||
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
|
||||
state->wbuf.len = (ULONG) chunk_size;
|
||||
state->wbuf.buf = state->chunk_buf;
|
||||
state->chunk_size = chunk_size;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
int status;
|
||||
if (state->is_recv_from) {
|
||||
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_READ:
|
||||
/* Read in bytes */
|
||||
{
|
||||
JanetBuffer *buffer = state->buf;
|
||||
int32_t bytes_left = state->bytes_left;
|
||||
janet_buffer_extra(buffer, bytes_left);
|
||||
JReadInt nread;
|
||||
char saddr[256];
|
||||
socklen_t socklen = sizeof(saddr);
|
||||
do {
|
||||
if (state->is_recv_from) {
|
||||
nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0,
|
||||
(struct sockaddr *)&saddr, &socklen);
|
||||
} else {
|
||||
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
|
||||
}
|
||||
} while (nread == -1 && JLASTERR == JEINTR);
|
||||
|
||||
/* Check for errors - special case errors that can just be waited on to fix */
|
||||
if (nread == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
|
||||
if (nread == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Increment buffer counts */
|
||||
if (nread > 0) {
|
||||
buffer->count += nread;
|
||||
bytes_left -= nread;
|
||||
} else {
|
||||
bytes_left = 0;
|
||||
}
|
||||
state->bytes_left = bytes_left;
|
||||
|
||||
/* Resume if done */
|
||||
if (!state->is_chunk || bytes_left == 0) {
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, socklen);
|
||||
memcpy(abst, &saddr, socklen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
resume_val = janet_wrap_buffer(buffer);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 0;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 1;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 0;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 1;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for write/send-to
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
union {
|
||||
JanetBuffer *buf;
|
||||
const uint8_t *str;
|
||||
} src;
|
||||
int32_t start;
|
||||
int is_buffer;
|
||||
void *dest_abst;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
#endif
|
||||
} NetStateWrite;
|
||||
|
||||
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateWrite *state = (NetStateWrite *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(state->is_buffer
|
||||
? janet_wrap_buffer(state->src.buf)
|
||||
: janet_wrap_string(state->src.str));
|
||||
if (state->dest_abst != NULL) {
|
||||
janet_mark(janet_wrap_abstract(state->dest_abst));
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when write finished */
|
||||
if (s->bytes == 0 && !state->dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
/* Begin write */
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
/* If buffer, convert to string. */
|
||||
/* TODO - be more efficient about this */
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
JanetString str = janet_string(buffer->data, buffer->count);
|
||||
bytes = str;
|
||||
len = buffer->count;
|
||||
state->is_buffer = 0;
|
||||
state->src.str = str;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
state->wbuf.buf = (char *) bytes;
|
||||
state->wbuf.len = len;
|
||||
state->flags = 0;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
|
||||
int status;
|
||||
SOCKET sock = (SOCKET) s->pollable->handle;
|
||||
if (state->dest_abst) {
|
||||
const struct sockaddr *to = state->dest_abst;
|
||||
int tolen = (int) janet_abstract_size((void *) to);
|
||||
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_WRITE: {
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
bytes = buffer->data;
|
||||
len = buffer->count;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
JReadInt nwrote = 0;
|
||||
if (start < len) {
|
||||
int32_t nbytes = len - start;
|
||||
void *dest_abst = state->dest_abst;
|
||||
do {
|
||||
if (dest_abst) {
|
||||
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
|
||||
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
|
||||
} else {
|
||||
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
|
||||
}
|
||||
} while (nwrote == -1 && JLASTERR == JEINTR);
|
||||
|
||||
/* Handle write errors */
|
||||
if (nwrote == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Unless using datagrams, empty message is a disconnect */
|
||||
if (nwrote == 0 && !dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
if (nwrote > 0) {
|
||||
start += nwrote;
|
||||
} else {
|
||||
start = len;
|
||||
}
|
||||
}
|
||||
state->start = start;
|
||||
if (start >= len) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) {
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
|
||||
state->is_buffer = 1;
|
||||
state->start = 0;
|
||||
state->src.buf = buf;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) {
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
|
||||
state->is_buffer = 0;
|
||||
state->start = 0;
|
||||
state->src.str = str;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for simple server
|
||||
*/
|
||||
|
||||
/* State machine for accepting connections. */
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
@@ -557,7 +117,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
|
||||
if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
|
||||
if (state->function) janet_mark(janet_wrap_abstract(state->function));
|
||||
break;
|
||||
}
|
||||
}
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
@@ -565,16 +125,16 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
|
||||
int seconds;
|
||||
int bytes = sizeof(seconds);
|
||||
if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME,
|
||||
(char *)&seconds, &bytes)) {
|
||||
(char *)&seconds, &bytes)) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
|
||||
(char *)&(state->lstream->handle), sizeof(SOCKET))) {
|
||||
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
|
||||
(char *) & (state->lstream->handle), sizeof(SOCKET))) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
|
||||
Janet streamv = janet_wrap_abstract(state->astream);
|
||||
if (state->function) {
|
||||
/* Schedule worker */
|
||||
@@ -587,7 +147,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
} else {
|
||||
janet_schedule(s->fiber, streamv);
|
||||
janet_schedule(s->fiber, streamv);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
@@ -613,7 +173,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
|
||||
SOCKET lsock = (SOCKET) state->lstream->handle;
|
||||
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
|
||||
if (asock == INVALID_SOCKET) {
|
||||
*err = net_lasterr();
|
||||
*err = janet_ev_lasterr();
|
||||
return 1;
|
||||
}
|
||||
JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
@@ -622,7 +182,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
|
||||
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
|
||||
int code = WSAGetLastError();
|
||||
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
|
||||
*err = net_lasterr();
|
||||
*err = janet_ev_lasterr();
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
@@ -648,7 +208,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
case JANET_ASYNC_EVENT_READ: {
|
||||
JSock connfd = accept(s->pollable->handle, NULL, NULL);
|
||||
JSock connfd = accept(s->stream->handle, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
nosigpipe(connfd);
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
@@ -751,7 +311,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
#ifndef JANET_WINDOWS
|
||||
/* no unix domain socket support on windows yet */
|
||||
if (is_unix) {
|
||||
void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un));
|
||||
void *abst = janet_abstract(&janet_address_type, sizeof(struct sockaddr_un));
|
||||
memcpy(abst, ai, sizeof(struct sockaddr_un));
|
||||
Janet ret = janet_wrap_abstract(abst);
|
||||
return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret;
|
||||
@@ -762,7 +322,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
JanetArray *arr = janet_array(10);
|
||||
struct addrinfo *iter = ai;
|
||||
while (NULL != iter) {
|
||||
void *abst = janet_abstract(&AddressAT, iter->ai_addrlen);
|
||||
void *abst = janet_abstract(&janet_address_type, iter->ai_addrlen);
|
||||
memcpy(abst, iter->ai_addr, iter->ai_addrlen);
|
||||
janet_array_push(arr, janet_wrap_abstract(abst));
|
||||
iter = iter->ai_next;
|
||||
@@ -774,7 +334,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
if (NULL == ai) {
|
||||
janet_panic("no data for given address");
|
||||
}
|
||||
void *abst = janet_abstract(&AddressAT, ai->ai_addrlen);
|
||||
void *abst = janet_abstract(&janet_address_type, ai->ai_addrlen);
|
||||
memcpy(abst, ai->ai_addr, ai->ai_addrlen);
|
||||
freeaddrinfo(ai);
|
||||
return janet_wrap_abstract(abst);
|
||||
@@ -928,29 +488,30 @@ static Janet cfun_net_listen(int32_t argc, Janet *argv) {
|
||||
}
|
||||
}
|
||||
|
||||
static void check_stream_flag(JanetStream *stream, int flag) {
|
||||
if (!(stream->flags & flag) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
const char *msg = "";
|
||||
if (flag == JANET_STREAM_READABLE) msg = "readable";
|
||||
if (flag == JANET_STREAM_WRITABLE) msg = "writable";
|
||||
if (flag == JANET_STREAM_ACCEPTABLE) msg = "server";
|
||||
if (flag == JANET_STREAM_UDPSERVER) msg = "datagram server";
|
||||
janet_panicf("bad stream, expected %s stream", msg);
|
||||
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
|
||||
if ((stream->flags & flags) != flags || (stream->flags & JANET_STREAM_CLOSED)) {
|
||||
const char *rmsg = "", *wmsg = "", *amsg = "", *dmsg = "", *smsg = "stream";
|
||||
if (flags & JANET_STREAM_READABLE) rmsg = "readable ";
|
||||
if (flags & JANET_STREAM_WRITABLE) wmsg = "writable ";
|
||||
if (flags & JANET_STREAM_ACCEPTABLE) amsg = "server ";
|
||||
if (flags & JANET_STREAM_UDPSERVER) dmsg = "datagram ";
|
||||
if (flags & JANET_STREAM_SOCKET) smsg = "socket";
|
||||
janet_panicf("bad stream, expected %s%s%s%s%s", rmsg, wmsg, amsg, dmsg, smsg);
|
||||
}
|
||||
}
|
||||
|
||||
static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
|
||||
JanetFunction *fun = janet_getfunction(argv, 1);
|
||||
janet_sched_accept(stream, fun);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 1, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
|
||||
double to = janet_optnumber(argv, argc, 1, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_accept(stream, NULL);
|
||||
@@ -958,85 +519,77 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
|
||||
|
||||
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 4);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_READABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
|
||||
int32_t n = janet_getnat(argv, 1);
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_read(stream, buffer, n);
|
||||
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 4);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_READABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
|
||||
int32_t n = janet_getnat(argv, 1);
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_chunk(stream, buffer, n);
|
||||
janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 3, 4);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
|
||||
int32_t n = janet_getnat(argv, 1);
|
||||
JanetBuffer *buffer = janet_getbuffer(argv, 2);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_recv_from(stream, buffer, n);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
janet_stream_close(stream, 0);
|
||||
return janet_wrap_nil();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
|
||||
janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 3);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_WRITABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
|
||||
double to = janet_optnumber(argv, argc, 2, INFINITY);
|
||||
if (janet_checktype(argv[1], JANET_BUFFER)) {
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL);
|
||||
janet_ev_send_buffer(stream, janet_getbuffer(argv, 1), MSG_NOSIGNAL);
|
||||
} else {
|
||||
JanetByteView bytes = janet_getbytes(argv, 1);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_stringlike(stream, bytes.bytes, NULL);
|
||||
janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL);
|
||||
}
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 3, 4);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
|
||||
void *dest = janet_getabstract(argv, 1, &AddressAT);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
|
||||
void *dest = janet_getabstract(argv, 1, &janet_address_type);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (janet_checktype(argv[2], JANET_BUFFER)) {
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest);
|
||||
janet_ev_sendto_buffer(stream, janet_getbuffer(argv, 2), dest, MSG_NOSIGNAL);
|
||||
} else {
|
||||
JanetByteView bytes = janet_getbytes(argv, 2);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_stringlike(stream, bytes.bytes, dest);
|
||||
janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL);
|
||||
}
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_WRITABLE);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
|
||||
janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
|
||||
/* Toggle no delay flag */
|
||||
int flag = 1;
|
||||
setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
|
||||
@@ -1045,10 +598,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
|
||||
return argv[0];
|
||||
}
|
||||
|
||||
static const JanetMethod stream_methods[] = {
|
||||
static const JanetMethod net_stream_methods[] = {
|
||||
{"chunk", cfun_stream_chunk},
|
||||
{"close", cfun_stream_close},
|
||||
{"closed?", cfun_stream_closed},
|
||||
{"close", janet_cfun_stream_close},
|
||||
{"read", cfun_stream_read},
|
||||
{"write", cfun_stream_write},
|
||||
{"flush", cfun_stream_flush},
|
||||
@@ -1056,13 +608,15 @@ static const JanetMethod stream_methods[] = {
|
||||
{"accept-loop", cfun_stream_accept_loop},
|
||||
{"send-to", cfun_stream_send_to},
|
||||
{"recv-from", cfun_stream_recv_from},
|
||||
{"recv-from", cfun_stream_recv_from},
|
||||
{"evread", janet_cfun_stream_read},
|
||||
{"evchunk", janet_cfun_stream_chunk},
|
||||
{"evwrite", janet_cfun_stream_write},
|
||||
{NULL, NULL}
|
||||
};
|
||||
|
||||
static int janet_stream_getter(void *p, Janet key, Janet *out) {
|
||||
(void) p;
|
||||
if (!janet_checktype(key, JANET_KEYWORD)) return 0;
|
||||
return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out);
|
||||
static JanetStream *make_stream(JSock handle, uint32_t flags) {
|
||||
return janet_stream((JanetHandle) handle, flags | JANET_STREAM_SOCKET, net_stream_methods);
|
||||
}
|
||||
|
||||
static const JanetReg net_cfuns[] = {
|
||||
@@ -1071,7 +625,7 @@ static const JanetReg net_cfuns[] = {
|
||||
JDOC("(net/address host port &opt type)\n\n"
|
||||
"Look up the connection information for a given hostname, port, and connection type. Returns "
|
||||
"a handle that can be used to send datagrams over network without establishing a connection. "
|
||||
"On Posix platforms, you can use :unix for host to connet to a unix domain socket, where the name is "
|
||||
"On Posix platforms, you can use :unix for host to connect to a unix domain socket, where the name is "
|
||||
"given in the port argument. On Linux, abstract "
|
||||
"unix domain sockets are specified with a leading '@' character in port.")
|
||||
},
|
||||
@@ -1137,16 +691,6 @@ static const JanetReg net_cfuns[] = {
|
||||
"Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. "
|
||||
"Use this to make sure data is sent without delay. Returns stream.")
|
||||
},
|
||||
{
|
||||
"net/close", cfun_stream_close,
|
||||
JDOC("(net/close stream)\n\n"
|
||||
"Close a stream so that no further communication can occur.")
|
||||
},
|
||||
{
|
||||
"net/closed?", cfun_stream_closed,
|
||||
JDOC("(net/closed? stream)\n\n"
|
||||
"Check if a stream is closed.")
|
||||
},
|
||||
{
|
||||
"net/connect", cfun_net_connect,
|
||||
JDOC("(net/connect host porti &opt type)\n\n"
|
||||
|
||||
@@ -585,6 +585,14 @@ void janet_threads_deinit(void) {
|
||||
janet_vm_thread_decode = NULL;
|
||||
}
|
||||
|
||||
JanetThread *janet_thread_current(void) {
|
||||
if (NULL == janet_vm_thread_current) {
|
||||
janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict"));
|
||||
janet_gcroot(janet_wrap_abstract(janet_vm_thread_current));
|
||||
}
|
||||
return janet_vm_thread_current;
|
||||
}
|
||||
|
||||
/*
|
||||
* Cfuns
|
||||
*/
|
||||
@@ -592,11 +600,7 @@ void janet_threads_deinit(void) {
|
||||
static Janet cfun_thread_current(int32_t argc, Janet *argv) {
|
||||
(void) argv;
|
||||
janet_fixarity(argc, 0);
|
||||
if (NULL == janet_vm_thread_current) {
|
||||
janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict"));
|
||||
janet_gcroot(janet_wrap_abstract(janet_vm_thread_current));
|
||||
}
|
||||
return janet_wrap_abstract(janet_vm_thread_current);
|
||||
return janet_wrap_abstract(janet_thread_current());
|
||||
}
|
||||
|
||||
static Janet cfun_thread_new(int32_t argc, Janet *argv) {
|
||||
|
||||
@@ -140,6 +140,7 @@ void janet_lib_thread(JanetTable *env);
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
void janet_lib_net(JanetTable *env);
|
||||
extern const JanetAbstractType janet_address_type;
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
void janet_lib_ev(JanetTable *env);
|
||||
|
||||
@@ -1376,7 +1376,6 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
|
||||
/* Normal setup */
|
||||
if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber;
|
||||
janet_vm_fiber = fiber;
|
||||
janet_gcroot(janet_wrap_fiber(fiber));
|
||||
janet_fiber_set_status(fiber, JANET_STATUS_ALIVE);
|
||||
signal = run_vm(fiber, in);
|
||||
}
|
||||
@@ -1384,7 +1383,6 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
|
||||
/* Restore */
|
||||
if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL;
|
||||
janet_fiber_set_status(fiber, signal);
|
||||
janet_gcunroot(janet_wrap_fiber(fiber));
|
||||
janet_restore(&tstate);
|
||||
*out = tstate.payload;
|
||||
|
||||
|
||||
@@ -499,15 +499,22 @@ typedef void *JanetAbstract;
|
||||
|
||||
/* Event Loop Types */
|
||||
#ifdef JANET_EV
|
||||
#define JANET_POLL_FLAG_CLOSED 0x1
|
||||
#define JANET_POLL_FLAG_SOCKET 0x2
|
||||
#define JANET_POLL_FLAG_IOCP 0x4
|
||||
|
||||
#define JANET_STREAM_CLOSED 0x1
|
||||
#define JANET_STREAM_SOCKET 0x2
|
||||
#define JANET_STREAM_IOCP 0x4
|
||||
#define JANET_STREAM_READABLE 0x200
|
||||
#define JANET_STREAM_WRITABLE 0x400
|
||||
#define JANET_STREAM_ACCEPTABLE 0x800
|
||||
#define JANET_STREAM_UDPSERVER 0x1000
|
||||
|
||||
typedef enum {
|
||||
JANET_ASYNC_EVENT_INIT,
|
||||
JANET_ASYNC_EVENT_MARK,
|
||||
JANET_ASYNC_EVENT_DEINIT,
|
||||
JANET_ASYNC_EVENT_CLOSE,
|
||||
JANET_ASYNC_EVENT_ERR,
|
||||
JANET_ASYNC_EVENT_HUP,
|
||||
JANET_ASYNC_EVENT_READ,
|
||||
JANET_ASYNC_EVENT_WRITE,
|
||||
JANET_ASYNC_EVENT_TIMEOUT,
|
||||
@@ -517,7 +524,6 @@ typedef enum {
|
||||
|
||||
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
|
||||
#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE)
|
||||
#define JANET_ASYNC_LISTEN_SPAWNER 0x1000
|
||||
|
||||
typedef enum {
|
||||
JANET_ASYNC_STATUS_NOT_DONE,
|
||||
@@ -526,18 +532,19 @@ typedef enum {
|
||||
|
||||
/* Typedefs */
|
||||
typedef struct JanetListenerState JanetListenerState;
|
||||
typedef struct JanetPollable JanetPollable;
|
||||
typedef struct JanetStream JanetStream;
|
||||
typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event);
|
||||
|
||||
/* Wrapper around file descriptors and HANDLEs that can be polled. */
|
||||
struct JanetPollable {
|
||||
struct JanetStream {
|
||||
JanetHandle handle;
|
||||
uint32_t flags;
|
||||
/* Linked list of all in-flight IO routines for this pollable */
|
||||
/* Linked list of all in-flight IO routines for this stream */
|
||||
JanetListenerState *state;
|
||||
/* internal - used to disallow multiple concurrent reads / writes on the same pollable.
|
||||
const void *methods; /* Methods for this stream */
|
||||
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
|
||||
* this constraint may be lifted later but allowing such would require more internal book keeping
|
||||
* for some implementations. You can read and write at the same time on the same pollable, though. */
|
||||
* for some implementations. You can read and write at the same time on the same stream, though. */
|
||||
int _mask;
|
||||
};
|
||||
|
||||
@@ -545,7 +552,7 @@ struct JanetPollable {
|
||||
struct JanetListenerState {
|
||||
JanetListener machine;
|
||||
JanetFiber *fiber;
|
||||
JanetPollable *pollable;
|
||||
JanetStream *stream;
|
||||
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
|
||||
implementation of the event loop and the particular event. */
|
||||
#ifdef JANET_WINDOWS
|
||||
@@ -1248,28 +1255,56 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
|
||||
|
||||
#ifdef JANET_EV
|
||||
|
||||
extern JANET_API const JanetAbstractType janet_stream_type;
|
||||
|
||||
/* Run the event loop */
|
||||
JANET_API void janet_loop(void);
|
||||
|
||||
/* Wrapper around pollables */
|
||||
JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle);
|
||||
JANET_API void janet_pollable_mark(JanetPollable *pollable);
|
||||
JANET_API void janet_pollable_deinit(JanetPollable *pollable);
|
||||
/* Wrapper around streams */
|
||||
JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
|
||||
JANET_API void janet_stream_close(JanetStream *stream);
|
||||
JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv);
|
||||
JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv);
|
||||
JANET_API Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv);
|
||||
JANET_API Janet janet_cfun_stream_write(int32_t argc, Janet *argv);
|
||||
JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags);
|
||||
|
||||
/* Queue a fiber to run on the event loop */
|
||||
JANET_API void janet_schedule(JanetFiber *fiber, Janet value);
|
||||
JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
|
||||
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
|
||||
|
||||
/* Start a state machine listening for events from a pollable */
|
||||
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user);
|
||||
/* Start a state machine listening for events from a stream */
|
||||
JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user);
|
||||
|
||||
/* Shorthand for yielding to event loop in C */
|
||||
JANET_NO_RETURN JANET_API void janet_await(void);
|
||||
|
||||
/* For use inside listeners - adds a timeout to the current fiber, such that
|
||||
* it will be resumed after sec seconds if no other event schedules the current fiber. */
|
||||
void janet_addtimeout(double sec);
|
||||
JANET_API void janet_addtimeout(double sec);
|
||||
|
||||
/* Get last error from a an IO operation */
|
||||
JANET_API Janet janet_ev_lasterr(void);
|
||||
|
||||
/* Read async from a stream */
|
||||
JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
#ifdef JANET_NET
|
||||
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
#endif
|
||||
|
||||
/* Write async to a stream */
|
||||
JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf);
|
||||
JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str);
|
||||
#ifdef JANET_NET
|
||||
JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags);
|
||||
JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags);
|
||||
JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags);
|
||||
JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
@@ -1581,6 +1616,9 @@ JANET_API Janet janet_resolve_core(const char *name);
|
||||
|
||||
/* New C API */
|
||||
|
||||
/* Shorthand for janet C function declarations */
|
||||
#define JANET_CFUN(name) Janet name (int32_t argc, Janet *argv)
|
||||
|
||||
/* Allow setting entry name for static libraries */
|
||||
#ifdef __cplusplus
|
||||
#define JANET_MODULE_PREFIX extern "C"
|
||||
|
||||
@@ -15,8 +15,8 @@
|
||||
(def truncated
|
||||
(if (> (length e) 40) (string (string/slice e 0 35) "...") (string e)))
|
||||
(if x
|
||||
(xprintf stdout "\e[32m✔\e[0m %s: %v" truncated x)
|
||||
(xprintf stdout "\n\e[31m✘\e[0m %s: %v" truncated x))
|
||||
(eprintf "\e[32m✔\e[0m %s: %v" truncated x)
|
||||
(eprintf "\n\e[31m✘\e[0m %s: %v" truncated x))
|
||||
x)
|
||||
|
||||
(defmacro assert-error
|
||||
@@ -32,10 +32,10 @@
|
||||
(defn start-suite [x]
|
||||
(set suite-num x)
|
||||
(set start-time (os/clock))
|
||||
(print "\nRunning test suite " x " tests...\n "))
|
||||
(eprint "\nRunning test suite " x " tests...\n "))
|
||||
|
||||
(defn end-suite []
|
||||
(def delta (- (os/clock) start-time))
|
||||
(printf "\n\nTest suite %d finished in %.3f seconds" suite-num delta)
|
||||
(print num-tests-passed " of " num-tests-run " tests passed.\n")
|
||||
(eprintf "\n\nTest suite %d finished in %.3f seconds" suite-num delta)
|
||||
(eprint num-tests-passed " of " num-tests-run " tests passed.\n")
|
||||
(if (not= num-tests-passed num-tests-run) (os/exit 1)))
|
||||
|
||||
@@ -33,16 +33,9 @@
|
||||
(:write stream b)
|
||||
(buffer/clear b)))
|
||||
|
||||
(def s (net/server "127.0.0.1" "8000"))
|
||||
(def s (net/server "127.0.0.1" "8000" handler))
|
||||
(assert s "made server 1")
|
||||
|
||||
(ev/go
|
||||
(coro
|
||||
(while (not (net/closed? s))
|
||||
(def conn (net/accept s))
|
||||
(unless conn (break))
|
||||
(ev/call handler conn))))
|
||||
|
||||
(defn test-echo [msg]
|
||||
(with [conn (net/connect "127.0.0.1" "8000")]
|
||||
(:write conn msg)
|
||||
@@ -55,4 +48,19 @@
|
||||
|
||||
(:close s)
|
||||
|
||||
# Create pipe
|
||||
|
||||
(var pipe-counter 0)
|
||||
(def [reader writer] (ev/pipe))
|
||||
(ev/spawn
|
||||
(while (ev/read reader 3)
|
||||
(++ pipe-counter))
|
||||
(assert (= 20 pipe-counter) "ev/pipe 1"))
|
||||
|
||||
(for i 0 10
|
||||
(ev/write writer "xxx---"))
|
||||
|
||||
(ev/close writer)
|
||||
(ev/sleep 0.1)
|
||||
|
||||
(end-suite)
|
||||
|
||||
Reference in New Issue
Block a user