Add :all option to ev/read.

Brings ev/read more in line with file/read.
This commit is contained in:
Calvin Rose 2020-12-29 20:37:59 -06:00
parent 8655530b19
commit ab37ee6ebb
3 changed files with 28 additions and 15 deletions

View File

@ -2,6 +2,8 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## Unreleased - ??? ## Unreleased - ???
- Add `:all` keyword to `ev/read` and `net/read` to make them more like `file/read`. However, we
do not provide any `:line` option as that requires buffering.
- Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C, - Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C,
to clear the current line buffer, has been moved to Ctrl-Q. to clear the current line buffer, has been moved to Ctrl-Q.
- Importing modules that start with `/` is now the only way to import from project root. - Importing modules that start with `/` is now the only way to import from project root.

View File

@ -1297,7 +1297,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes); janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= s->bytes; state->bytes_left -= s->bytes;
if (state->bytes_left <= 0 || !state->is_chunk || s->bytes == 0) { if (state->bytes_left == 0 || !state->is_chunk || s->bytes == 0) {
Janet resume_val; Janet resume_val;
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
@ -1360,12 +1360,11 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
} }
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
case JANET_ASYNC_EVENT_READ: case JANET_ASYNC_EVENT_READ: {
/* Read in bytes */
{
JanetBuffer *buffer = state->buf; JanetBuffer *buffer = state->buf;
int32_t bytes_left = state->bytes_left; int32_t bytes_left = state->bytes_left;
janet_buffer_extra(buffer, bytes_left); int32_t read_limit = bytes_left < 0 ? 4096 : bytes_left;
janet_buffer_extra(buffer, read_limit);
ssize_t nread; ssize_t nread;
#ifdef JANET_NET #ifdef JANET_NET
char saddr[256]; char saddr[256];
@ -1374,14 +1373,14 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) {
do { do {
#ifdef JANET_NET #ifdef JANET_NET
if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { if (state->mode == JANET_ASYNC_READMODE_RECVFROM) {
nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags, nread = recvfrom(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags,
(struct sockaddr *)&saddr, &socklen); (struct sockaddr *)&saddr, &socklen);
} else if (state->mode == JANET_ASYNC_READMODE_RECV) { } else if (state->mode == JANET_ASYNC_READMODE_RECV) {
nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags); nread = recv(s->stream->handle, buffer->data + buffer->count, read_limit, state->flags);
} else } else
#endif #endif
{ {
nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left); nread = read(s->stream->handle, buffer->data + buffer->count, read_limit);
} }
} while (nread == -1 && errno == EINTR); } while (nread == -1 && errno == EINTR);
@ -1803,11 +1802,16 @@ Janet janet_cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4); janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE); janet_stream_flags(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to); if (janet_keyeq(argv[1], "all")) {
janet_ev_read(stream, buffer, n); if (to != INFINITY) janet_addtimeout(to);
janet_ev_readchunk(stream, buffer, -1);
} else {
int32_t n = janet_getnat(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_read(stream, buffer, n);
}
janet_await(); janet_await();
} }
@ -1922,7 +1926,8 @@ static const JanetReg ev_cfuns[] = {
{ {
"ev/read", janet_cfun_stream_read, "ev/read", janet_cfun_stream_read,
JDOC("(ev/read stream n &opt buffer timeout)\n\n" JDOC("(ev/read stream n &opt buffer timeout)\n\n"
"Read up to n bytes into a buffer asynchronously from a stream. " "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
"`:all` to read into the buffer until end of stream. "
"Optionally provide a buffer to write into " "Optionally provide a buffer to write into "
"as well as a timeout in seconds after which to cancel the operation and raise an error. " "as well as a timeout in seconds after which to cancel the operation and raise an error. "
"Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an " "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "

View File

@ -509,11 +509,16 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4); janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET); janet_stream_flags(stream, JANET_STREAM_READABLE | JANET_STREAM_SOCKET);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY); double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to); if (janet_keyeq(argv[1], "all")) {
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL); if (to != INFINITY) janet_addtimeout(to);
janet_ev_recvchunk(stream, buffer, -1, MSG_NOSIGNAL);
} else {
int32_t n = janet_getnat(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_ev_recv(stream, buffer, n, MSG_NOSIGNAL);
}
janet_await(); janet_await();
} }
@ -643,6 +648,7 @@ static const JanetReg net_cfuns[] = {
"net/read", cfun_stream_read, "net/read", cfun_stream_read,
JDOC("(net/read stream nbytes &opt buf timeout)\n\n" JDOC("(net/read stream nbytes &opt buf timeout)\n\n"
"Read up to n bytes from a stream, suspending the current fiber until the bytes are available. " "Read up to n bytes from a stream, suspending the current fiber until the bytes are available. "
"`n` can also be the keyword `:all` to read into the buffer until end of stream. "
"If less than n bytes are available (and more than 0), will push those bytes and return early. " "If less than n bytes are available (and more than 0), will push those bytes and return early. "
"Takes an optional timeout in seconds, after which will return nil. " "Takes an optional timeout in seconds, after which will return nil. "
"Returns a buffer with up to n more bytes in it, or raises an error if the read failed.") "Returns a buffer with up to n more bytes in it, or raises an error if the read failed.")