From 12f09ad2d7398727cd868d4a2a39f4346b602a3d Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sat, 14 Nov 2020 14:28:10 -0600 Subject: [PATCH] Add ev/pipe and move stream code into ev.c Also adds a lot to the C API and changes things up. --- src/boot/boot.janet | 5 +- src/core/ev.c | 432 +++++++++++++++++++++++++++++++------------ src/core/net.c | 149 ++++----------- src/include/janet.h | 72 +++++--- test/suite0009.janet | 24 ++- 5 files changed, 412 insertions(+), 270 deletions(-) diff --git a/src/boot/boot.janet b/src/boot/boot.janet index b7aaf82e..4e93dd70 100644 --- a/src/boot/boot.janet +++ b/src/boot/boot.janet @@ -2748,6 +2748,9 @@ (ev/call (fn [] (net/accept-loop s handler)))) s)) +(guarddef ev/close + (defn net/close "Alias for ev/close." [stream] (ev/close stream))) + (undef guarddef) ### @@ -2757,7 +2760,7 @@ ### (defn- no-side-effects - "Check if form may have side effects. If returns true, then the src + "Check if form may have side effects. If rturns true, then the src must not have side effects, such as calling a C function." [src] (cond diff --git a/src/core/ev.c b/src/core/ev.c index 69ffed3a..bf06fd4b 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -216,9 +216,9 @@ static void add_timeout(JanetTimeout to) { } /* Create a new event listener */ -static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { - if (pollable->_mask & mask) { - janet_panic("cannot listen for duplicate event on pollable"); +static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { + if (stream->_mask & mask) { + janet_panic("cannot listen for duplicate event on stream"); } if (janet_vm_root_fiber->waiting != NULL) { janet_panic("current fiber is already waiting for event"); @@ -230,21 +230,16 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe JANET_OUT_OF_MEMORY; } state->machine = behavior; - if (mask & JANET_ASYNC_LISTEN_SPAWNER) { - state->fiber = NULL; - } else { - state->fiber = janet_vm_root_fiber; - janet_vm_root_fiber->waiting = state; - } - mask |= JANET_ASYNC_LISTEN_SPAWNER; - state->pollable = pollable; + state->fiber = janet_vm_root_fiber; + janet_vm_root_fiber->waiting = state; + state->stream = stream; state->_mask = mask; state->_index = 0; - pollable->_mask |= mask; + stream->_mask |= mask; janet_vm_active_listeners++; /* Prepend to linked list */ - state->_next = pollable->state; - pollable->state = state; + state->_next = stream->state; + stream->state = state; /* Emit INIT event for convenience */ state->event = user; state->machine(state, JANET_ASYNC_EVENT_INIT); @@ -256,14 +251,14 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe static void janet_unlisten_impl(JanetListenerState *state) { state->machine(state, JANET_ASYNC_EVENT_DEINIT); /* Remove state machine from poll list */ - JanetListenerState **iter = &(state->pollable->state); + JanetListenerState **iter = &(state->stream->state); while (*iter && *iter != state) iter = &((*iter)->_next); janet_assert(*iter, "failed to remove listener"); *iter = state->_next; janet_vm_active_listeners--; /* Remove mask */ - state->pollable->_mask &= ~(state->_mask); + state->stream->_mask &= ~(state->_mask); /* Ensure fiber does not reference this state */ JanetFiber *fiber = state->fiber; if (NULL != fiber && fiber->waiting == state) { @@ -272,17 +267,80 @@ static void janet_unlisten_impl(JanetListenerState *state) { free(state); } -/* Call after creating a pollable */ -void janet_pollable_init(JanetPollable *pollable, JanetHandle handle) { - pollable->handle = handle; - pollable->flags = 0; - pollable->state = NULL; - pollable->_mask = 0; +static const JanetMethod ev_default_stream_methods[] = { + {"close", janet_cfun_stream_close}, + {"read", janet_cfun_stream_read}, + {"chunk", janet_cfun_stream_chunk}, + {"write", janet_cfun_stream_write}, + {NULL, NULL} +}; + +/* Create a stream*/ +JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) { + JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream)); + stream->handle = handle; + stream->flags = flags; + stream->state = NULL; + stream->_mask = 0; + if (methods == NULL) methods = ev_default_stream_methods; + stream->methods = methods; +#ifdef JANET_NET + if (flags & JANET_STREAM_SOCKET) { +#ifdef JANET_WINDOWS + u_long iMode = 0; + ioctlsocket(handle, FIONBIO, &iMode); +#else +#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC) + int extra = O_CLOEXEC; +#else + int extra = 0; +#endif + fcntl(handle, F_SETFL, fcntl(handle, F_GETFL, 0) | O_NONBLOCK | extra); +#endif + } +#endif + return stream; } -/* Mark a pollable for GC */ -void janet_pollable_mark(JanetPollable *pollable) { - JanetListenerState *state = pollable->state; +/* Called to clean up a stream */ +static int janet_stream_gc(void *p, size_t s) { + (void) s; + JanetStream *stream = (JanetStream *)p; + janet_stream_close(stream); + return 0; +} + +/* Close a stream */ +void janet_stream_close(JanetStream *stream) { + if (stream->flags & JANET_STREAM_CLOSED) return; + JanetListenerState *state = stream->state; + while (NULL != state) { + state->machine(state, JANET_ASYNC_EVENT_CLOSE); + JanetListenerState *next_state = state->_next; + janet_unlisten(state); + state = next_state; + } + stream->state = NULL; + stream->flags |= JANET_STREAM_CLOSED; +#ifdef JANET_WINDOWS +#ifdef JANET_NET + if (stream->flags & JANET_STREAM_SOCKET) { + closesocket((SOCKET) stream->handle); + } else +#endif + { + CloseHandle(stream->handle); + } +#else + close(stream->handle); +#endif +} + +/* Mark a stream for GC */ +static int janet_stream_mark(void *p, size_t s) { + (void) s; + JanetStream *stream = (JanetStream *) p; + JanetListenerState *state = stream->state; while (NULL != state) { if (NULL != state->fiber) { janet_mark(janet_wrap_fiber(state->fiber)); @@ -290,22 +348,25 @@ void janet_pollable_mark(JanetPollable *pollable) { (state->machine)(state, JANET_ASYNC_EVENT_MARK); state = state->_next; } + return 0; } -/* Must be called to close all pollables - does NOT call `close` for you. - * Also does not free memory of the pollable, so can be used on close. */ -void janet_pollable_deinit(JanetPollable *pollable) { - pollable->flags |= JANET_POLL_FLAG_CLOSED; - JanetListenerState *state = pollable->state; - while (NULL != state) { - state->machine(state, JANET_ASYNC_EVENT_CLOSE); - JanetListenerState *next_state = state->_next; - janet_unlisten_impl(state); - state = next_state; - } - pollable->state = NULL; +static int janet_stream_getter(void *p, Janet key, Janet *out) { + JanetStream *stream = (JanetStream *)p; + if (!janet_checktype(key, JANET_KEYWORD)) return 0; + const JanetMethod *stream_methods = stream->methods; + return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out); + return 0; } +const JanetAbstractType janet_stream_type = { + "core/stream", + janet_stream_gc, + janet_stream_mark, + janet_stream_getter, + JANET_ATEND_GET +}; + /* Register a fiber to resume with value */ void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig) { if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return; @@ -721,14 +782,14 @@ void janet_ev_deinit(void) { CloseHandle(janet_vm_iocp); } -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { /* Add the handle to the io completion port if not already added */ - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); - if (!(pollable->flags & JANET_POLL_FLAG_IOCP)) { - if (NULL == CreateIoCompletionPort(pollable->handle, janet_vm_iocp, (ULONG_PTR) pollable, 0)) { + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); + if (!(stream->flags & JANET_POLL_FLAG_IOCP)) { + if (NULL == CreateIoCompletionPort(stream->handle, janet_vm_iocp, (ULONG_PTR) stream, 0)) { janet_panic("failed to listen for events"); } - pollable->flags |= JANET_POLL_FLAG_IOCP; + stream->flags |= JANET_POLL_FLAG_IOCP; } return state; } @@ -763,8 +824,8 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { } } else { /* Normal event */ - JanetPollable *pollable = (JanetPollable *) completionKey; - JanetListenerState *state = pollable->state; + JanetStream *stream = (JanetStream *) completionKey; + JanetListenerState *state = stream->state; while (state != NULL) { if (state->tag == overlapped) { state->event = overlapped; @@ -810,16 +871,16 @@ static int make_epoll_events(int mask) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { - int is_first = !(pollable->state); +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { + int is_first = !(stream->state); int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); struct epoll_event ev; - ev.events = make_epoll_events(state->pollable->_mask); - ev.data.ptr = pollable; + ev.events = make_epoll_events(state->stream->_mask); + ev.data.ptr = stream; int status; do { - status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); + status = epoll_ctl(janet_vm_epoll, op, stream->handle, &ev); } while (status == -1 && errno == EINTR); if (status == -1) { janet_unlisten_impl(state); @@ -830,18 +891,20 @@ JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior /* Tell system we are done listening for a certain event */ static void janet_unlisten(JanetListenerState *state) { - JanetPollable *pollable = state->pollable; - int is_last = (state->_next == NULL && pollable->state == state); - int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; - struct epoll_event ev; - ev.events = make_epoll_events(pollable->_mask & ~state->_mask); - ev.data.ptr = pollable; - int status; - do { - status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev); - } while (status == -1 && errno == EINTR); - if (status == -1) { - janet_panicf("failed to unschedule event: %s", strerror(errno)); + JanetStream *stream = state->stream; + if (!(stream->flags & JANET_STREAM_CLOSED)) { + int is_last = (state->_next == NULL && stream->state == state); + int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; + struct epoll_event ev; + ev.events = make_epoll_events(stream->_mask & ~state->_mask); + ev.data.ptr = stream; + int status; + do { + status = epoll_ctl(janet_vm_epoll, op, stream->handle, &ev); + } while (status == -1 && errno == EINTR); + if (status == -1) { + janet_panicf("failed to unschedule event: %s", strerror(errno)); + } } /* Destroy state machine and free memory */ janet_unlisten_impl(state); @@ -871,10 +934,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { /* Step state machines */ for (int i = 0; i < ready; i++) { - JanetPollable *pollable = events[i].data.ptr; - if (NULL != pollable) { /* If NULL, is a timeout */ + JanetStream *stream = events[i].data.ptr; + if (NULL != stream) { /* If NULL, is a timeout */ int mask = events[i].events; - JanetListenerState *state = pollable->state; + JanetListenerState *state = stream->state; state->event = events + i; while (NULL != state) { JanetListenerState *next_state = state->_next; @@ -884,7 +947,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); if (mask & EPOLLIN) status2 = state->machine(state, JANET_ASYNC_EVENT_READ); - if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) + if (mask & EPOLLERR) + status3 = state->machine(state, JANET_ASYNC_EVENT_ERR); + if (mask & EPOLLHUP) + status4 = state->machine(state, JANET_ASYNC_EVENT_HUP); + if (status1 == JANET_ASYNC_STATUS_DONE || + status2 == JANET_ASYNC_STATUS_DONE || + status3 == JANET_ASYNC_STATUS_DONE || + status4 == JANET_ASYNC_STATUS_DONE) janet_unlisten(state); state = next_state; } @@ -964,11 +1034,11 @@ static void janet_push_pollfd(struct pollfd pfd) { } /* Wait for the next event */ -JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) { - JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user); +JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { + JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); struct pollfd ev; - ev.fd = pollable->handle; - ev.events = make_poll_events(state->pollable->_mask); + ev.fd = stream->handle; + ev.events = make_poll_events(state->stream->_mask); ev.revents = 0; state->_index = janet_vm_fdcount; janet_push_pollfd(ev); @@ -1006,18 +1076,26 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { for (size_t i = 0; i < janet_vm_fdcount; i++) { struct pollfd *pfd = janet_vm_fds + i; /* Skip fds where nothing interesting happened */ - if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue; JanetListenerState *state = janet_vm_listener_map[i]; /* Normal event */ int mask = janet_vm_fds[i].revents; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE; state->event = pfd; if (mask & POLLOUT) status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); if (mask & POLLIN) status2 = state->machine(state, JANET_ASYNC_EVENT_READ); - if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) + if (mask & POLLERR) + status2 = state->machine(state, JANET_ASYNC_EVENT_ERR); + if (mask & POLLHUP) + status2 = state->machine(state, JANET_ASYNC_EVENT_HUP); + if (status1 == JANET_ASYNC_STATUS_DONE || + status2 == JANET_ASYNC_STATUS_DONE || + status3 == JANET_ASYNC_STATUS_DONE || + status4 == JANET_ASYNC_STATUS_DONE) janet_unlisten(state); } } @@ -1091,6 +1169,7 @@ typedef enum { typedef struct { JanetListenerState head; int32_t bytes_left; + int32_t bytes_read; JanetBuffer *buf; int is_chunk; JanetReadMode mode; @@ -1122,7 +1201,8 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { #ifdef JANET_WINDOWS case JANET_ASYNC_EVENT_COMPLETE: { /* Called when read finished */ - if (s->bytes == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { + state->bytes_read += s->bytes; + if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; } @@ -1130,7 +1210,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); state->bytes_left -= s->bytes; - if (state->bytes_left <= 0 || !state->is_chunk) { + if (state->bytes_left <= 0 || !state->is_chunk || s->bytes == 0) { Janet resume_val; #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { @@ -1158,27 +1238,38 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { state->wbuf.len = (ULONG) chunk_size; state->wbuf.buf = state->chunk_buf; if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); - } else + status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1, + NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); + } else { + status = WSARecv((SOCKET) s->stream->handle, &state->wbuf, 1, + NULL, &state->flags, &state->overlapped, NULL); } - status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL); - } - if (status && (WSA_IO_PENDING != WSAGetLastError())) { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; - } - } else + } + if (status && (WSA_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } + } else #endif - { - status = ReadFile(s->pollable->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); - if (status && (ERROR_IO_PENDING != WSAGetLastError())) { - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + { + status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); + if (status && (ERROR_IO_PENDING != WSAGetLastError())) { + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; + } } } -} -break; + break; #else + case JANET_ASYNC_EVENT_ERR: + case JANET_ASYNC_EVENT_HUP: { + if (state->bytes_read) { + janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); + } else { + janet_schedule(s->fiber, janet_wrap_nil()); + } + return JANET_ASYNC_STATUS_DONE; + } case JANET_ASYNC_EVENT_READ: /* Read in bytes */ { @@ -1193,14 +1284,14 @@ break; do { #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags, + nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags, (struct sockaddr *)&saddr, &socklen); } else if (state->mode == JANET_ASYNC_READMODE_RECV) { - nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, state->flags); + nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags); } else #endif { - nread = read(s->pollable->handle, buffer->data + buffer->count, bytes_left); + nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left); } } while (nread == -1 && errno == EINTR); @@ -1212,22 +1303,19 @@ break; } /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ - if (nread == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { + state->bytes_read += nread; + if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; } /* Increment buffer counts */ - if (nread > 0) { - buffer->count += nread; - bytes_left -= nread; - } else { - bytes_left = 0; - } + buffer->count += nread; + bytes_left -= nread; state->bytes_left = bytes_left; /* Resume if done */ - if (!state->is_chunk || bytes_left == 0) { + if (!state->is_chunk || bytes_left == 0 || nread == 0) { Janet resume_val; #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { @@ -1249,12 +1337,13 @@ break; return JANET_ASYNC_STATUS_NOT_DONE; } -static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { +static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { StateRead *state = (StateRead *) janet_listen(stream, ev_machine_read, JANET_ASYNC_LISTEN_READ, sizeof(StateRead), NULL); state->is_chunk = is_chunked; state->buf = buf; state->bytes_left = nbytes; + state->bytes_read = 0; state->mode = mode; #ifdef JANET_WINDOWS ev_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); @@ -1264,20 +1353,20 @@ static void janet_ev_read_generic(JanetPollable *stream, JanetBuffer *buf, int32 #endif } -void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { +void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_READ, 0); } -void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes) { +void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_READ, 0); } #ifdef JANET_NET -void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECV, flags); } -void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 1, JANET_ASYNC_READMODE_RECV, flags); } -void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags) { +void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { janet_ev_read_generic(stream, buf, nbytes, 0, JANET_ASYNC_READMODE_RECVFROM, flags); } #endif @@ -1364,7 +1453,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) int status; #ifdef JANET_NET if (state->mode != JANET_ASYNC_WRITEMODE_WRITE) { - SOCKET sock = (SOCKET) s->pollable->handle; + SOCKET sock = (SOCKET) s->stream->handle; state->wbuf.buf = (char *) bytes; state->wbuf.len = len; if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { @@ -1381,7 +1470,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) } else #endif { - status = WriteFile(s->pollable->handle, bytes, len, NULL, &state->overlapped); + status = WriteFile(s->stream->handle, bytes, len, NULL, &state->overlapped); if (status && (ERROR_IO_PENDING != WSAGetLastError())) { janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); return JANET_ASYNC_STATUS_DONE; @@ -1390,6 +1479,12 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) } break; #else + case JANET_ASYNC_EVENT_ERR: + janet_cancel(s->fiber, janet_cstringv("stream err")); + return JANET_ASYNC_STATUS_DONE; + case JANET_ASYNC_EVENT_HUP: + janet_cancel(s->fiber, janet_cstringv("stream hup")); + return JANET_ASYNC_STATUS_DONE; case JANET_ASYNC_EVENT_WRITE: { int32_t start, len; const uint8_t *bytes; @@ -1409,14 +1504,14 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) do { #ifdef JANET_NET if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { - nwrote = sendto(s->pollable->handle, bytes + start, nbytes, state->flags, + nwrote = sendto(s->stream->handle, bytes + start, nbytes, state->flags, (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); } else if (state->mode == JANET_ASYNC_WRITEMODE_SEND) { - nwrote = send(s->pollable->handle, bytes + start, nbytes, state->flags); + nwrote = send(s->stream->handle, bytes + start, nbytes, state->flags); } else #endif { - nwrote = write(s->pollable->handle, bytes + start, nbytes); + nwrote = write(s->stream->handle, bytes + start, nbytes); } } while (nwrote == -1 && errno == EINTR); @@ -1452,7 +1547,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) return JANET_ASYNC_STATUS_NOT_DONE; } -static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { +static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_abst, JanetWriteMode mode, int is_buffer, int flags) { StateWrite *state = (StateWrite *) janet_listen(stream, ev_machine_write, JANET_ASYNC_LISTEN_WRITE, sizeof(StateWrite), NULL); state->is_buffer = is_buffer; @@ -1469,28 +1564,28 @@ static void janet_ev_write_generic(JanetPollable *stream, void *buf, void *dest_ } -void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf) { +void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf) { janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_WRITE, 1, 0); } -void janet_ev_write_string(JanetPollable *stream, JanetString str) { +void janet_ev_write_string(JanetStream *stream, JanetString str) { janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_WRITE, 0, 0); } #ifdef JANET_NET -void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags) { +void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags) { janet_ev_write_generic(stream, buf, NULL, JANET_ASYNC_WRITEMODE_SEND, 1, flags); } -void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags) { +void janet_ev_send_string(JanetStream *stream, JanetString str, int flags) { janet_ev_write_generic(stream, (void *) str, NULL, JANET_ASYNC_WRITEMODE_SEND, 0, flags); } -void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags) { +void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags) { janet_ev_write_generic(stream, buf, dest, JANET_ASYNC_WRITEMODE_SENDTO, 1, flags); } -void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags) { +void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); } #endif @@ -1533,6 +1628,71 @@ static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { return argv[0]; } +Janet janet_cfun_stream_close(int32_t argc, Janet *argv) { + janet_fixarity(argc, 1); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_close(stream); + return argv[0]; +} + +Janet janet_cfun_stream_read(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 4); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_READABLE); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + double to = janet_optnumber(argv, argc, 3, INFINITY); + if (to != INFINITY) janet_addtimeout(to); + janet_ev_read(stream, buffer, n); + janet_await(); +} + +Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 4); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_READABLE); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); + double to = janet_optnumber(argv, argc, 3, INFINITY); + if (to != INFINITY) janet_addtimeout(to); + janet_ev_readchunk(stream, buffer, n); + janet_await(); +} + +Janet janet_cfun_stream_write(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_WRITABLE); + double to = janet_optnumber(argv, argc, 2, INFINITY); + if (janet_checktype(argv[1], JANET_BUFFER)) { + if (to != INFINITY) janet_addtimeout(to); + janet_ev_write_buffer(stream, janet_getbuffer(argv, 1)); + } else { + JanetByteView bytes = janet_getbytes(argv, 1); + if (to != INFINITY) janet_addtimeout(to); + janet_ev_write_string(stream, bytes.bytes); + } + janet_await(); +} + +static Janet cfun_ev_pipe(int32_t argc, Janet *argv) { + (void) argv; + janet_fixarity(argc, 0); +#ifdef JANET_WINDOWS + JanetHandle rhandle, whandle; + if (!CreatePipe(&rhandle, &whandle, NULL, 0)) janet_panicv(janet_ev_lasterr()); + JanetStream *reader = janet_stream(rhandle, JANET_STREAM_READABLE, NULL); + JanetStream *writer = janet_stream(whandle, JANET_STREAM_WRITABLE, NULL); +#else + int fds[2]; + if (pipe(fds)) janet_panicv(janet_ev_lasterr()); + JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL); + JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL); +#endif + Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)}; + return janet_wrap_tuple(janet_tuple_n(tup, 2)); +} + static const JanetReg ev_cfuns[] = { { "ev/call", cfun_ev_call, @@ -1600,6 +1760,40 @@ static const JanetReg ev_cfuns[] = { JDOC("(ev/rselect & clauses)\n\n" "Similar to ev/choice, but will try clauses in a random order for fairness.") }, + { + "ev/close", janet_cfun_stream_close, + JDOC("(ev/close stream)\n\n" + "Close a stream. This should be the same as calling (:close stream) for all streams.") + }, + { + "ev/read", janet_cfun_stream_read, + JDOC("(ev/read stream n &opt buffer timeout)\n\n" + "Read up to n bytes into a buffer asynchronously from a stream. " + "Optionally provide a buffer to write into " + "as well as a timeout in seconds after which to cancel the operation and raise an error. " + "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an " + "error if there are problems with the IO operation.") + }, + { + "ev/chunk", janet_cfun_stream_chunk, + JDOC("(ev/chunk stream n &opt buffer timeout)\n\n" + "Same as ev/read, but will not return early if less than n bytes are available. If an end of " + "stream is reached, will also return early with the collected bytes.") + }, + { + "ev/write", janet_cfun_stream_write, + JDOC("(ev/write stream data &opt timeout)\n\n" + "Write data to a stream, suspending the current fiber until the write " + "completes. Takes an optional timeout in seconds, after which will return nil. " + "Returns nil, or raises an error if the write failed.") + }, + { + "ev/pipe", cfun_ev_pipe, + JDOC("(ev/pipe)\n\n" + "Create a readable stream and a writable stream that are connected. Returns a two element " + "tuple where the first element is a readable stream and the second element is the writable " + "stream.") + }, {NULL, NULL, NULL} }; diff --git a/src/core/net.c b/src/core/net.c index 7213c158..4af55c13 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -50,28 +50,6 @@ #include #endif -/* - * Streams - simple abstract type that wraps a pollable + extra flags - */ - -#define JANET_STREAM_READABLE 0x200 -#define JANET_STREAM_WRITABLE 0x400 -#define JANET_STREAM_ACCEPTABLE 0x800 -#define JANET_STREAM_UDPSERVER 0x1000 - -static int janet_stream_close(void *p, size_t s); -static int janet_stream_mark(void *p, size_t s); -static int janet_stream_getter(void *p, Janet key, Janet *out); -static const JanetAbstractType StreamAT = { - "core/stream", - janet_stream_close, - janet_stream_mark, - janet_stream_getter, - JANET_ATEND_GET -}; - -typedef JanetPollable JanetStream; - const JanetAbstractType janet_address_type = { "core/socket-address", JANET_ATEND_NAME @@ -82,14 +60,6 @@ const JanetAbstractType janet_address_type = { #define JSOCKVALID(x) ((x) != INVALID_SOCKET) #define JSock SOCKET #define JSOCKFLAGS 0 -static JanetStream *make_stream(SOCKET fd, uint32_t flags) { - u_long iMode = 0; - JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - janet_pollable_init(stream, (JanetHandle) fd); - ioctlsocket(fd, FIONBIO, &iMode); - stream->flags = flags; - return stream; -} #else #define JSOCKCLOSE(x) close(x) #define JSOCKDEFAULT 0 @@ -100,35 +70,15 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) { #else #define JSOCKFLAGS 0 #endif -static JanetStream *make_stream(int fd, uint32_t flags) { - JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - janet_pollable_init(stream, fd); -#if !defined(SOCK_CLOEXEC) && defined(O_CLOEXEC) - int extra = O_CLOEXEC; -#else - int extra = 0; -#endif - fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra); - stream->flags = flags; - return stream; -} #endif +static JanetStream *make_stream(JanetHandle handle, uint32_t flags); + /* We pass this flag to all send calls to prevent sigpipe */ #ifndef MSG_NOSIGNAL #define MSG_NOSIGNAL 0 #endif -static int janet_stream_close(void *p, size_t s) { - (void) s; - JanetStream *stream = p; - if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) { - JSOCKCLOSE(stream->handle); - janet_pollable_deinit(stream); - } - return 0; -} - static void nosigpipe(JSock s) { #ifdef SO_NOSIGPIPE int enable = 1; @@ -141,12 +91,6 @@ static void nosigpipe(JSock s) { #endif } -static int janet_stream_mark(void *p, size_t s) { - (void) s; - janet_pollable_mark((JanetPollable *) p); - return 0; -} - /* State machine for accepting connections. */ #ifdef JANET_WINDOWS @@ -263,7 +207,7 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event janet_schedule(s->fiber, janet_wrap_nil()); return JANET_ASYNC_STATUS_DONE; case JANET_ASYNC_EVENT_READ: { - JSock connfd = accept(s->pollable->handle, NULL, NULL); + JSock connfd = accept(s->stream->handle, NULL, NULL); if (JSOCKVALID(connfd)) { nosigpipe(connfd); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); @@ -543,29 +487,30 @@ static Janet cfun_net_listen(int32_t argc, Janet *argv) { } } -static void check_stream_flag(JanetStream *stream, int flag) { - if (!(stream->flags & flag) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - const char *msg = ""; - if (flag == JANET_STREAM_READABLE) msg = "readable"; - if (flag == JANET_STREAM_WRITABLE) msg = "writable"; - if (flag == JANET_STREAM_ACCEPTABLE) msg = "server"; - if (flag == JANET_STREAM_UDPSERVER) msg = "datagram server"; - janet_panicf("bad stream, expected %s stream", msg); +void janet_stream_flags(JanetStream *stream, uint32_t flags) { + if ((stream->flags & flags) != flags || (stream->flags & JANET_STREAM_CLOSED)) { + const char *rmsg = "", *wmsg = "", *amsg = "", *dmsg = "", *smsg = "stream"; + if (flags & JANET_STREAM_READABLE) rmsg = "readable "; + if (flags & JANET_STREAM_WRITABLE) wmsg = "writable "; + if (flags & JANET_STREAM_ACCEPTABLE) amsg = "server "; + if (flags & JANET_STREAM_UDPSERVER) dmsg = "datagram "; + if (flags & JANET_STREAM_SOCKET) smsg = "socket"; + janet_panicf("bad stream, expected %s%s%s%s%s", rmsg, wmsg, amsg, dmsg, smsg); } } static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET); JanetFunction *fun = janet_getfunction(argv, 1); janet_sched_accept(stream, fun); } static Janet cfun_stream_accept(int32_t argc, Janet *argv) { janet_arity(argc, 1, 2); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_ACCEPTABLE | JANET_STREAM_SOCKET); double to = janet_optnumber(argv, argc, 1, INFINITY); if (to != INFINITY) janet_addtimeout(to); janet_sched_accept(stream, NULL); @@ -573,8 +518,8 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) { static Janet cfun_stream_read(int32_t argc, Janet *argv) { janet_arity(argc, 2, 4); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_READABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); @@ -585,8 +530,8 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { janet_arity(argc, 2, 4); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_READABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); @@ -597,8 +542,8 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { janet_arity(argc, 3, 4); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_UDPSERVER); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_getbuffer(argv, 2); double to = janet_optnumber(argv, argc, 3, INFINITY); @@ -607,23 +552,10 @@ static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { janet_await(); } -static Janet cfun_stream_close(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - janet_stream_close(stream, 0); - return janet_wrap_nil(); -} - -static Janet cfun_stream_closed(int32_t argc, Janet *argv) { - janet_fixarity(argc, 1); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED); -} - static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_WRITABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET); double to = janet_optnumber(argv, argc, 2, INFINITY); if (janet_checktype(argv[1], JANET_BUFFER)) { if (to != INFINITY) janet_addtimeout(to); @@ -638,8 +570,8 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) { static Janet cfun_stream_send_to(int32_t argc, Janet *argv) { janet_arity(argc, 3, 4); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_UDPSERVER); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_UDPSERVER | JANET_STREAM_SOCKET); void *dest = janet_getabstract(argv, 1, &janet_address_type); double to = janet_optnumber(argv, argc, 3, INFINITY); if (janet_checktype(argv[2], JANET_BUFFER)) { @@ -655,8 +587,8 @@ static Janet cfun_stream_send_to(int32_t argc, Janet *argv) { static Janet cfun_stream_flush(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); - JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - check_stream_flag(stream, JANET_STREAM_WRITABLE); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_WRITABLE | JANET_STREAM_SOCKET); /* Toggle no delay flag */ int flag = 1; setsockopt((JSock) stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); @@ -665,10 +597,9 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) { return argv[0]; } -static const JanetMethod stream_methods[] = { +static const JanetMethod net_stream_methods[] = { {"chunk", cfun_stream_chunk}, - {"close", cfun_stream_close}, - {"closed?", cfun_stream_closed}, + {"close", janet_cfun_stream_close}, {"read", cfun_stream_read}, {"write", cfun_stream_write}, {"flush", cfun_stream_flush}, @@ -676,13 +607,15 @@ static const JanetMethod stream_methods[] = { {"accept-loop", cfun_stream_accept_loop}, {"send-to", cfun_stream_send_to}, {"recv-from", cfun_stream_recv_from}, + {"recv-from", cfun_stream_recv_from}, + {"evread", janet_cfun_stream_read}, + {"evchunk", janet_cfun_stream_chunk}, + {"evwrite", janet_cfun_stream_write}, {NULL, NULL} }; -static int janet_stream_getter(void *p, Janet key, Janet *out) { - (void) p; - if (!janet_checktype(key, JANET_KEYWORD)) return 0; - return janet_getmethod(janet_unwrap_keyword(key), stream_methods, out); +static JanetStream *make_stream(JanetHandle handle, uint32_t flags) { + return janet_stream(handle, flags | JANET_STREAM_SOCKET, net_stream_methods); } static const JanetReg net_cfuns[] = { @@ -757,16 +690,6 @@ static const JanetReg net_cfuns[] = { "Make sure that a stream is not buffering any data. This temporarily disables Nagle's algorithm. " "Use this to make sure data is sent without delay. Returns stream.") }, - { - "net/close", cfun_stream_close, - JDOC("(net/close stream)\n\n" - "Close a stream so that no further communication can occur.") - }, - { - "net/closed?", cfun_stream_closed, - JDOC("(net/closed? stream)\n\n" - "Check if a stream is closed.") - }, { "net/connect", cfun_net_connect, JDOC("(net/connect host porti &opt type)\n\n" diff --git a/src/include/janet.h b/src/include/janet.h index 54ec8a27..8ab99612 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -500,15 +500,20 @@ typedef void *JanetAbstract; /* Event Loop Types */ #ifdef JANET_EV -#define JANET_POLL_FLAG_CLOSED 0x1 -#define JANET_POLL_FLAG_SOCKET 0x2 -#define JANET_POLL_FLAG_IOCP 0x4 +#define JANET_STREAM_CLOSED 0x1 +#define JANET_STREAM_SOCKET 0x2 +#define JANET_STREAM_READABLE 0x200 +#define JANET_STREAM_WRITABLE 0x400 +#define JANET_STREAM_ACCEPTABLE 0x800 +#define JANET_STREAM_UDPSERVER 0x1000 typedef enum { JANET_ASYNC_EVENT_INIT, JANET_ASYNC_EVENT_MARK, JANET_ASYNC_EVENT_DEINIT, JANET_ASYNC_EVENT_CLOSE, + JANET_ASYNC_EVENT_ERR, + JANET_ASYNC_EVENT_HUP, JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_WRITE, JANET_ASYNC_EVENT_TIMEOUT, @@ -518,7 +523,6 @@ typedef enum { #define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) #define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE) -#define JANET_ASYNC_LISTEN_SPAWNER 0x1000 typedef enum { JANET_ASYNC_STATUS_NOT_DONE, @@ -527,18 +531,19 @@ typedef enum { /* Typedefs */ typedef struct JanetListenerState JanetListenerState; -typedef struct JanetPollable JanetPollable; +typedef struct JanetStream JanetStream; typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEvent event); /* Wrapper around file descriptors and HANDLEs that can be polled. */ -struct JanetPollable { +struct JanetStream { JanetHandle handle; uint32_t flags; - /* Linked list of all in-flight IO routines for this pollable */ + /* Linked list of all in-flight IO routines for this stream */ JanetListenerState *state; - /* internal - used to disallow multiple concurrent reads / writes on the same pollable. + const void *methods; /* Methods for this stream */ + /* internal - used to disallow multiple concurrent reads / writes on the same stream. * this constraint may be lifted later but allowing such would require more internal book keeping - * for some implementations. You can read and write at the same time on the same pollable, though. */ + * for some implementations. You can read and write at the same time on the same stream, though. */ int _mask; }; @@ -546,7 +551,7 @@ struct JanetPollable { struct JanetListenerState { JanetListener machine; JanetFiber *fiber; - JanetPollable *pollable; + JanetStream *stream; void *event; /* Used to pass data from asynchronous IO event. Contents depend on both implementation of the event loop and the particular event. */ #ifdef JANET_WINDOWS @@ -1249,21 +1254,27 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT]; #ifdef JANET_EV +extern JANET_API const JanetAbstractType janet_stream_type; + /* Run the event loop */ JANET_API void janet_loop(void); -/* Wrapper around pollables */ -JANET_API void janet_pollable_init(JanetPollable *pollable, JanetHandle handle); -JANET_API void janet_pollable_mark(JanetPollable *pollable); -JANET_API void janet_pollable_deinit(JanetPollable *pollable); +/* Wrapper around streams */ +JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods); +JANET_API void janet_stream_close(JanetStream *stream); +JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv); +JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv); +JANET_API Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv); +JANET_API Janet janet_cfun_stream_write(int32_t argc, Janet *argv); +JANET_API void janet_stream_flags(JanetStream *stream, uint32_t flags); /* Queue a fiber to run on the event loop */ JANET_API void janet_schedule(JanetFiber *fiber, Janet value); JANET_API void janet_cancel(JanetFiber *fiber, Janet value); JANET_API void janet_schedule_signal(JanetFiber *fiber, Janet value, JanetSignal sig); -/* Start a state machine listening for events from a pollable */ -JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user); +/* Start a state machine listening for events from a stream */ +JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user); /* Shorthand for yielding to event loop in C */ JANET_NO_RETURN JANET_API void janet_await(void); @@ -1275,23 +1286,23 @@ JANET_API void janet_addtimeout(double sec); /* Get last error from a an IO operation */ JANET_API Janet janet_ev_lasterr(void); -/* Read async from a pollable */ -JANET_API void janet_ev_read(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); -JANET_API void janet_ev_readchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes); +/* Read async from a stream */ +JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); +JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes); #ifdef JANET_NET -JANET_API void janet_ev_recv(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); -JANET_API void janet_ev_recvchunk(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); -JANET_API void janet_ev_recvfrom(JanetPollable *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); +JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); #endif -/* Write async to a pollable */ -JANET_API void janet_ev_write_buffer(JanetPollable *stream, JanetBuffer *buf); -JANET_API void janet_ev_write_string(JanetPollable *stream, JanetString str); +/* Write async to a stream */ +JANET_API void janet_ev_write_buffer(JanetStream *stream, JanetBuffer *buf); +JANET_API void janet_ev_write_string(JanetStream *stream, JanetString str); #ifdef JANET_NET -JANET_API void janet_ev_send_buffer(JanetPollable *stream, JanetBuffer *buf, int flags); -JANET_API void janet_ev_send_string(JanetPollable *stream, JanetString str, int flags); -JANET_API void janet_ev_sendto_buffer(JanetPollable *stream, JanetBuffer *buf, void *dest, int flags); -JANET_API void janet_ev_sendto_string(JanetPollable *stream, JanetString str, void *dest, int flags); +JANET_API void janet_ev_send_buffer(JanetStream *stream, JanetBuffer *buf, int flags); +JANET_API void janet_ev_send_string(JanetStream *stream, JanetString str, int flags); +JANET_API void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, int flags); +JANET_API void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags); #endif #endif @@ -1604,6 +1615,9 @@ JANET_API Janet janet_resolve_core(const char *name); /* New C API */ +/* Shorthand for janet C function declarations */ +#define JANET_CFUN(name) Janet name (int32_t argc, Janet *argv) + /* Allow setting entry name for static libraries */ #ifdef __cplusplus #define JANET_MODULE_PREFIX extern "C" diff --git a/test/suite0009.janet b/test/suite0009.janet index 7510a32a..3fc914bb 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -33,16 +33,9 @@ (:write stream b) (buffer/clear b))) -(def s (net/server "127.0.0.1" "8000")) +(def s (net/server "127.0.0.1" "8000" handler)) (assert s "made server 1") -(ev/go - (coro - (while (not (net/closed? s)) - (def conn (net/accept s)) - (unless conn (break)) - (ev/call handler conn)))) - (defn test-echo [msg] (with [conn (net/connect "127.0.0.1" "8000")] (:write conn msg) @@ -55,4 +48,19 @@ (:close s) +# Create pipe + +(var pipe-counter 0) +(def [reader writer] (ev/pipe)) +(ev/spawn + (while (ev/read reader 3) + (++ pipe-counter)) + (assert (= 20 pipe-counter) "ev/pipe 1")) + +(for i 0 10 + (ev/write writer "xxx---")) + +(ev/close writer) +(ev/sleep 0.1) + (end-suite)