1
0
mirror of https://github.com/janet-lang/janet synced 2026-04-08 07:51:26 +00:00

Compare commits

...

19 Commits

Author SHA1 Message Date
Calvin Rose
d199c817dc Broken Pipe error on windows should just be end of stream. 2020-11-14 16:03:51 -06:00
Calvin Rose
dc51bd09f7 Make sure all test logs go to the same stream. 2020-11-14 15:56:48 -06:00
Calvin Rose
139e3fab25 Invert status check for (Read/Write)File 2020-11-14 15:52:01 -06:00
Calvin Rose
7a98f9aa02 Improve windows error messages on write failures. 2020-11-14 15:48:21 -06:00
Calvin Rose
b53dd67e74 Use custom pipe implementation for overlapped io. 2020-11-14 15:44:19 -06:00
Calvin Rose
e546731093 ev_lasterr -> janet_ev_lasterr 2020-11-14 15:24:13 -06:00
Calvin Rose
d50c4ef6da Try again for windows build. 2020-11-14 15:01:52 -06:00
Calvin Rose
7d0b1955a2 typo 2020-11-14 14:55:26 -06:00
Calvin Rose
16cf7681f0 Fix some windows issues. 2020-11-14 14:40:39 -06:00
Calvin Rose
12f09ad2d7 Add ev/pipe and move stream code into ev.c
Also adds a lot to the C API and changes things up.
2020-11-14 14:29:11 -06:00
Calvin Rose
b3e88a8d80 Move read/write functions into ev.c from net.c
This code can also be used for non-network streams.
2020-11-14 11:48:23 -06:00
Calvin Rose
761273bcc4 Add janet_thread_current() to C api. 2020-11-12 18:42:41 -06:00
Calvin Rose
1a75f68cb2 Fix windows build. 2020-11-12 14:52:02 -06:00
Calvin Rose
1b0edf54f1 Fix gc leak.
Rather than trying to be clever with pinning/unpinning, always
mark the root fiber and that should serve as thei singular common root in almost
all cases.
2020-11-12 14:29:38 -06:00
Calvin Rose
caa6576719 Fix formatting. 2020-11-11 15:35:44 -06:00
Calvin Rose
93bd2c11fa Merge pull request #502 from pepe/fix-net-server-fiber
Fix net server fiber
2020-11-11 15:33:42 -06:00
Josef Pospíšil
2be09790a9 Change fix with ev/call 2020-11-10 10:39:33 +01:00
Josef Pospíšil
bf6eae711a Fix adding handler to loop with fiber 2020-11-10 10:32:47 +01:00
Calvin Rose
69b68c0091 Update changelog. 2020-11-09 11:24:28 -06:00
11 changed files with 930 additions and 643 deletions

View File

@@ -2,7 +2,13 @@
All notable changes to this project will be documented in this file.
## Unreleased - ???
- Add `janet_thread_current(void)` to C API
- Add integer parsing forms to pegs. This makes parsing many binary protocols easier.
- Lots of updates to networking code - now can use epoll (or poll) on linux and IOCP on windows.
- Add `ev/` module. This exposes a fiber scheduler, queues, timeouts, and other functionality to users
for single threaded cooperative scheduling and asynchornous IO.
- Add `net/accept-loop` and `net/listen`. These functions break down `net/server` into it's essential parts
and are more flexible. They also allow furter improvements to these utility functions.
## 1.12.2 - 2020-09-20
- Add janet\_try and janet\_restore to C API.

View File

