1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-13 17:10:27 +00:00

Add ev/pipe and move stream code into ev.c

Also adds a lot to the C API and changes things up.
This commit is contained in:
Calvin Rose 2020-11-14 14:28:10 -06:00
parent b3e88a8d80
commit 12f09ad2d7
5 changed files with 412 additions and 270 deletions

View File

@ -2748,6 +2748,9 @@
(ev/call (fn [] (net/accept-loop s handler)))) (ev/call (fn [] (net/accept-loop s handler))))
s)) s))
(guarddef ev/close
(defn net/close "Alias for ev/close." [stream] (ev/close stream)))
(undef guarddef) (undef guarddef)
### ###
@ -2757,7 +2760,7 @@
### ###
(defn- no-side-effects (defn- no-side-effects
"Check if form may have side effects. If returns true, then the src "Check if form may have side effects. If rturns true, then the src
must not have side effects, such as calling a C function." must not have side effects, such as calling a C function."
[src] [src]
(cond (cond

View File

@ -216,9 +216,9 @@ static void add_timeout(JanetTimeout to) {
} }
/* Create a new event listener */ /* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (pollable->_mask & mask) { if (stream->_mask & mask) {
janet_panic("cannot listen for duplicate event on pollable"); janet_panic("cannot listen for duplicate event on stream");
} }
if (janet_vm_root_fiber->waiting != NULL) { if (janet_vm_root_fiber->waiting != NULL) {
janet_panic("current fiber is already waiting for event"); janet_panic("current fiber is already waiting for event");
@ -230,21 +230,16 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
state->machine = behavior; state->machine = behavior;
if (mask & JANET_ASYNC_LISTEN_SPAWNER) {
state->fiber = NULL;
} else {
state->fiber = janet_vm_root_fiber; state->fiber = janet_vm_root_fiber;
janet_vm_root_fiber->waiting = state; janet_vm_root_fiber->waiting = state;
} state->stream = stream;
mask |= JANET_ASYNC_LISTEN_SPAWNER;
state->pollable = pollable;
state->_mask = mask; state->_mask = mask;
state->_index = 0; state->_index = 0;
pollable->_mask |= mask; stream->_mask |= mask;
janet_vm_active_listeners++; janet_vm_active_listeners++;
/* Prepend to linked list */ /* Prepend to linked list */
state->_next = pollable->state; state->_next = stream->state;
pollable->state = state; stream->state = state;
/* Emit INIT event for convenience */ /* Emit INIT event for convenience */
state->event = user; state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT); state->machine(state, JANET_ASYNC_EVENT_INIT);
@ -256,14 +251,14 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
static void janet_unlisten_impl(JanetListenerState *state) { static void janet_unlisten_impl(JanetListenerState *state) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT); state->machine(state, JANET_ASYNC_EVENT_DEINIT);
/* Remove state machine from poll list */ /* Remove state machine from poll list */
JanetListenerState **iter = &(state->pollable->state); JanetListenerState **iter = &(state->stream->state);
while (*iter && *iter != state) while (*iter && *iter != state)
iter = &((*iter)->_next); iter = &((*iter)->_next);
janet_assert(*iter, "failed to remove listener"); janet_assert(*iter, "failed to remove listener");
*iter = state->_next; *iter = state->_next;
janet_vm_active_listeners--; janet_vm_active_listeners--;
/* Remove mask */ /* Remove mask */
state->pollable->_mask &= ~(state->_mask); state->stream->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */ /* Ensure fiber does not reference this state */
JanetFiber *fiber = state->fiber; JanetFiber *fiber = state->fiber;
if (NULL != fiber && fiber->waiting == state) { if (NULL != fiber && fiber->waiting == state) {
@ -272,17 +267,80 @@ static void janet_unlisten_impl(JanetListenerState *state) {
free(state); free(state);
} }
/* Call after creating a pollable */ static const JanetMethod ev_default_stream_methods[] = {
void janet_pollable_init(JanetPollable *pollable, JanetHandle handle) { {"close", janet_cfun_stream_close},
pollable->handle = handle; {"read", janet_cfun_stream_read},
pollable->flags = 0; {"chunk", janet_cfun_stream_chunk},
pollable->state = NULL; {"write", janet_cfun_stream_write},
pollable->_mask = 0; {NULL, NULL}
};
/* Create a stream*/
JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) {
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle;
stream->flags = flags;
stream->state = NULL;
stream->_mask = 0;
if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods;
#ifdef JANET_NET
if (flags & JANET_STREAM_SOCKET) {
#ifdef JANET_WINDOWS
u_long iMode = 0;
ioctlsocket(handle, FIONBIO, &iMode);
#else
#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC)
int extra = O_CLOEXEC;
#else
int extra = 0;
#endif
fcntl(handle, F_SETFL, fcntl(handle, F_GETFL, 0) | O_NONBLOCK | extra);
#endif
}
#endif
return stream;
} }
/* Mark a pollable for GC */ /* Called to clean up a stream */
void janet_pollable_mark(JanetPollable *pollable) { static int janet_stream_gc(void *p, size_t s) {
JanetListenerState *state = pollable->state; (void) s;
JanetStream *stream = (JanetStream *)p;
janet_stream_close(stream);
return 0;
}
/* Close a stream */
void janet_stream_close(JanetStream *stream) {
if (stream->flags & JANET_STREAM_CLOSED) return;
JanetListenerState *state = stream->state;
while (NULL != state) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
JanetListenerState *next_state = state->_next;
janet_unlisten(state);
state = next_state;
}
stream->state = NULL;
stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS
#ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle);
} else
#endif
{
CloseHandle(stream->handle);
}
#else
close(stream->handle);
#endif
}
/* Mark a stream for GC */
static int janet_stream_mark(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *) p;
JanetListenerState *state = stream->state;
while (NULL != state) { while (NULL != state) {
if (NULL != state->fiber) { if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber)); janet_mark(janet_wrap_fiber(state->fiber));
@ -290,22 +348,25 @@ void janet_pollable_mark(JanetPollable *pollable) {
(state->machine)(state, JANET_ASYNC_EVENT_MARK); (state->machine)(state, JANET_ASYNC_EVENT_MARK);
state = state->_next; state = state->_next;
} }
return 0;
} }
/* Must be called to close all pollables - does NOT call `close` for you. static int janet_stream_getter(void *p, Janet key, Janet *out) {
* Also does not free memory of the pollable, so can be used on close. */ JanetStream *stream = (JanetStream *)p;
void janet_pollable_deinit(JanetPollable *pollable) { if (!janet_checktype(key, JANET_KEYWORD)) return 0;
pollable->flags |= JANET_POLL_FLAG_CLOSED; const JanetMethod *stream_methods = stream->methods;
JanetListenerState *state = pollable->state; return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out);
while (NULL != state) { return 0;
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
JanetListenerState *next_state = state->_next;
janet_unlisten_impl(state);
state = next_state;
}
pollable->state = NULL;
} }
const JanetAbstractType janet_stream_type = {
"core/stream",
janet_stream_gc,
janet_stream_mark,
janet_stream_getter,
JANET_ATEND_GET
};
/* Register a fiber to resume with value */ /* Register a fiber to resume with value */
void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) {
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return; if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
@ -721,14 +782,14 @@ void janet_ev_deinit(void) {
CloseHandle(janet_vm_iocp); CloseHandle(janet_vm_iocp);
} }
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
/* Add the handle to the io completion port if not already added */ /* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
if (!(pollable->flags & JANET_POLL_FLAG_IOCP)) { if (!(stream->flags & JANET_POLL_FLAG_IOCP)) {
if (NULL == CreateIoCompletionPort(pollable->handle, janet_vm_iocp, (ULONG_PTR) pollable, 0)) { if (NULL == CreateIoCompletionPort(stream->handle, janet_vm_iocp, (ULONG_PTR) stream, 0)) {
janet_panic("failed to listen for events"); janet_panic("failed to listen for events");
} }
pollable->flags |= JANET_POLL_FLAG_IOCP; stream->flags |= JANET_POLL_FLAG_IOCP;
} }
return state; return state;
} }
@ -763,8 +824,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} }
} else { } else {
/* Normal event */ /* Normal event */
JanetPollable *pollable = (JanetPollable *) completionKey; JanetStream *stream = (JanetStream *) completionKey;
JanetListenerState *state = pollable->state; JanetListenerState *state = stream->state;
while (state != NULL) { while (state != NULL) {
if (state->tag == overlapped) { if (state->tag == overlapped) {
state->event = overlapped; state->event = overlapped;
@ -810,16 +871,16 @@ static int make_epoll_events(int mask) {
} }
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(pollable->state); int is_first = !(stream->state);
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(state->pollable->_mask); ev.events = make_epoll_events(state->stream->_mask);
ev.data.ptr = pollable; ev.data.ptr = stream;
int status; int status;
do { do {
status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); status = epoll_ctl(janet_vm_epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR); } while (status == -1 && errno == EINTR);
if (status == -1) { if (status == -1) {
janet_unlisten_impl(state); janet_unlisten_impl(state);
@ -830,19 +891,21 @@ JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior
/* Tell system we are done listening for a certain event */ /* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state) { static void janet_unlisten(JanetListenerState *state) {
JanetPollable *pollable = state->pollable; JanetStream *stream = state->stream;
int is_last = (state->_next == NULL && pollable->state == state); if (!(stream->flags & JANET_STREAM_CLOSED)) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(pollable->_mask & ~state->_mask); ev.events = make_epoll_events(stream->_mask & ~state->_mask);
ev.data.ptr = pollable; ev.data.ptr = stream;
int status; int status;
do { do {
status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); status = epoll_ctl(janet_vm_epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR); } while (status == -1 && errno == EINTR);
if (status == -1) { if (status == -1) {
janet_panicf("failed to unschedule event: %s", strerror(errno)); janet_panicf("failed to unschedule event: %s", strerror(errno));
} }
}
/* Destroy state machine and free memory */ /* Destroy state machine and free memory */
janet_unlisten_impl(state); janet_unlisten_impl(state);
} }
@ -871,10 +934,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Step state machines */ /* Step state machines */
for (int i = 0; i < ready; i++) { for (int i = 0; i < ready; i++) {
JanetPollable *pollable = events[i].data.ptr; JanetStream *stream = events[i].data.ptr;
if (NULL != pollable) { /* If NULL, is a timeout */ if (NULL != stream) { /* If NULL, is a timeout */
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = pollable->state; JanetListenerState *state = stream->state;
state->event = events + i; state->event = events + i;
while (NULL != state) { while (NULL != state) {
JanetListenerState *next_state = state->_next; JanetListenerState *next_state = state->_next;
@ -884,7 +947,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & EPOLLIN) if (mask & EPOLLIN)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ); status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) if (mask & EPOLLERR)
status3 = state->machine(state, JANET_ASYNC_EVENT_ERR);
if (mask & EPOLLHUP)
status4 = state->machine(state, JANET_ASYNC_EVENT_HUP);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state); janet_unlisten(state);
state = next_state; state = next_state;
} }
@ -964,11 +1034,11 @@ static void janet_push_pollfd(struct pollfd pfd) {
} }
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct pollfd ev; struct pollfd ev;
ev.fd = pollable->handle; ev.fd = stream->handle;
ev.events = make_poll_events(state->pollable->_mask); ev.events = make_poll_events(state->stream->_mask);
ev.revents = 0; ev.revents = 0;
state->_index = janet_vm_fdcount; state->_index = janet_vm_fdcount;
janet_push_pollfd(ev); janet_push_pollfd(ev);
@ -1006,18 +1076,26 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
for (size_t i = 0; i < janet_vm_fdcount; i++) { for (size_t i = 0; i < janet_vm_fdcount; i++) {
struct pollfd *pfd = janet_vm_fds + i; struct pollfd *pfd = janet_vm_fds + i;
/* Skip fds where nothing interesting happened */ /* Skip fds where nothing interesting happened */
if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue;
JanetListenerState *state = janet_vm_listener_map[i]; JanetListenerState *state = janet_vm_listener_map[i];
/* Normal event */ /* Normal event */
int mask = janet_vm_fds[i].revents; int mask = janet_vm_fds[i].revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
state->event = pfd; state->event = pfd;
if (mask & POLLOUT) if (mask & POLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & POLLIN) if (mask & POLLIN)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ); status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) if (mask & POLLERR)
status2 = state->machine(state, JANET_ASYNC_EVENT_ERR);
if (mask & POLLHUP)
status2 = state->machine(state, JANET_ASYNC_EVENT_HUP);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state); janet_unlisten(state);
} }
} }
@ -1091,6 +1169,7 @@ typedef enum {
typedef struct { typedef struct {
JanetListenerState head; JanetListenerState head;
int32_t bytes_left; int32_t bytes_left;
int32_t bytes_read;
JanetBuffer *buf; JanetBuffer *buf;
int is_chunk; int is_chunk;
JanetReadMode mode; JanetReadMode mode;
@ -1122,7 +1201,8 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: { case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */ /* Called when read finished */
if (s->bytes == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { state->bytes_read += s->bytes;
if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
janet_schedule(s->fiber, janet_wrap_nil()); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
@ -1130,7 +1210,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= s->bytes; state->bytes_left -= s->bytes;
if (state->bytes_left <= 0 || !state->is_chunk) { if (state->bytes_left <= 0 || !state->is_chunk || s->bytes == 0) {
Janet resume_val; Janet resume_val;
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
@ -1158,10 +1238,12 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
state->wbuf.len = (ULONG) chunk_size; state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf; state->wbuf.buf = state->chunk_buf;
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1,
} else NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else {
status = WSARecv((SOCKET) s->stream->handle, &state->wbuf, 1,
NULL, &state->flags, &state->overlapped, NULL);
} }
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
} }
if (status && (WSA_IO_PENDING != WSAGetLastError())) { if (status && (WSA_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr()); janet_cancel(s->fiber, janet_ev_lasterr());
@ -1170,7 +1252,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
} else } else
#endif #endif
{ {
status = ReadFile(s->pollable->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped);
if (status && (ERROR_IO_PENDING != WSAGetLastError())) { if (status && (ERROR_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_ev_lasterr()); janet_cancel(s->fiber, janet_ev_lasterr());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
@ -1179,6 +1261,15 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
} }
break; break;
#else #else
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_HUP: {
if (state->bytes_read) {
janet_schedule(s->fiber, janet_wrap_buffer(state->buf));
} else {
janet_schedule(s->fiber, janet_wrap_nil());
}
return JANET_ASYNC_STATUS_DONE;
}
case JANET_ASYNC_EVENT_READ: case JANET_ASYNC_EVENT_READ:
/* Read in bytes */ /* Read in bytes */
{ {
@ -1193,14 +1284,14 @@ break;
do { do {
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags, nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags,
(struct sockaddr *)&saddr, &socklen); (struct sockaddr *)&saddr, &socklen);
} else if (state->mode == JANET_ASYNC_READMODE_RECV) { } else if (state->mode == JANET_ASYNC_READMODE_RECV) {
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags); nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags);
} else } else
#endif #endif
{ {
nread = read(s->pollable->handle, buffer->data + buffer->count, bytes_left); nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left);
} }
} while (nread == -1 && errno == EINTR); } while (nread == -1 && errno == EINTR);
@ -1212,22 +1303,19 @@ break;
} }
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
if (nread == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { state->bytes_read += nread;
if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) {
janet_schedule(s->fiber, janet_wrap_nil()); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
/* Increment buffer counts */ /* Increment buffer counts */
if (nread > 0) {
buffer->count += nread; buffer->count += nread;
bytes_left -= nread; bytes_left -= nread;
} else {
bytes_left = 0;
}
state->bytes_left = bytes_left; state->bytes_left = bytes_left;
/* Resume if done */ /* Resume if done */
if (!state->is_chunk || bytes_left == 0) { if (!state->is_chunk || bytes_left == 0 || nread == 0) {
Janet resume_val; Janet resume_val;
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
@ -1249,12 +1337,13 @@ break;
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) {
StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read, StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read,
JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL); JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL);
state->is_chunk = is_chunked; state->is_chunk = is_chunked;
state->buf = buf; state->buf = buf;
state->bytes_left = nbytes; state->bytes_left = nbytes;
state->bytes_read = 0;
state->mode = mode; state->mode = mode;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
@ -1264,20 +1353,20 @@ static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32
#endif #endif
} }
void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0); janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0);
} }
void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0); janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0);
} }
#ifdef JANET_NET #ifdef JANET_NET
void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags); janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags);
} }
void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags); janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags);
} }
void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags); janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags);
} }
#endif #endif
@ -1364,7 +1453,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
int status; int status;
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) { if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) {
SOCKET sock = (SOCKET) s->pollable->handle; SOCKET sock = (SOCKET) s->stream->handle;
state->wbuf.buf = (char *) bytes; state->wbuf.buf = (char *) bytes;
state->wbuf.len = len; state->wbuf.len = len;
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
@ -1381,7 +1470,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} else } else
#endif #endif
{ {
status = WriteFile(s->pollable->handle, bytes, len, NULL, &state->overlapped); status = WriteFile(s->stream->handle, bytes, len, NULL, &state->overlapped);
if (status && (ERROR_IO_PENDING != WSAGetLastError())) { if (status && (ERROR_IO_PENDING != WSAGetLastError())) {
janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
@ -1390,6 +1479,12 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} }
break; break;
#else #else
case JANET_ASYNC_EVENT_ERR:
janet_cancel(s->fiber, janet_cstringv("stream err"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_HUP:
janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
@ -1409,14 +1504,14 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
do { do {
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, state->flags, nwrote = sendto(s->stream->handle, bytes + start, nbytes, state->flags,
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
} else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) { } else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) {
nwrote = send(s->pollable->handle, bytes + start, nbytes, state->flags); nwrote = send(s->stream->handle, bytes + start, nbytes, state->flags);
} else } else
#endif #endif
{ {
nwrote = write(s->pollable->handle, bytes + start, nbytes); nwrote = write(s->stream->handle, bytes + start, nbytes);
} }
} while (nwrote == -1 && errno == EINTR); } while (nwrote == -1 && errno == EINTR);
@ -1452,7 +1547,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) {
StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write, StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write,
JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL); JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL);
state->is_buffer = is_buffer; state->is_buffer = is_buffer;
@ -1469,28 +1564,28 @@ static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_
} }
void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf) { void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) {
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0); janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0);
} }
void janet_ev_write_string(JanetPollable *stream, JanetString str) { void janet_ev_write_string(JanetStream *stream, JanetString str) {
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0); janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0);
} }
#ifdef JANET_NET #ifdef JANET_NET
void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags) { void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) {
janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags); janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags);
} }
void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags) { void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) {
janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags); janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags);
} }
void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags) { void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) {
janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags); janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags);
} }
void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags) { void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
} }
#endif #endif
@ -1533,6 +1628,71 @@ static Janet cfun_ev_cancel(int32_t argc, Janet *argv) {
return argv[0]; return argv[0];
} }
Janet janet_cfun_stream_close(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_close(stream);
return argv[0];
}
Janet janet_cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_read(stream, buffer, n);
janet_await();
}
Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_readchunk(stream, buffer, n);
janet_await();
}
Janet janet_cfun_stream_write(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_WRITABLE);
double to = janet_optnumber(argv, argc, 2, INFINITY);
if (janet_checktype(argv[1], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_ev_write_buffer(stream, janet_getbuffer(argv, 1));
} else {
JanetByteView bytes = janet_getbytes(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_write_string(stream, bytes.bytes);
}
janet_await();
}
static Janet cfun_ev_pipe(int32_t argc, Janet *argv) {
(void) argv;
janet_fixarity(argc, 0);
#ifdef JANET_WINDOWS
JanetHandle rhandle, whandle;
if (!CreatePipe(&rhandle, &whandle, NULL, 0)) janet_panicv(janet_ev_lasterr());
JanetStream *reader = janet_stream(rhandle, JANET_STREAM_READABLE, NULL);
JanetStream *writer = janet_stream(whandle, JANET_STREAM_WRITABLE, NULL);
#else
int fds[2];
if (pipe(fds)) janet_panicv(janet_ev_lasterr());
JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL);
JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL);
#endif
Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)};
return janet_wrap_tuple(janet_tuple_n(tup, 2));
}
static const JanetReg ev_cfuns[] = { static const JanetReg ev_cfuns[] = {
{ {
"ev/call", cfun_ev_call, "ev/call", cfun_ev_call,
@ -1600,6 +1760,40 @@ static const JanetReg ev_cfuns[] = {
JDOC("(ev/rselect & clauses)\n\n" JDOC("(ev/rselect & clauses)\n\n"
"Similar to ev/choice, but will try clauses in a random order for fairness.") "Similar to ev/choice, but will try clauses in a random order for fairness.")
}, },
{
"ev/close", janet_cfun_stream_close,
JDOC("(ev/close stream)\n\n"
"Close a stream. This should be the same as calling (:close stream) for all streams.")
},
{
"ev/read", janet_cfun_stream_read,
JDOC("(ev/read stream n &opt buffer timeout)\n\n"
"Read up to n bytes into a buffer asynchronously from a stream. "
"Optionally provide a buffer to write into "
"as well as a timeout in seconds after which to cancel the operation and raise an error. "
"Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
"error if there are problems with the IO operation.")
},
{
"ev/chunk", janet_cfun_stream_chunk,
JDOC("(ev/chunk stream n &opt buffer timeout)\n\n"
"Same as ev/read, but will not return early if less than n bytes are available. If an end of "
"stream is reached, will also return early with the collected bytes.")
},
{
"ev/write", janet_cfun_stream_write,
JDOC("(ev/write stream data &opt timeout)\n\n"
"Write data to a stream, suspending the current fiber until the write "
"completes. Takes an optional timeout in seconds, after which will return nil. "
"Returns nil, or raises an error if the write failed.")
},
{
"ev/pipe", cfun_ev_pipe,
JDOC("(ev/pipe)\n\n"
"Create a readable stream and a writable stream that are connected. Returns a two element "
"tuple where the first element is a readable stream and the second element is the writable "
"stream.")
},
{NULL, NULL, NULL} {NULL, NULL, NULL}
}; };

View File

@ -50,28 +50,6 @@
#include <fcntl.h> #include <fcntl.h>
#endif #endif
/*
* Streams - simple abstract type that wraps a pollable + extra flags
*/
#define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000
static int janet_stream_close(void *p, size_t s);
static int janet_stream_mark(void *p, size_t s);
static int janet_stream_getter(void *p, Janet key, Janet *out);
static const JanetAbstractType StreamAT = {
"core/stream",
janet_stream_close,
janet_stream_mark,
janet_stream_getter,
JANET_ATEND_GET
};
typedef JanetPollable JanetStream;
const JanetAbstractType janet_address_type = { const JanetAbstractType janet_address_type = {
"core/socket-address", "core/socket-address",
JANET_ATEND_NAME JANET_ATEND_NAME
@ -82,14 +60,6 @@ const JanetAbstractType janet_address_type = {
#define JSOCKVALID(x) ((x) != INVALID_SOCKET) #define JSOCKVALID(x) ((x) != INVALID_SOCKET)
#define JSock SOCKET #define JSock SOCKET
#define JSOCKFLAGS 0 #define JSOCKFLAGS 0
static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
u_long iMode = 0;
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
janet_pollable_init(stream, (JanetHandle) fd);
ioctlsocket(fd, FIONBIO, &iMode);
stream->flags = flags;
return stream;
}
#else #else
#define JSOCKCLOSE(x) close(x) #define JSOCKCLOSE(x) close(x)
#define JSOCKDEFAULT 0 #define JSOCKDEFAULT 0
@ -100,35 +70,15 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
#else #else
#define JSOCKFLAGS 0 #define JSOCKFLAGS 0
#endif #endif
static JanetStream *make_stream(int fd, uint32_t flags) {
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
janet_pollable_init(stream, fd);
#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC)
int extra = O_CLOEXEC;
#else
int extra = 0;
#endif
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra);
stream->flags = flags;
return stream;
}
#endif #endif
static JanetStream *make_stream(JanetHandle handle, uint32_t flags);
/* We pass this flag to all send calls to prevent sigpipe */ /* We pass this flag to all send calls to prevent sigpipe */
#ifndef MSG_NOSIGNAL #ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0 #define MSG_NOSIGNAL 0
#endif #endif
static int janet_stream_close(void *p, size_t s) {
(void) s;
JanetStream *stream = p;
if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) {
JSOCKCLOSE(stream->handle);
janet_pollable_deinit(stream);
}
return 0;
}
static void nosigpipe(JSock s) { static void nosigpipe(JSock s) {
#ifdef SO_NOSIGPIPE #ifdef SO_NOSIGPIPE
int enable = 1; int enable = 1;
@ -141,12 +91,6 @@ static void nosigpipe(JSock s) {
#endif #endif
} }
static int janet_stream_mark(void *p, size_t s) {
(void) s;
janet_pollable_mark((JanetPollable *) p);
return 0;
}
/* State machine for accepting connections. */ /* State machine for accepting connections. */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@ -263,7 +207,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event
janet_schedule(s->fiber, janet_wrap_nil()); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: { case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL); JSock connfd = accept(s->stream->handle, NULL, NULL);
if (JSOCKVALID(connfd)) { if (JSOCKVALID(connfd)) {
nosigpipe(connfd); nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
@ -543,29 +487,30 @@ static Janet cfun_net_listen(int32_t argc, Janet *argv) {
} }
} }
static void check_stream_flag(JanetStream *stream, int flag) { void janet_stream_flags(JanetStream *stream, uint32_t flags) {
if (!(stream->flags & flag) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { if ((stream->flags & flags) != flags || (stream->flags & JANET_STREAM_CLOSED)) {
const char *msg = ""; const char *rmsg = "", *wmsg = "", *amsg = "", *dmsg = "", *smsg = "stream";
if (flag == JANET_STREAM_READABLE) msg = "readable"; if (flags & JANET_STREAM_READABLE) rmsg = "readable ";
if (flag == JANET_STREAM_WRITABLE) msg = "writable"; if (flags & JANET_STREAM_WRITABLE) wmsg = "writable ";
if (flag == JANET_STREAM_ACCEPTABLE) msg = "server"; if (flags & JANET_STREAM_ACCEPTABLE) amsg = "server ";
if (flag == JANET_STREAM_UDPSERVER) msg = "datagram server"; if (flags & JANET_STREAM_UDPSERVER) dmsg = "datagram ";
janet_panicf("bad stream, expected %s stream", msg); if (flags & JANET_STREAM_SOCKET) smsg = "socket";
janet_panicf("bad stream, expected %s%s%s%s%s", rmsg, wmsg, amsg, dmsg, smsg);
} }
} }
static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) { static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2); janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
JanetFunction *fun = janet_getfunction(argv, 1); JanetFunction *fun = janet_getfunction(argv, 1);
janet_sched_accept(stream, fun); janet_sched_accept(stream, fun);
} }
static Janet cfun_stream_accept(int32_t argc, Janet *argv) { static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2); janet_arity(argc, 1, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET);
double to = janet_optnumber(argv, argc, 1, INFINITY); double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to); if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream, NULL); janet_sched_accept(stream, NULL);
@ -573,8 +518,8 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4); janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_READABLE); janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1); int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
@ -585,8 +530,8 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) {
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4); janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_READABLE); janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1); int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
@ -597,8 +542,8 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
janet_arity(argc, 3, 4); janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_UDPSERVER); janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1); int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_getbuffer(argv, 2); JanetBuffer *buffer = janet_getbuffer(argv, 2);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
@ -607,23 +552,10 @@ static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
janet_await(); janet_await();
} }
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
janet_stream_close(stream, 0);
return janet_wrap_nil();
}
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
}
static Janet cfun_stream_write(int32_t argc, Janet *argv) { static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3); janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_WRITABLE); janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
double to = janet_optnumber(argv, argc, 2, INFINITY); double to = janet_optnumber(argv, argc, 2, INFINITY);
if (janet_checktype(argv[1], JANET_BUFFER)) { if (janet_checktype(argv[1], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to); if (to != INFINITY) janet_addtimeout(to);
@ -638,8 +570,8 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) {
static Janet cfun_stream_send_to(int32_t argc, Janet *argv) { static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
janet_arity(argc, 3, 4); janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_UDPSERVER); janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET);
void *dest = janet_getabstract(argv, 1, &janet_address_type); void *dest = janet_getabstract(argv, 1, &janet_address_type);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
if (janet_checktype(argv[2], JANET_BUFFER)) { if (janet_checktype(argv[2], JANET_BUFFER)) {
@ -655,8 +587,8 @@ static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
static Janet cfun_stream_flush(int32_t argc, Janet *argv) { static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2); janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
check_stream_flag(stream, JANET_STREAM_WRITABLE); janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET);
/* Toggle no delay flag */ /* Toggle no delay flag */
int flag = 1; int flag = 1;
setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
@ -665,10 +597,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
return argv[0]; return argv[0];
} }
static const JanetMethod stream_methods[] = { static const JanetMethod net_stream_methods[] = {
{"chunk", cfun_stream_chunk}, {"chunk", cfun_stream_chunk},
{"close", cfun_stream_close}, {"close", janet_cfun_stream_close},
{"closed?", cfun_stream_closed},
{"read", cfun_stream_read}, {"read", cfun_stream_read},
{"write", cfun_stream_write}, {"write", cfun_stream_write},
{"flush", cfun_stream_flush}, {"flush", cfun_stream_flush},
@ -676,13 +607,15 @@ static const JanetMethod stream_methods[] = {
{"accept-loop", cfun_stream_accept_loop}, {"accept-loop", cfun_stream_accept_loop},
{"send-to", cfun_stream_send_to}, {"send-to", cfun_stream_send_to},
{"recv-from", cfun_stream_recv_from}, {"recv-from", cfun_stream_recv_from},
{"recv-from", cfun_stream_recv_from},
{"evread", janet_cfun_stream_read},
{"evchunk", janet_cfun_stream_chunk},
{"evwrite", janet_cfun_stream_write},
{NULL, NULL} {NULL, NULL}
}; };
static int janet_stream_getter(void *p, Janet key, Janet *out) { static JanetStream *make_stream(JanetHandle handle, uint32_t flags) {
(void) p; return janet_stream(handle, flags | JANET_STREAM_SOCKET, net_stream_methods);
if (!janet_checktype(key, JANET_KEYWORD)) return 0;
return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out);
} }
static const JanetReg net_cfuns[] = { static const JanetReg net_cfuns[] = {
@ -757,16 +690,6 @@ static const JanetReg net_cfuns[] = {
"Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. " "Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. "
"Use this to make sure data is sent without delay. Returns stream.") "Use this to make sure data is sent without delay. Returns stream.")
}, },
{
"net/close", cfun_stream_close,
JDOC("(net/close stream)\n\n"
"Close a stream so that no further communication can occur.")
},
{
"net/closed?", cfun_stream_closed,
JDOC("(net/closed? stream)\n\n"
"Check if a stream is closed.")
},
{ {
"net/connect", cfun_net_connect, "net/connect", cfun_net_connect,
JDOC("(net/connect host porti &opt type)\n\n" JDOC("(net/connect host porti &opt type)\n\n"

View File

@ -500,15 +500,20 @@ typedef void *JanetAbstract;
/* Event Loop Types */ /* Event Loop Types */
#ifdef JANET_EV #ifdef JANET_EV
#define JANET_POLL_FLAG_CLOSED 0x1 #define JANET_STREAM_CLOSED 0x1
#define JANET_POLL_FLAG_SOCKET 0x2 #define JANET_STREAM_SOCKET 0x2
#define JANET_POLL_FLAG_IOCP 0x4 #define JANET_STREAM_READABLE 0x200
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000
typedef enum { typedef enum {
JANET_ASYNC_EVENT_INIT, JANET_ASYNC_EVENT_INIT,
JANET_ASYNC_EVENT_MARK, JANET_ASYNC_EVENT_MARK,
JANET_ASYNC_EVENT_DEINIT, JANET_ASYNC_EVENT_DEINIT,
JANET_ASYNC_EVENT_CLOSE, JANET_ASYNC_EVENT_CLOSE,
JANET_ASYNC_EVENT_ERR,
JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE, JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_TIMEOUT, JANET_ASYNC_EVENT_TIMEOUT,
@ -518,7 +523,6 @@ typedef enum {
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) #define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE) #define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE)
#define JANET_ASYNC_LISTEN_SPAWNER 0x1000
typedef enum { typedef enum {
JANET_ASYNC_STATUS_NOT_DONE, JANET_ASYNC_STATUS_NOT_DONE,
@ -527,18 +531,19 @@ typedef enum {
/* Typedefs */ /* Typedefs */
typedef struct JanetListenerState JanetListenerState; typedef struct JanetListenerState JanetListenerState;
typedef struct JanetPollable JanetPollable; typedef struct JanetStream JanetStream;
typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event); typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event);
/* Wrapper around file descriptors and HANDLEs that can be polled. */ /* Wrapper around file descriptors and HANDLEs that can be polled. */
struct JanetPollable { struct JanetStream {
JanetHandle handle; JanetHandle handle;
uint32_t flags; uint32_t flags;
/* Linked list of all in-flight IO routines for this pollable */ /* Linked list of all in-flight IO routines for this stream */
JanetListenerState *state; JanetListenerState *state;
/* internal - used to disallow multiple concurrent reads / writes on the same pollable. const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping * this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same pollable, though. */ * for some implementations. You can read and write at the same time on the same stream, though. */
int _mask; int _mask;
}; };
@ -546,7 +551,7 @@ struct JanetPollable {
struct JanetListenerState { struct JanetListenerState {
JanetListener machine; JanetListener machine;
JanetFiber *fiber; JanetFiber *fiber;
JanetPollable *pollable; JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */ implementation of the event loop and the particular event. */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@ -1249,21 +1254,27 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
#ifdef JANET_EV #ifdef JANET_EV
extern JANET_API const JanetAbstractType janet_stream_type;
/* Run the event loop */ /* Run the event loop */
JANET_API void janet_loop(void); JANET_API void janet_loop(void);
/* Wrapper around pollables */ /* Wrapper around streams */
JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle); JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
JANET_API void janet_pollable_mark(JanetPollable *pollable); JANET_API void janet_stream_close(JanetStream *stream);
JANET_API void janet_pollable_deinit(JanetPollable *pollable); JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_write(int32_t argc, Janet *argv);
JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags);
/* Queue a fiber to run on the event loop */ /* Queue a fiber to run on the event loop */
JANET_API void janet_schedule(JanetFiber *fiber, Janet value); JANET_API void janet_schedule(JanetFiber *fiber, Janet value);
JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_cancel(JanetFiber *fiber, Janet value);
JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig);
/* Start a state machine listening for events from a pollable */ /* Start a state machine listening for events from a stream */
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user); JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user);
/* Shorthand for yielding to event loop in C */ /* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void); JANET_NO_RETURN JANET_API void janet_await(void);
@ -1275,23 +1286,23 @@ JANET_API void janet_addtimeout(double sec);
/* Get last error from a an IO operation */ /* Get last error from a an IO operation */
JANET_API Janet janet_ev_lasterr(void); JANET_API Janet janet_ev_lasterr(void);
/* Read async from a pollable */ /* Read async from a stream */
JANET_API void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_API void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
#ifdef JANET_NET #ifdef JANET_NET
JANET_API void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
#endif #endif
/* Write async to a pollable */ /* Write async to a stream */
JANET_API void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf); JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf);
JANET_API void janet_ev_write_string(JanetPollable *stream, JanetString str); JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str);
#ifdef JANET_NET #ifdef JANET_NET
JANET_API void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags); JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags);
JANET_API void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags); JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags);
JANET_API void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags); JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags);
JANET_API void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags); JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags);
#endif #endif
#endif #endif
@ -1604,6 +1615,9 @@ JANET_API Janet janet_resolve_core(const char *name);
/* New C API */ /* New C API */
/* Shorthand for janet C function declarations */
#define JANET_CFUN(name) Janet name (int32_t argc, Janet *argv)
/* Allow setting entry name for static libraries */ /* Allow setting entry name for static libraries */
#ifdef __cplusplus #ifdef __cplusplus
#define JANET_MODULE_PREFIX extern "C" #define JANET_MODULE_PREFIX extern "C"

View File

@ -33,16 +33,9 @@
(:write stream b) (:write stream b)
(buffer/clear b))) (buffer/clear b)))
(def s (net/server "127.0.0.1" "8000")) (def s (net/server "127.0.0.1" "8000" handler))
(assert s "made server 1") (assert s "made server 1")
(ev/go
(coro
(while (not (net/closed? s))
(def conn (net/accept s))
(unless conn (break))
(ev/call handler conn))))
(defn test-echo [msg] (defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")] (with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg) (:write conn msg)
@ -55,4 +48,19 @@
(:close s) (:close s)
# Create pipe
(var pipe-counter 0)
(def [reader writer] (ev/pipe))
(ev/spawn
(while (ev/read reader 3)
(++ pipe-counter))
(assert (= 20 pipe-counter) "ev/pipe 1"))
(for i 0 10
(ev/write writer "xxx---"))
(ev/close writer)
(ev/sleep 0.1)
(end-suite) (end-suite)