diff --git a/CHANGELOG.md b/CHANGELOG.md index a2f4ad58..7d42e893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? +- Add `:all` keyword to `ev/read` and `net/read` to make them more like `file/read`. However, we + do not provide any `:line` option as that requires buffering. - Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C, to clear the current line buffer, has been moved to Ctrl-Q. - Importing modules that start with `/` is now the only way to import from project root. diff --git a/src/core/ev.c b/src/core/ev.c index b99e223b..1ebbf912 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1297,7 +1297,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); state->bytes_left -= s->bytes; - if (state->bytes_left <= 0 || !state->is_chunk || s->bytes == 0) { + if (state->bytes_left == 0 || !state->is_chunk || s->bytes == 0) { Janet resume_val; #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { @@ -1360,12 +1360,11 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { } return JANET_ASYNC_STATUS_DONE; } - case JANET_ASYNC_EVENT_READ: - /* Read in bytes */ - { + case JANET_ASYNC_EVENT_READ: { JanetBuffer *buffer = state->buf; int32_t bytes_left = state->bytes_left; - janet_buffer_extra(buffer, bytes_left); + int32_t read_limit = bytes_left < 0 ? 4096 : bytes_left; + janet_buffer_extra(buffer, read_limit); ssize_t nread; #ifdef JANET_NET char saddr[256]; @@ -1374,14 +1373,14 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { do { #ifdef JANET_NET if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags, + nread = recvfrom(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags, (struct sockaddr *)&saddr, &socklen); } else if (state->mode == JANET_ASYNC_READMODE_RECV) { - nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags); + nread = recv(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags); } else #endif { - nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left); + nread = read(s->stream->handle, buffer->data + buffer->count, read_limit); } } while (nread == -1 && errno == EINTR); @@ -1803,11 +1802,16 @@ Janet janet_cfun_stream_read(int32_t argc, Janet *argv) { janet_arity(argc, 2, 4); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_flags(stream, JANET_STREAM_READABLE); - int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); - if (to != INFINITY) janet_addtimeout(to); - janet_ev_read(stream, buffer, n); + if (janet_keyeq(argv[1], "all")) { + if (to != INFINITY) janet_addtimeout(to); + janet_ev_readchunk(stream, buffer, -1); + } else { + int32_t n = janet_getnat(argv, 1); + if (to != INFINITY) janet_addtimeout(to); + janet_ev_read(stream, buffer, n); + } janet_await(); } @@ -1922,7 +1926,8 @@ static const JanetReg ev_cfuns[] = { { "ev/read", janet_cfun_stream_read, JDOC("(ev/read stream n &opt buffer timeout)\n\n" - "Read up to n bytes into a buffer asynchronously from a stream. " + "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword " + "`:all` to read into the buffer until end of stream. " "Optionally provide a buffer to write into " "as well as a timeout in seconds after which to cancel the operation and raise an error. " "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an " diff --git a/src/core/net.c b/src/core/net.c index f0b3984e..ed8a6e17 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -509,11 +509,16 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { janet_arity(argc, 2, 4); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET); - int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); double to = janet_optnumber(argv, argc, 3, INFINITY); - if (to != INFINITY) janet_addtimeout(to); - janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL); + if (janet_keyeq(argv[1], "all")) { + if (to != INFINITY) janet_addtimeout(to); + janet_ev_recvchunk(stream, buffer, -1, MSG_NOSIGNAL); + } else { + int32_t n = janet_getnat(argv, 1); + if (to != INFINITY) janet_addtimeout(to); + janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL); + } janet_await(); } @@ -643,6 +648,7 @@ static const JanetReg net_cfuns[] = { "net/read", cfun_stream_read, JDOC("(net/read stream nbytes &opt buf timeout)\n\n" "Read up to n bytes from a stream, suspending the current fiber until the bytes are available. " + "`n` can also be the keyword `:all` to read into the buffer until end of stream. " "If less than n bytes are available (and more than 0), will push those bytes and return early. " "Takes an optional timeout in seconds, after which will return nil. " "Returns a buffer with up to n more bytes in it, or raises an error if the read failed.")