@@ -2733,15 +2733,24 @@
(if (dyn sym)
form))
(guarddef ev/go
(defmacro ev/spawn
"Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))."
[& body]
~(,ev/call (fn [] ,;body))))
(guarddef net/listen
(defn net/server
"Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream."
[host port &opt handler type]
(def s (net/listen host port type))
(if handler
(ev/go (fn [] (net/accept-loop s handler))))
(ev/call (fn [] (net/accept-loop s handler))))
s))
(guarddef ev/close
(defn net/close "Alias for ev/close." [stream] (ev/close stream)))
(undef guarddef)
###
@@ -2751,7 +2760,7 @@
###
(defn- no-side-effects
"Check if form may have side effects. If returns true, then the src
"Check if form may have side effects. If rturns true, then the src
must not have side effects, such as calling a C function."
[src]
(cond

File diff suppressed because it is too large Load Diff

View File

@@ -404,6 +404,7 @@ void janet_collect(void) {
#ifdef JANET_EV
janet_ev_mark();
#endif
janet_mark_fiber(janet_vm_root_fiber);
for (i = 0; i < orig_rootcount; i++)
janet_mark(janet_vm_roots[i]);
while (orig_rootcount < janet_vm_root_count) {

View File

@@ -28,6 +28,7 @@
#ifdef JANET_NET
#include <math.h>
#ifdef JANET_WINDOWS
#include <winsock2.h>
#include <windows.h>
@@ -47,127 +48,38 @@
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <math.h>
#endif
/*
* Streams - simple abstract type that wraps a pollable + extra flags
*/
#define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000
static int janet_stream_close(void *p, size_t s);
static int janet_stream_mark(void *p, size_t s);
static int janet_stream_getter(void *p, Janet key, Janet *out);
static const JanetAbstractType StreamAT = {
"core/stream",
janet_stream_close,
janet_stream_mark,
janet_stream_getter,
JANET_ATEND_GET
};
typedef JanetPollable JanetStream;
static const JanetAbstractType AddressAT = {
const JanetAbstractType janet_address_type = {
"core/socket-address",
JANET_ATEND_NAME
};
#ifdef JANET_WINDOWS
#define JANET_NET_CHUNKSIZE 4096
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
#define JSOCKDEFAULT INVALID_SOCKET
#define JLASTERR WSAGetLastError()
#define JSOCKVALID(x) ((x) != INVALID_SOCKET)
#define JEINTR WSAEINTR
#define JEWOULDBLOCK WSAEWOULDBLOCK
#define JEAGAIN WSAEWOULDBLOCK
#define JPOLL WSAPoll
#define JSock SOCKET
#define JReadInt long
#define JSOCKFLAGS 0
static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
u_long iMode = 0;
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
janet_pollable_init(stream, (JanetHandle) fd);
ioctlsocket(fd, FIONBIO, &iMode);
stream->flags = flags;
return stream;
}
static Janet net_lasterr(void) {
int code = WSAGetLastError();
char msgbuf[256];
msgbuf[0] = '\0';
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
code,
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
msgbuf,
sizeof (msgbuf),
NULL);
if (!*msgbuf) sprintf(msgbuf, "%d", code);
char *c = msgbuf;
while (*c) {
if (*c == '\n' || *c == '\r') {
*c = '\0';
break;
}
c++;
}
return janet_cstringv(msgbuf);
}
#else
#define JSOCKCLOSE(x) close(x)
#define JSOCKDEFAULT 0
#define JLASTERR errno
#define JSOCKVALID(x) ((x) >= 0)
#define JEINTR EINTR
#define JEWOULDBLOCK EWOULDBLOCK
#define JEAGAIN EAGAIN
#define JPOLL poll
#define JSock int
#define JReadInt ssize_t
#ifdef SOCK_CLOEXEC
#define JSOCKFLAGS SOCK_CLOEXEC
#else
#define JSOCKFLAGS 0
#endif
static JanetStream *make_stream(int fd, uint32_t flags) {
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
janet_pollable_init(stream, fd);
#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC)
int extra = O_CLOEXEC;
#else
int extra = 0;
#endif
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra);
stream->flags = flags;
return stream;
}
static Janet net_lasterr(void) {
return janet_cstringv(strerror(errno));
}
#endif
static JanetStream *make_stream(JSock handle, uint32_t flags);
/* We pass this flag to all send calls to prevent sigpipe */
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0
#endif
static int janet_stream_close(void *p, size_t s) {
(void) s;
JanetStream *stream = p;
if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) {
JSOCKCLOSE(stream->handle);
janet_pollable_deinit(stream);
}
return 0;
}
static void nosigpipe(JSock s) {
#ifdef SO_NOSIGPIPE
int enable = 1;
@@ -180,358 +92,6 @@ static void nosigpipe(JSock s) {
#endif
}
static int janet_stream_mark(void *p, size_t s) {
(void) s;
janet_pollable_mark((JanetPollable *) p);
return 0;
}
/*
* State machine for read
*/
typedef struct {
JanetListenerState head;
int32_t bytes_left;
JanetBuffer *buf;
int is_chunk;
int is_recv_from;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
WSABUF wbuf;
DWORD flags;
int32_t chunk_size;
struct sockaddr from;
int fromlen;
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
#endif
} NetStateRead;
JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
NetStateRead *state = (NetStateRead *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_buffer(state->buf));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
if (s->bytes == 0 && !state->is_recv_from) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= s->bytes;
if (state->bytes_left <= 0 || !state->is_chunk) {
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, state->fromlen);
memcpy(abst, &state->from, state->fromlen);
resume_val = janet_wrap_abstract(abst);
} else {
resume_val = janet_wrap_buffer(state->buf);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER:
{
state->flags = 0;
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf;
state->chunk_size = chunk_size;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status;
if (state->is_recv_from) {
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else {
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
}
if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
case JANET_ASYNC_EVENT_READ:
/* Read in bytes */
{
JanetBuffer *buffer = state->buf;
int32_t bytes_left = state->bytes_left;
janet_buffer_extra(buffer, bytes_left);
JReadInt nread;
char saddr[256];
socklen_t socklen = sizeof(saddr);
do {
if (state->is_recv_from) {
nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0,
(struct sockaddr *)&saddr, &socklen);
} else {
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
}
} while (nread == -1 && JLASTERR == JEINTR);
/* Check for errors - special case errors that can just be waited on to fix */
if (nread == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
if (nread == 0 && !state->is_recv_from) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
/* Increment buffer counts */
if (nread > 0) {
buffer->count += nread;
bytes_left -= nread;
} else {
bytes_left = 0;
}
state->bytes_left = bytes_left;
/* Resume if done */
if (!state->is_chunk || bytes_left == 0) {
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, socklen);
memcpy(abst, &saddr, socklen);
resume_val = janet_wrap_abstract(abst);
} else {
resume_val = janet_wrap_buffer(buffer);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0;
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 1;
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
state->is_chunk = 0;
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 1;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
/*
* State machine for write/send-to
*/
typedef struct {
JanetListenerState head;
union {
JanetBuffer *buf;
const uint8_t *str;
} src;
int32_t start;
int is_buffer;
void *dest_abst;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
WSABUF wbuf;
DWORD flags;
#endif
} NetStateWrite;
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
NetStateWrite *state = (NetStateWrite *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str));
if (state->dest_abst != NULL) {
janet_mark(janet_wrap_abstract(state->dest_abst));
}
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
if (s->bytes == 0 && !state->dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;
case JANET_ASYNC_EVENT_USER: {
/* Begin write */
int32_t start, len;
const uint8_t *bytes;
start = state->start;
if (state->is_buffer) {
/* If buffer, convert to string. */
/* TODO - be more efficient about this */
JanetBuffer *buffer = state->src.buf;
JanetString str = janet_string(buffer->data, buffer->count);
bytes = str;
len = buffer->count;
state->is_buffer = 0;
state->src.str = str;
} else {
bytes = state->src.str;
len = janet_string_length(bytes);
}
state->wbuf.buf = (char *) bytes;
state->wbuf.len = len;
state->flags = 0;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status;
SOCKET sock = (SOCKET) s->pollable->handle;
if (state->dest_abst) {
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
} else {
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
}
if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len;
const uint8_t *bytes;
start = state->start;
if (state->is_buffer) {
JanetBuffer *buffer = state->src.buf;
bytes = buffer->data;
len = buffer->count;
} else {
bytes = state->src.str;
len = janet_string_length(bytes);
}
JReadInt nwrote = 0;
if (start < len) {
int32_t nbytes = len - start;
void *dest_abst = state->dest_abst;
do {
if (dest_abst) {
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
} else {
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
}
} while (nwrote == -1 && JLASTERR == JEINTR);
/* Handle write errors */
if (nwrote == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
/* Unless using datagrams, empty message is a disconnect */
if (nwrote == 0 && !dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
if (nwrote > 0) {
start += nwrote;
} else {
start = len;
}
}
state->start = start;
if (start >= len) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;
}
break;
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 1;
state->start = 0;
state->src.buf = buf;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) {
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
state->is_buffer = 0;
state->start = 0;
state->src.str = str;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
/*
* State machine for simple server
*/
/* State machine for accepting connections. */
#ifdef JANET_WINDOWS
@@ -557,7 +117,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
if (state->function) janet_mark(janet_wrap_abstract(state->function));
break;
}
}
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
@@ -565,16 +125,16 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
int seconds;
int bytes = sizeof(seconds);
if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME,
(char *)&seconds, &bytes)) {
(char *)&seconds, &bytes)) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&(state->lstream->handle), sizeof(SOCKET))) {
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *) & (state->lstream->handle), sizeof(SOCKET))) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
Janet streamv = janet_wrap_abstract(state->astream);
if (state->function) {
/* Schedule worker */
@@ -587,7 +147,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
return JANET_ASYNC_STATUS_DONE;
}
} else {
janet_schedule(s->fiber, streamv);
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
}
@@ -613,7 +173,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
SOCKET lsock = (SOCKET) state->lstream->handle;
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (asock == INVALID_SOCKET) {
*err = net_lasterr();
*err = janet_ev_lasterr();
return 1;
}
JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
@@ -622,7 +182,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
int code = WSAGetLastError();
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
*err = net_lasterr();
*err = janet_ev_lasterr();
return 1;
}
return 0;
@@ -648,7 +208,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
JSock connfd = accept(s->stream->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
@@ -751,7 +311,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
#ifndef JANET_WINDOWS
/* no unix domain socket support on windows yet */
if (is_unix) {
void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un));
void *abst = janet_abstract(&janet_address_type, sizeof(struct sockaddr_un));
memcpy(abst, ai, sizeof(struct sockaddr_un));
Janet ret = janet_wrap_abstract(abst);
return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret;
@@ -762,7 +322,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
JanetArray *arr = janet_array(10);
struct addrinfo *iter = ai;
while (NULL != iter) {
void *abst = janet_abstract(&AddressAT, iter->ai_addrlen);
void *abst = janet_abstract(&janet_address_type, iter->ai_addrlen);
memcpy(abst, iter->ai_addr, iter->ai_addrlen);
janet_array_push(arr, janet_wrap_abstract(abst));
iter = iter->ai_next;
@@ -774,7 +334,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
if (NULL == ai) {
janet_panic("no data for given address");
}
void *abst = janet_abstract(&AddressAT, ai->ai_addrlen);
void *abst = janet_abstract(&janet_address_type, ai->ai_addrlen);
memcpy(abst, ai->ai_addr, ai->ai_addrlen);
freeaddrinfo(ai);
return janet_wrap_abstract(abst);
@@ -928,29 +488,30 @@ static Janet cfun_net_listen(int32_t argc, Janet *argv) {
}
}
static void check_stream_flag(JanetStream *stream, int flag) {
if (!(stream->flags & flag) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
const char *msg = "";
if (flag == JANET_STREAM_READABLE) msg = "readable";
if (flag == JANET_STREAM_WRITABLE) msg = "writable";
if (flag == JANET_STREAM_ACCEPTABLE) msg = "server";
if (flag == JANET_STREAM_UDPSERVER) msg = "datagram server";
janet_panicf("bad stream, expected %s stream", msg);
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
if ((stream->flags & flags) != flags || (stream->flags & JANET_STREAM_CLOSED)) {
const char *rmsg = "", *wmsg = "", *amsg = "", *dmsg = "", *smsg = "stream";
if (flags & JANET_STREAM_READABLE) rmsg = "readable ";
if (flags & JANET_STREAM_WRITABLE) wmsg = "writable ";
if (flags & JANET_STREAM_ACCEPTABLE) amsg = "server ";
if (flags & JANET_STREAM_UDPSERVER) dmsg = "datagram ";
if (flags & JANET_STREAM_SOCKET) smsg = "socket";
janet_panicf("bad stream, expected %s%s%s%s%s", rmsg, wmsg, amsg, dmsg, smsg);
}
}
static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
JanetFunction *fun = janet_getfunction(argv, 1);
janet_sched_accept(stream, fun);
}
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream, NULL);
@@ -958,85 +519,77 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_read(stream, buffer, n);
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
janet_await();
}
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_chunk(stream, buffer, n);
janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL);
janet_await();
}
static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_getbuffer(argv, 2);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_recv_from(stream, buffer, n);
}
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
janet_stream_close(stream, 0);
return janet_wrap_nil();
}
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL);
janet_await();
}
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_WRITABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
double to = janet_optnumber(argv, argc, 2, INFINITY);
if (janet_checktype(argv[1], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL);
janet_ev_send_buffer(stream, janet_getbuffer(argv, 1), MSG_NOSIGNAL);
} else {
JanetByteView bytes = janet_getbytes(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, NULL);
janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL);
}
janet_await();
}
static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
void *dest = janet_getabstract(argv, 1, &AddressAT);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
void *dest = janet_getabstract(argv, 1, &janet_address_type);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (janet_checktype(argv[2], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest);
janet_ev_sendto_buffer(stream, janet_getbuffer(argv, 2), dest, MSG_NOSIGNAL);
} else {
JanetByteView bytes = janet_getbytes(argv, 2);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, dest);
janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL);
}
janet_await();
}
static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_WRITABLE);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
/* Toggle no delay flag */
int flag = 1;
setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
@@ -1045,10 +598,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
return argv[0];
}
static const JanetMethod stream_methods[] = {
static const JanetMethod net_stream_methods[] = {
{"chunk", cfun_stream_chunk},
{"close", cfun_stream_close},
{"closed?", cfun_stream_closed},
{"close", janet_cfun_stream_close},
{"read", cfun_stream_read},
{"write", cfun_stream_write},
{"flush", cfun_stream_flush},
@@ -1056,13 +608,15 @@ static const JanetMethod stream_methods[] = {
{"accept-loop", cfun_stream_accept_loop},
{"send-to", cfun_stream_send_to},
{"recv-from", cfun_stream_recv_from},
{"recv-from", cfun_stream_recv_from},
{"evread", janet_cfun_stream_read},
{"evchunk", janet_cfun_stream_chunk},
{"evwrite", janet_cfun_stream_write},
{NULL, NULL}
};
static int janet_stream_getter(void *p, Janet key, Janet *out) {
(void) p;
if (!janet_checktype(key, JANET_KEYWORD)) return 0;
return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out);
static JanetStream *make_stream(JSock handle, uint32_t flags) {
return janet_stream((JanetHandle) handle, flags | JANET_STREAM_SOCKET, net_stream_methods);
}
static const JanetReg net_cfuns[] = {
@@ -1071,7 +625,7 @@ static const JanetReg net_cfuns[] = {
JDOC("(net/address host port &opt type)\n\n"
"Look up the connection information for a given hostname, port, and connection type. Returns "
"a handle that can be used to send datagrams over network without establishing a connection. "
"On Posix platforms, you can use :unix for host to connet to a unix domain socket, where the name is "
"On Posix platforms, you can use :unix for host to connect to a unix domain socket, where the name is "
"given in the port argument. On Linux, abstract "
"unix domain sockets are specified with a leading '@' character in port.")
},
@@ -1137,16 +691,6 @@ static const JanetReg net_cfuns[] = {
"Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. "
"Use this to make sure data is sent without delay. Returns stream.")
},
{
"net/close", cfun_stream_close,
JDOC("(net/close stream)\n\n"
"Close a stream so that no further communication can occur.")
},
{
"net/closed?", cfun_stream_closed,
JDOC("(net/closed? stream)\n\n"
"Check if a stream is closed.")
},
{
"net/connect", cfun_net_connect,
JDOC("(net/connect host porti &opt type)\n\n"

View File

@@ -585,6 +585,14 @@ void janet_threads_deinit(void) {
janet_vm_thread_decode = NULL;
}
JanetThread *janet_thread_current(void) {
if (NULL == janet_vm_thread_current) {
janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict"));
janet_gcroot(janet_wrap_abstract(janet_vm_thread_current));
}
return janet_vm_thread_current;
}
/*
* Cfuns
*/
@@ -592,11 +600,7 @@ void janet_threads_deinit(void) {
static Janet cfun_thread_current(int32_t argc, Janet *argv) {
(void) argv;
janet_fixarity(argc, 0);
if (NULL == janet_vm_thread_current) {
janet_vm_thread_current = janet_make_thread(janet_vm_mailbox, janet_get_core_table("make-image-dict"));
janet_gcroot(janet_wrap_abstract(janet_vm_thread_current));
}
return janet_wrap_abstract(janet_vm_thread_current);
return janet_wrap_abstract(janet_thread_current());
}
static Janet cfun_thread_new(int32_t argc, Janet *argv) {

View File

@@ -140,6 +140,7 @@ void janet_lib_thread(JanetTable *env);
#endif
#ifdef JANET_NET
void janet_lib_net(JanetTable *env);
extern const JanetAbstractType janet_address_type;
#endif
#ifdef JANET_EV
void janet_lib_ev(JanetTable *env);

View File

@@ -1376,7 +1376,6 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
/* Normal setup */
if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber;
janet_vm_fiber = fiber;
janet_gcroot(janet_wrap_fiber(fiber));
janet_fiber_set_status(fiber, JANET_STATUS_ALIVE);
signal = run_vm(fiber, in);
}
@@ -1384,7 +1383,6 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
/* Restore */
if (janet_vm_root_fiber == fiber) janet_vm_root_fiber = NULL;
janet_fiber_set_status(fiber, signal);
janet_gcunroot(janet_wrap_fiber(fiber));
janet_restore(&tstate);
*out = tstate.payload;

View File

@@ -499,15 +499,22 @@ typedef void *JanetAbstract;
/* Event Loop Types */
#ifdef JANET_EV
#define JANET_POLL_FLAG_CLOSED 0x1
#define JANET_POLL_FLAG_SOCKET 0x2
#define JANET_POLL_FLAG_IOCP 0x4
#define JANET_STREAM_CLOSED 0x1
#define JANET_STREAM_SOCKET 0x2
#define JANET_STREAM_IOCP 0x4
#define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000
typedef enum {
JANET_ASYNC_EVENT_INIT,
JANET_ASYNC_EVENT_MARK,
JANET_ASYNC_EVENT_DEINIT,
JANET_ASYNC_EVENT_CLOSE,
JANET_ASYNC_EVENT_ERR,
JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_TIMEOUT,
@@ -517,7 +524,6 @@ typedef enum {
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE)
#define JANET_ASYNC_LISTEN_SPAWNER 0x1000
typedef enum {
JANET_ASYNC_STATUS_NOT_DONE,
@@ -526,18 +532,19 @@ typedef enum {
/* Typedefs */
typedef struct JanetListenerState JanetListenerState;
typedef struct JanetPollable JanetPollable;
typedef struct JanetStream JanetStream;
typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event);
/* Wrapper around file descriptors and HANDLEs that can be polled. */
struct JanetPollable {
struct JanetStream {
JanetHandle handle;
uint32_t flags;
/* Linked list of all in-flight IO routines for this pollable */
/* Linked list of all in-flight IO routines for this stream */
JanetListenerState *state;
/* internal - used to disallow multiple concurrent reads / writes on the same pollable.
const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same pollable, though. */
* for some implementations. You can read and write at the same time on the same stream, though. */
int _mask;
};
@@ -545,7 +552,7 @@ struct JanetPollable {
struct JanetListenerState {
JanetListener machine;
JanetFiber *fiber;
JanetPollable *pollable;
JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
#ifdef JANET_WINDOWS
@@ -1248,28 +1255,56 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
#ifdef JANET_EV
extern JANET_API const JanetAbstractType janet_stream_type;
/* Run the event loop */
JANET_API void janet_loop(void);
/* Wrapper around pollables */
JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle);
JANET_API void janet_pollable_mark(JanetPollable *pollable);
JANET_API void janet_pollable_deinit(JanetPollable *pollable);
/* Wrapper around streams */
JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
JANET_API void janet_stream_close(JanetStream *stream);
JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_write(int32_t argc, Janet *argv);
JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags);
/* Queue a fiber to run on the event loop */
JANET_API void janet_schedule(JanetFiber *fiber, Janet value);
JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a pollable */
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user);
/* Start a state machine listening for events from a stream */
JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user);
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);
/* For use inside listeners - adds a timeout to the current fiber, such that
* it will be resumed after sec seconds if no other event schedules the current fiber. */
void janet_addtimeout(double sec);
JANET_API void janet_addtimeout(double sec);
/* Get last error from a an IO operation */
JANET_API Janet janet_ev_lasterr(void);
/* Read async from a stream */
JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
#ifdef JANET_NET
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
#endif
/* Write async to a stream */
JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf);
JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str);
#ifdef JANET_NET
JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags);
JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags);
JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags);
JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags);
#endif
#endif
@@ -1581,6 +1616,9 @@ JANET_API Janet janet_resolve_core(const char *name);
/* New C API */
/* Shorthand for janet C function declarations */
#define JANET_CFUN(name) Janet name (int32_t argc, Janet *argv)
/* Allow setting entry name for static libraries */
#ifdef __cplusplus
#define JANET_MODULE_PREFIX extern "C"

