mirror of
https://github.com/janet-lang/janet
synced 2025-01-13 09:00:26 +00:00
Move read/write functions into ev.c from net.c
This code can also be used for non-network streams.
This commit is contained in:
parent
761273bcc4
commit
b3e88a8d80
@ -2733,6 +2733,12 @@
|
||||
(if (dyn sym)
|
||||
form))
|
||||
|
||||
(guarddef ev/go
|
||||
(defmacro ev/spawn
|
||||
"Run some code in a new fiber. This is shorthand for (ev/call (fn [] ;body))."
|
||||
[& body]
|
||||
~(,ev/call (fn [] ,;body))))
|
||||
|
||||
(guarddef net/listen
|
||||
(defn net/server
|
||||
"Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream."
|
||||
|
467
src/core/ev.c
467
src/core/ev.c
@ -32,14 +32,11 @@
|
||||
#ifdef JANET_EV
|
||||
|
||||
/* Includes */
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
#include <math.h>
|
||||
|
||||
#else
|
||||
|
||||
#include <limits.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
@ -47,20 +44,16 @@
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/types.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
#ifdef JANET_BSD
|
||||
#include <netinet/in.h>
|
||||
#endif
|
||||
|
||||
#include <netinet/tcp.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/socket.h>
|
||||
#ifdef JANET_EV_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/timerfd.h>
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
/* General queue */
|
||||
|
||||
/* Ring buffer for storing a list of fibers */
|
||||
typedef struct {
|
||||
int32_t capacity;
|
||||
@ -1050,6 +1043,458 @@ void janet_ev_deinit(void) {
|
||||
|
||||
#endif
|
||||
|
||||
/* C API helpers for reading and writing from streams.
|
||||
* There is some networking code in here as well as generic
|
||||
* reading and writing primitives. */
|
||||
|
||||
|
||||
/* When there is an IO error, we need to be able to convert it to a Janet
|
||||
* string to raise a Janet error. */
|
||||
#ifdef JANET_WINDOWS
|
||||
#define JANET_EV_CHUNKSIZE 4096
|
||||
Janet janet_ev_lasterr(void) {
|
||||
int code = GetLastError();
|
||||
char msgbuf[256];
|
||||
msgbuf[0] = '\0';
|
||||
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
|
||||
NULL,
|
||||
code,
|
||||
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
|
||||
msgbuf,
|
||||
sizeof(msgbuf),
|
||||
NULL);
|
||||
if (!*msgbuf) sprintf(msgbuf, "%d", code);
|
||||
char *c = msgbuf;
|
||||
while (*c) {
|
||||
if (*c == '\n' || *c == '\r') {
|
||||
*c = '\0';
|
||||
break;
|
||||
}
|
||||
c++;
|
||||
}
|
||||
return janet_cstringv(msgbuf);
|
||||
}
|
||||
#else
|
||||
Janet janet_ev_lasterr(void) {
|
||||
return janet_cstringv(strerror(errno));
|
||||
}
|
||||
#endif
|
||||
|
||||
/* State machine for read/recv/recvfrom */
|
||||
|
||||
typedef enum {
|
||||
JANET_ASYNC_READMODE_READ,
|
||||
JANET_ASYNC_READMODE_RECV,
|
||||
JANET_ASYNC_READMODE_RECVFROM
|
||||
} JanetReadMode;
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
int32_t bytes_left;
|
||||
JanetBuffer *buf;
|
||||
int is_chunk;
|
||||
JanetReadMode mode;
|
||||
#ifdef JANET_WINDOWS
|
||||
OVERLAPPED overlapped;
|
||||
#ifdef JANET_NET
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
struct sockaddr from;
|
||||
int fromlen;
|
||||
#endif
|
||||
uint8_t chunk_buf[JANET_EV_CHUNKSIZE];
|
||||
#else
|
||||
int flags;
|
||||
#endif
|
||||
} StateRead;
|
||||
|
||||
JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
StateRead *state = (StateRead *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_buffer(state->buf));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when read finished */
|
||||
if (s->bytes == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
|
||||
state->bytes_left -= s->bytes;
|
||||
|
||||
if (state->bytes_left <= 0 || !state->is_chunk) {
|
||||
Janet resume_val;
|
||||
#ifdef JANET_NET
|
||||
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
|
||||
void *abst = janet_abstract(&janet_address_type, state->fromlen);
|
||||
memcpy(abst, &state->from, state->fromlen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
resume_val = janet_wrap_buffer(state->buf);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
int32_t chunk_size = state->bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state->bytes_left;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(OVERLAPPED));
|
||||
int status;
|
||||
#ifdef JANET_NET
|
||||
if (state->mode != JANET_ASYNC_READMODE_READ) {
|
||||
state->wbuf.len = (ULONG) chunk_size;
|
||||
state->wbuf.buf = state->chunk_buf;
|
||||
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
|
||||
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
|
||||
} else
|
||||
}
|
||||
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
|
||||
janet_cancel(s->fiber, janet_ev_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
status = ReadFile(s->pollable->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped);
|
||||
if (status && (ERROR_IO_PENDING != WSAGetLastError())) {
|
||||
janet_cancel(s->fiber, janet_ev_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_READ:
|
||||
/* Read in bytes */
|
||||
{
|
||||
JanetBuffer *buffer = state->buf;
|
||||
int32_t bytes_left = state->bytes_left;
|
||||
janet_buffer_extra(buffer, bytes_left);
|
||||
ssize_t nread;
|
||||
#ifdef JANET_NET
|
||||
char saddr[256];
|
||||
socklen_t socklen = sizeof(saddr);
|
||||
#endif
|
||||
do {
|
||||
#ifdef JANET_NET
|
||||
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
|
||||
nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags,
|
||||
(struct sockaddr *)&saddr, &socklen);
|
||||
} else if (state->mode == JANET_ASYNC_READMODE_RECV) {
|
||||
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags);
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
nread = read(s->pollable->handle, buffer->data + buffer->count, bytes_left);
|
||||
}
|
||||
} while (nread == -1 && errno == EINTR);
|
||||
|
||||
/* Check for errors - special case errors that can just be waited on to fix */
|
||||
if (nread == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, janet_ev_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
|
||||
if (nread == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Increment buffer counts */
|
||||
if (nread > 0) {
|
||||
buffer->count += nread;
|
||||
bytes_left -= nread;
|
||||
} else {
|
||||
bytes_left = 0;
|
||||
}
|
||||
state->bytes_left = bytes_left;
|
||||
|
||||
/* Resume if done */
|
||||
if (!state->is_chunk || bytes_left == 0) {
|
||||
Janet resume_val;
|
||||
#ifdef JANET_NET
|
||||
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
|
||||
void *abst = janet_abstract(&janet_address_type, socklen);
|
||||
memcpy(abst, &saddr, socklen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
resume_val = janet_wrap_buffer(buffer);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
|
||||
StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL);
|
||||
state->is_chunk = is_chunked;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->mode = mode;
|
||||
#ifdef JANET_WINDOWS
|
||||
ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
state->flags = (DWORD) flags;
|
||||
#else
|
||||
state->flags = flags;
|
||||
#endif
|
||||
}
|
||||
|
||||
void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0);
|
||||
}
|
||||
void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0);
|
||||
}
|
||||
#ifdef JANET_NET
|
||||
void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
|
||||
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags);
|
||||
}
|
||||
void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
|
||||
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags);
|
||||
}
|
||||
void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
|
||||
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags);
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* State machine for write/send/send-to
|
||||
*/
|
||||
|
||||
typedef enum {
|
||||
JANET_ASYNC_WRITEMODE_WRITE,
|
||||
JANET_ASYNC_WRITEMODE_SEND,
|
||||
JANET_ASYNC_WRITEMODE_SENDTO
|
||||
} JanetWriteMode;
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
union {
|
||||
JanetBuffer *buf;
|
||||
const uint8_t *str;
|
||||
} src;
|
||||
int is_buffer;
|
||||
JanetWriteMode mode;
|
||||
void *dest_abst;
|
||||
#ifdef JANET_WINDOWS
|
||||
OVERLAPPED overlapped;
|
||||
#ifdef JANET_NET
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
#endif
|
||||
#else
|
||||
int flags;
|
||||
int32_t start;
|
||||
#endif
|
||||
} StateWrite;
|
||||
|
||||
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
StateWrite *state = (StateWrite *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(state->is_buffer
|
||||
? janet_wrap_buffer(state->src.buf)
|
||||
: janet_wrap_string(state->src.str));
|
||||
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
|
||||
janet_mark(janet_wrap_abstract(state->dest_abst));
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when write finished */
|
||||
if (s->bytes == 0 && (state->mode != JANET_ASYNC_WRITEMODE_SENDTO)) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
/* Begin write */
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
if (state->is_buffer) {
|
||||
/* If buffer, convert to string. */
|
||||
/* TODO - be more efficient about this */
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
JanetString str = janet_string(buffer->data, buffer->count);
|
||||
bytes = str;
|
||||
len = buffer->count;
|
||||
state->is_buffer = 0;
|
||||
state->src.str = str;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
|
||||
int status;
|
||||
#ifdef JANET_NET
|
||||
if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) {
|
||||
SOCKET sock = (SOCKET) s->pollable->handle;
|
||||
state->wbuf.buf = (char *) bytes;
|
||||
state->wbuf.len = len;
|
||||
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
|
||||
const struct sockaddr *to = state->dest_abst;
|
||||
int tolen = (int) janet_abstract_size((void *) to);
|
||||
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
if (status && (WSA_IO_PENDING != WSAGetLastError())) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
status = WriteFile(s->pollable->handle, bytes, len, NULL, &state->overlapped);
|
||||
if (status && (ERROR_IO_PENDING != WSAGetLastError())) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_WRITE: {
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
bytes = buffer->data;
|
||||
len = buffer->count;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
ssize_t nwrote = 0;
|
||||
if (start < len) {
|
||||
int32_t nbytes = len - start;
|
||||
void *dest_abst = state->dest_abst;
|
||||
do {
|
||||
#ifdef JANET_NET
|
||||
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
|
||||
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, state->flags,
|
||||
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
|
||||
} else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) {
|
||||
nwrote = send(s->pollable->handle, bytes + start, nbytes, state->flags);
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
nwrote = write(s->pollable->handle, bytes + start, nbytes);
|
||||
}
|
||||
} while (nwrote == -1 && errno == EINTR);
|
||||
|
||||
/* Handle write errors */
|
||||
if (nwrote == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, janet_ev_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Unless using datagrams, empty message is a disconnect */
|
||||
if (nwrote == 0 && !dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
if (nwrote > 0) {
|
||||
start += nwrote;
|
||||
} else {
|
||||
start = len;
|
||||
}
|
||||
}
|
||||
state->start = start;
|
||||
if (start >= len) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
|
||||
StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write,
|
||||
JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL);
|
||||
state->is_buffer = is_buffer;
|
||||
state->src.buf = buf;
|
||||
state->dest_abst = dest_abst;
|
||||
state->mode = mode;
|
||||
#ifdef JANET_WINDOWS
|
||||
state->flags = (DWORD) flags;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
#else
|
||||
state->start = 0;
|
||||
state->flags = flags;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf) {
|
||||
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0);
|
||||
}
|
||||
|
||||
void janet_ev_write_string(JanetPollable *stream, JanetString str) {
|
||||
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0);
|
||||
}
|
||||
|
||||
#ifdef JANET_NET
|
||||
void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags) {
|
||||
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags);
|
||||
}
|
||||
|
||||
void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags) {
|
||||
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags);
|
||||
}
|
||||
|
||||
void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags) {
|
||||
janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags);
|
||||
}
|
||||
|
||||
void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags) {
|
||||
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* C functions */
|
||||
|
||||
static Janet cfun_ev_go(int32_t argc, Janet *argv) {
|
||||
|
413
src/core/net.c
413
src/core/net.c
@ -72,23 +72,15 @@ static const JanetAbstractType StreamAT = {
|
||||
|
||||
typedef JanetPollable JanetStream;
|
||||
|
||||
static const JanetAbstractType AddressAT = {
|
||||
const JanetAbstractType janet_address_type = {
|
||||
"core/socket-address",
|
||||
JANET_ATEND_NAME
|
||||
};
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
#define JANET_NET_CHUNKSIZE 4096
|
||||
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
|
||||
#define JSOCKDEFAULT INVALID_SOCKET
|
||||
#define JLASTERR WSAGetLastError()
|
||||
#define JSOCKVALID(x) ((x) != INVALID_SOCKET)
|
||||
#define JEINTR WSAEINTR
|
||||
#define JEWOULDBLOCK WSAEWOULDBLOCK
|
||||
#define JEAGAIN WSAEWOULDBLOCK
|
||||
#define JPOLL WSAPoll
|
||||
#define JSock SOCKET
|
||||
#define JReadInt long
|
||||
#define JSOCKFLAGS 0
|
||||
static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
|
||||
u_long iMode = 0;
|
||||
@ -98,39 +90,11 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
int code = WSAGetLastError();
|
||||
char msgbuf[256];
|
||||
msgbuf[0] = '\0';
|
||||
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
|
||||
NULL,
|
||||
code,
|
||||
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
|
||||
msgbuf,
|
||||
sizeof(msgbuf),
|
||||
NULL);
|
||||
if (!*msgbuf) sprintf(msgbuf, "%d", code);
|
||||
char *c = msgbuf;
|
||||
while (*c) {
|
||||
if (*c == '\n' || *c == '\r') {
|
||||
*c = '\0';
|
||||
break;
|
||||
}
|
||||
c++;
|
||||
}
|
||||
return janet_cstringv(msgbuf);
|
||||
}
|
||||
#else
|
||||
#define JSOCKCLOSE(x) close(x)
|
||||
#define JSOCKDEFAULT 0
|
||||
#define JLASTERR errno
|
||||
#define JSOCKVALID(x) ((x) >= 0)
|
||||
#define JEINTR EINTR
|
||||
#define JEWOULDBLOCK EWOULDBLOCK
|
||||
#define JEAGAIN EAGAIN
|
||||
#define JPOLL poll
|
||||
#define JSock int
|
||||
#define JReadInt ssize_t
|
||||
#ifdef SOCK_CLOEXEC
|
||||
#define JSOCKFLAGS SOCK_CLOEXEC
|
||||
#else
|
||||
@ -148,9 +112,6 @@ static JanetStream *make_stream(int fd, uint32_t flags) {
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
return janet_cstringv(strerror(errno));
|
||||
}
|
||||
#endif
|
||||
|
||||
/* We pass this flag to all send calls to prevent sigpipe */
|
||||
@ -186,351 +147,6 @@ static int janet_stream_mark(void *p, size_t s) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for read
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
int32_t bytes_left;
|
||||
JanetBuffer *buf;
|
||||
int is_chunk;
|
||||
int is_recv_from;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
int32_t chunk_size;
|
||||
struct sockaddr from;
|
||||
int fromlen;
|
||||
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
|
||||
#endif
|
||||
} NetStateRead;
|
||||
|
||||
JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateRead *state = (NetStateRead *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_buffer(state->buf));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when read finished */
|
||||
if (s->bytes == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
|
||||
state->bytes_left -= s->bytes;
|
||||
|
||||
if (state->bytes_left <= 0 || !state->is_chunk) {
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, state->fromlen);
|
||||
memcpy(abst, &state->from, state->fromlen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
resume_val = janet_wrap_buffer(state->buf);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
state->flags = 0;
|
||||
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
|
||||
state->wbuf.len = (ULONG) chunk_size;
|
||||
state->wbuf.buf = state->chunk_buf;
|
||||
state->chunk_size = chunk_size;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
int status;
|
||||
if (state->is_recv_from) {
|
||||
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_READ:
|
||||
/* Read in bytes */
|
||||
{
|
||||
JanetBuffer *buffer = state->buf;
|
||||
int32_t bytes_left = state->bytes_left;
|
||||
janet_buffer_extra(buffer, bytes_left);
|
||||
JReadInt nread;
|
||||
char saddr[256];
|
||||
socklen_t socklen = sizeof(saddr);
|
||||
do {
|
||||
if (state->is_recv_from) {
|
||||
nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0,
|
||||
(struct sockaddr *)&saddr, &socklen);
|
||||
} else {
|
||||
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
|
||||
}
|
||||
} while (nread == -1 && JLASTERR == JEINTR);
|
||||
|
||||
/* Check for errors - special case errors that can just be waited on to fix */
|
||||
if (nread == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
|
||||
if (nread == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Increment buffer counts */
|
||||
if (nread > 0) {
|
||||
buffer->count += nread;
|
||||
bytes_left -= nread;
|
||||
} else {
|
||||
bytes_left = 0;
|
||||
}
|
||||
state->bytes_left = bytes_left;
|
||||
|
||||
/* Resume if done */
|
||||
if (!state->is_chunk || bytes_left == 0) {
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, socklen);
|
||||
memcpy(abst, &saddr, socklen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
resume_val = janet_wrap_buffer(buffer);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 0;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 1;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead), NULL);
|
||||
state->is_chunk = 0;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 1;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for write/send-to
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
union {
|
||||
JanetBuffer *buf;
|
||||
const uint8_t *str;
|
||||
} src;
|
||||
int32_t start;
|
||||
int is_buffer;
|
||||
void *dest_abst;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
#endif
|
||||
} NetStateWrite;
|
||||
|
||||
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateWrite *state = (NetStateWrite *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(state->is_buffer
|
||||
? janet_wrap_buffer(state->src.buf)
|
||||
: janet_wrap_string(state->src.str));
|
||||
if (state->dest_abst != NULL) {
|
||||
janet_mark(janet_wrap_abstract(state->dest_abst));
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when write finished */
|
||||
if (s->bytes == 0 && !state->dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
/* Begin write */
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
/* If buffer, convert to string. */
|
||||
/* TODO - be more efficient about this */
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
JanetString str = janet_string(buffer->data, buffer->count);
|
||||
bytes = str;
|
||||
len = buffer->count;
|
||||
state->is_buffer = 0;
|
||||
state->src.str = str;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
state->wbuf.buf = (char *) bytes;
|
||||
state->wbuf.len = len;
|
||||
state->flags = 0;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
|
||||
int status;
|
||||
SOCKET sock = (SOCKET) s->pollable->handle;
|
||||
if (state->dest_abst) {
|
||||
const struct sockaddr *to = state->dest_abst;
|
||||
int tolen = (int) janet_abstract_size((void *) to);
|
||||
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_WRITE: {
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
bytes = buffer->data;
|
||||
len = buffer->count;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
JReadInt nwrote = 0;
|
||||
if (start < len) {
|
||||
int32_t nbytes = len - start;
|
||||
void *dest_abst = state->dest_abst;
|
||||
do {
|
||||
if (dest_abst) {
|
||||
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
|
||||
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
|
||||
} else {
|
||||
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
|
||||
}
|
||||
} while (nwrote == -1 && JLASTERR == JEINTR);
|
||||
|
||||
/* Handle write errors */
|
||||
if (nwrote == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Unless using datagrams, empty message is a disconnect */
|
||||
if (nwrote == 0 && !dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
if (nwrote > 0) {
|
||||
start += nwrote;
|
||||
} else {
|
||||
start = len;
|
||||
}
|
||||
}
|
||||
state->start = start;
|
||||
if (start >= len) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) {
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
|
||||
state->is_buffer = 1;
|
||||
state->start = 0;
|
||||
state->src.buf = buf;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) {
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite), NULL);
|
||||
state->is_buffer = 0;
|
||||
state->start = 0;
|
||||
state->src.str = str;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for simple server
|
||||
*/
|
||||
|
||||
/* State machine for accepting connections. */
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
@ -750,7 +366,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
#ifndef JANET_WINDOWS
|
||||
/* no unix domain socket support on windows yet */
|
||||
if (is_unix) {
|
||||
void *abst = janet_abstract(&AddressAT, sizeof(struct sockaddr_un));
|
||||
void *abst = janet_abstract(&janet_address_type, sizeof(struct sockaddr_un));
|
||||
memcpy(abst, ai, sizeof(struct sockaddr_un));
|
||||
Janet ret = janet_wrap_abstract(abst);
|
||||
return make_arr ? janet_wrap_array(janet_array_n(&ret, 1)) : ret;
|
||||
@ -761,7 +377,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
JanetArray *arr = janet_array(10);
|
||||
struct addrinfo *iter = ai;
|
||||
while (NULL != iter) {
|
||||
void *abst = janet_abstract(&AddressAT, iter->ai_addrlen);
|
||||
void *abst = janet_abstract(&janet_address_type, iter->ai_addrlen);
|
||||
memcpy(abst, iter->ai_addr, iter->ai_addrlen);
|
||||
janet_array_push(arr, janet_wrap_abstract(abst));
|
||||
iter = iter->ai_next;
|
||||
@ -773,7 +389,7 @@ static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) {
|
||||
if (NULL == ai) {
|
||||
janet_panic("no data for given address");
|
||||
}
|
||||
void *abst = janet_abstract(&AddressAT, ai->ai_addrlen);
|
||||
void *abst = janet_abstract(&janet_address_type, ai->ai_addrlen);
|
||||
memcpy(abst, ai->ai_addr, ai->ai_addrlen);
|
||||
freeaddrinfo(ai);
|
||||
return janet_wrap_abstract(abst);
|
||||
@ -963,7 +579,8 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) {
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_read(stream, buffer, n);
|
||||
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
|
||||
@ -974,7 +591,8 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_chunk(stream, buffer, n);
|
||||
janet_ev_recvchunk(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
|
||||
@ -985,7 +603,8 @@ static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
|
||||
JanetBuffer *buffer = janet_getbuffer(argv, 2);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_recv_from(stream, buffer, n);
|
||||
janet_ev_recvfrom(stream, buffer, n, MSG_NOSIGNAL);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||
@ -1008,28 +627,30 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) {
|
||||
double to = janet_optnumber(argv, argc, 2, INFINITY);
|
||||
if (janet_checktype(argv[1], JANET_BUFFER)) {
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL);
|
||||
janet_ev_send_buffer(stream, janet_getbuffer(argv, 1), MSG_NOSIGNAL);
|
||||
} else {
|
||||
JanetByteView bytes = janet_getbytes(argv, 1);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_stringlike(stream, bytes.bytes, NULL);
|
||||
janet_ev_send_string(stream, bytes.bytes, MSG_NOSIGNAL);
|
||||
}
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 3, 4);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
|
||||
void *dest = janet_getabstract(argv, 1, &AddressAT);
|
||||
void *dest = janet_getabstract(argv, 1, &janet_address_type);
|
||||
double to = janet_optnumber(argv, argc, 3, INFINITY);
|
||||
if (janet_checktype(argv[2], JANET_BUFFER)) {
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest);
|
||||
janet_ev_sendto_buffer(stream, janet_getbuffer(argv, 2), dest, MSG_NOSIGNAL);
|
||||
} else {
|
||||
JanetByteView bytes = janet_getbytes(argv, 2);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_write_stringlike(stream, bytes.bytes, dest);
|
||||
janet_ev_sendto_string(stream, bytes.bytes, dest, MSG_NOSIGNAL);
|
||||
}
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
|
||||
|
@ -140,6 +140,7 @@ void janet_lib_thread(JanetTable *env);
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
void janet_lib_net(JanetTable *env);
|
||||
extern const JanetAbstractType janet_address_type;
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
void janet_lib_ev(JanetTable *env);
|
||||
|
@ -499,6 +499,7 @@ typedef void *JanetAbstract;
|
||||
|
||||
/* Event Loop Types */
|
||||
#ifdef JANET_EV
|
||||
|
||||
#define JANET_POLL_FLAG_CLOSED 0x1
|
||||
#define JANET_POLL_FLAG_SOCKET 0x2
|
||||
#define JANET_POLL_FLAG_IOCP 0x4
|
||||
@ -1269,7 +1270,29 @@ JANET_NO_RETURN JANET_API void janet_await(void);
|
||||
|
||||
/* For use inside listeners - adds a timeout to the current fiber, such that
|
||||
* it will be resumed after sec seconds if no other event schedules the current fiber. */
|
||||
void janet_addtimeout(double sec);
|
||||
JANET_API void janet_addtimeout(double sec);
|
||||
|
||||
/* Get last error from a an IO operation */
|
||||
JANET_API Janet janet_ev_lasterr(void);
|
||||
|
||||
/* Read async from a pollable */
|
||||
JANET_API void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
JANET_API void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
#ifdef JANET_NET
|
||||
JANET_API void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
JANET_API void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
JANET_API void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||
#endif
|
||||
|
||||
/* Write async to a pollable */
|
||||
JANET_API void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf);
|
||||
JANET_API void janet_ev_write_string(JanetPollable *stream, JanetString str);
|
||||
#ifdef JANET_NET
|
||||
JANET_API void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags);
|
||||
JANET_API void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags);
|
||||
JANET_API void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags);
|
||||
JANET_API void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user