View File

@@ -15,8 +15,8 @@
(def truncated
(if (> (length e) 40) (string (string/slice e 0 35) "...") (string e)))
(if x
(xprintf stdout "\e[32m✔\e[0m %s: %v" truncated x)
(xprintf stdout "\n\e[31m✘\e[0m %s: %v" truncated x))
(eprintf "\e[32m✔\e[0m %s: %v" truncated x)
(eprintf "\n\e[31m✘\e[0m %s: %v" truncated x))
x)
(defmacro assert-error
@@ -32,10 +32,10 @@
(defn start-suite [x]
(set suite-num x)
(set start-time (os/clock))
(print "\nRunning test suite " x " tests...\n "))
(eprint "\nRunning test suite " x " tests...\n "))
(defn end-suite []
(def delta (- (os/clock) start-time))
(printf "\n\nTest suite %d finished in %.3f seconds" suite-num delta)
(print num-tests-passed " of " num-tests-run " tests passed.\n")
(eprintf "\n\nTest suite %d finished in %.3f seconds" suite-num delta)
(eprint num-tests-passed " of " num-tests-run " tests passed.\n")
(if (not= num-tests-passed num-tests-run) (os/exit 1)))

View File

@@ -33,16 +33,9 @@
(:write stream b)
(buffer/clear b)))
(def s (net/server "127.0.0.1" "8000"))
(def s (net/server "127.0.0.1" "8000" handler))
(assert s "made server 1")
(ev/go
(coro
(while (not (net/closed? s))
(def conn (net/accept s))
(unless conn (break))
(ev/call handler conn))))
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg)
@@ -55,4 +48,19 @@
(:close s)
# Create pipe
(var pipe-counter 0)
(def [reader writer] (ev/pipe))
(ev/spawn
(while (ev/read reader 3)
(++ pipe-counter))
(assert (= 20 pipe-counter) "ev/pipe 1"))
(for i 0 10
(ev/write writer "xxx---"))
(ev/close writer)
(ev/sleep 0.1)
(end-suite)