diff --git a/src/core/cfuns.c b/src/core/cfuns.c index 96b040be..b2e9b0b5 100644 --- a/src/core/cfuns.c +++ b/src/core/cfuns.c @@ -70,10 +70,10 @@ static JanetSlot genericSSI(JanetFopts opts, int op, JanetSlot s, int32_t imm) { /* Emit an insruction that implements a form by itself. */ static JanetSlot opfunction( - JanetFopts opts, - JanetSlot *args, - int op, - Janet defaultArg2) { + JanetFopts opts, + JanetSlot *args, + int op, + Janet defaultArg2) { JanetCompiler *c = opts.compiler; int32_t len; len = janet_v_count(args); diff --git a/src/core/net.c b/src/core/net.c index ad485203..0060dba2 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -36,10 +36,12 @@ #include /* - * Event loops + * Streams */ #define JANET_STREAM_CLOSED 1 +#define JANET_STREAM_READABLE 2 +#define JANET_STREAM_WRITABLE 4 typedef struct { int fd; @@ -48,6 +50,8 @@ typedef struct { static int janet_stream_close(void *p, size_t s); +static int janet_stream_getter(void *p, size_t, Janet key); + static const JanetAbstractType StreamAT = { "core/stream", janet_stream_close, @@ -64,22 +68,25 @@ static int janet_stream_close(void *p, size_t s) { return 0; } -static JanetStream *make_stream(int fd) { +static JanetStream *make_stream(int fd, int flags) { JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); - int flags = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); stream->fd = fd; - stream->flags = 0; + stream->flags = flags; return stream; } +/* + * Event loop + */ + /* This large struct describes a waiting file descriptor, as well * as what to do when we get an event for it. It is a variant type, where * each variant implements a simple state machine. */ typedef struct { /* File descriptor to listen for events on. */ - int fd; + JanetStream *stream; /* Fiber to resume when event finishes. Can be NULL, in which case, * no fiber is resumed when event completes. */ @@ -137,6 +144,7 @@ JANET_THREAD_LOCAL int janet_vm_loop_count; void janet_net_markloop(void) { for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD lfd = janet_vm_loopfds[i]; + janet_mark(janet_wrap_abstract(lfd.stream)); switch (lfd.event_type) { default: break; @@ -175,104 +183,121 @@ static int janet_loop_schedule(JanetLoopFD lfd) { * we can separate out the polling logic */ static size_t janet_loop_event(size_t index) { JanetLoopFD *jlfd = janet_vm_loopfds + index; + JanetStream *stream = jlfd->stream; + int fd = stream->fd; int ret = 1; int should_resume = 0; Janet resumeval = janet_wrap_nil(); - switch (jlfd->event_type) { - case JLE_READ_CHUNK: - case JLE_READ_SOME: { - JanetBuffer *buffer = jlfd->data.read_chunk.buf; - int32_t bytes_left = jlfd->data.read_chunk.bytes_left; - janet_buffer_extra(buffer, bytes_left); - ssize_t nread; - errno = 0; - do { - nread = read(jlfd->fd, buffer->data + buffer->count, bytes_left); - } while (errno == EINTR); - if (errno == EAGAIN || errno == EWOULDBLOCK) { - ret = 1; + if (stream->flags & JANET_STREAM_CLOSED) { + should_resume = 1; + ret = 0; + } else { + switch (jlfd->event_type) { + case JLE_READ_CHUNK: + case JLE_READ_SOME: { + JanetBuffer *buffer = jlfd->data.read_chunk.buf; + int32_t bytes_left = jlfd->data.read_chunk.bytes_left; + janet_buffer_extra(buffer, bytes_left); + ssize_t nread; + errno = 0; + if (!(stream->flags & JANET_STREAM_READABLE)) { + should_resume = 1; + ret = 0; + break; + } + do { + nread = read(fd, buffer->data + buffer->count, bytes_left); + } while (errno == EINTR); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + ret = 1; + break; + } + if (nread > 0) { + buffer->count += nread; + bytes_left -= nread; + } else { + bytes_left = 0; + } + if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { + should_resume = 1; + if (nread > 0) { + resumeval = janet_wrap_buffer(buffer); + } + ret = 0; + } else { + jlfd->data.read_chunk.bytes_left = bytes_left; + ret = 1; + } break; } - if (nread > 0) { - buffer->count += nread; - bytes_left -= nread; - } else { - bytes_left = 0; - } - if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { - should_resume = 1; - if (nread > 0) { - resumeval = janet_wrap_buffer(buffer); + case JLE_READ_ACCEPT: { + char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ + socklen_t len = 0; + int connfd = accept(fd, (void *) &addr, &len); + if (connfd >= 0) { + /* Made a new connection socket */ + JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); + Janet streamv = janet_wrap_abstract(stream); + JanetFunction *handler = jlfd->data.read_accept.handler; + Janet out; + JanetFiber *fiberp = NULL; + /* Launch connection fiber */ + JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); + if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { + janet_stacktrace(fiberp, out); + } } - ret = 0; - } else { - jlfd->data.read_chunk.bytes_left = bytes_left; - ret = 1; + ret = JANET_LOOPFD_MAX; + break; } - break; - } - case JLE_READ_ACCEPT: { - char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ - socklen_t len = 0; - int connfd = accept(jlfd->fd, (void *) &addr, &len); - if (connfd >= 0) { - /* Made a new connection socket */ - JanetStream *stream = make_stream(connfd); - Janet streamv = janet_wrap_abstract(stream); - JanetFunction *handler = jlfd->data.read_accept.handler; - Janet out; - JanetFiber *fiberp = NULL; - /* Launch connection fiber */ - JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp); - if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) { - janet_stacktrace(fiberp, out); + case JLE_WRITE_FROM_BUFFER: + case JLE_WRITE_FROM_STRINGLIKE: { + int32_t start, len; + const uint8_t *bytes; + if (!(stream->flags & JANET_STREAM_WRITABLE)) { + should_resume = 1; + ret = 0; + break; } - } - ret = JANET_LOOPFD_MAX; - break; - } - case JLE_WRITE_FROM_BUFFER: - case JLE_WRITE_FROM_STRINGLIKE: { - int32_t start, len; - const uint8_t *bytes; - if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; - bytes = buffer->data; - len = buffer->count; - start = jlfd->data.write_from_buffer.start; - } else { - bytes = jlfd->data.write_from_stringlike.str; - len = janet_string_length(bytes); - start = jlfd->data.write_from_stringlike.start; - } - if (start < len) { - int32_t nbytes = len - start; - ssize_t nwrote; - do { - nwrote = write(jlfd->fd, bytes + start, nbytes); - } while (nwrote == EINTR); - if (nwrote > 0) { - start += nwrote; - } else { - start = len; - } - } - if (start >= len) { - should_resume = 1; - ret = 0; - } else { if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { - jlfd->data.write_from_buffer.start = start; + JanetBuffer *buffer = jlfd->data.write_from_buffer.buf; + bytes = buffer->data; + len = buffer->count; + start = jlfd->data.write_from_buffer.start; } else { - jlfd->data.write_from_stringlike.start = start; + bytes = jlfd->data.write_from_stringlike.str; + len = janet_string_length(bytes); + start = jlfd->data.write_from_stringlike.start; } - ret = 1; + if (start < len) { + int32_t nbytes = len - start; + ssize_t nwrote; + do { + nwrote = write(fd, bytes + start, nbytes); + } while (nwrote == EINTR); + if (nwrote > 0) { + start += nwrote; + } else { + start = len; + } + } + if (start >= len) { + should_resume = 1; + ret = 0; + } else { + if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { + jlfd->data.write_from_buffer.start = start; + } else { + jlfd->data.write_from_stringlike.start = start; + } + ret = 1; + } + break; } - break; - } - case JLE_CONNECT: { + case JLE_CONNECT: { - break; + break; + } } } @@ -303,9 +328,10 @@ static void janet_loop1(void) { int fd_max = 0; for (int i = 0; i < janet_vm_loop_count; i++) { JanetLoopFD *jlfd = janet_vm_loopfds + i; - if (jlfd->fd > fd_max) fd_max = jlfd->fd; + int fd = jlfd->stream->fd; + if (fd > fd_max) fd_max = fd; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - FD_SET(jlfd->fd, set); + FD_SET(fd, set); } /* Blocking call - we should add timeout functionality */ @@ -314,8 +340,9 @@ static void janet_loop1(void) { /* Now handle all events */ for (int i = 0; i < janet_vm_loop_count;) { JanetLoopFD *jlfd = janet_vm_loopfds + i; + int fd = jlfd->stream->fd; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; - if (FD_ISSET(jlfd->fd, set)) { + if (FD_ISSET(fd, set)) { size_t delta = janet_loop_event(i); i += delta; } else { @@ -336,9 +363,9 @@ void janet_loop(void) { #define JANET_SCHED_FSOME 1 -JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t nbytes, int flags) { +JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; lfd.data.read_chunk.buf = buf; @@ -347,9 +374,9 @@ JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t n janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); } -JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { +JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = JLE_WRITE_FROM_BUFFER; lfd.data.write_from_buffer.buf = buf; @@ -358,9 +385,9 @@ JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); } -JANET_NO_RETURN static void janet_sched_write_stringlike(int fd, const uint8_t *str) { +JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { JanetLoopFD lfd = {0}; - lfd.fd = fd; + lfd.stream = stream; lfd.fiber = janet_root_fiber(); lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; lfd.data.write_from_stringlike.str = str; @@ -413,7 +440,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } /* Wrap socket in abstract type JanetStream */ - JanetStream *stream = make_stream(sock); + JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); return janet_wrap_abstract(stream); } @@ -454,12 +481,12 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { /* Put sfd on our loop */ JanetLoopFD lfd = {0}; - lfd.fd = sfd; + lfd.stream = make_stream(sfd, 0); lfd.event_type = JLE_READ_ACCEPT; lfd.data.read_accept.handler = fun; janet_loop_schedule(lfd); - return janet_wrap_nil(); + return janet_wrap_abstract(lfd.stream); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { @@ -467,7 +494,7 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream->fd, buffer, n, JANET_SCHED_FSOME); + janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME); } static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { @@ -475,7 +502,7 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); - janet_sched_read(stream->fd, buffer, n, 0); + janet_sched_read(stream, buffer, n, 0); } static Janet cfun_stream_close(int32_t argc, Janet *argv) { @@ -489,10 +516,10 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); if (janet_checktype(argv[1], JANET_BUFFER)) { - janet_sched_write_buffer(stream->fd, janet_getbuffer(argv, 1)); + janet_sched_write_buffer(stream, janet_getbuffer(argv, 1)); } else { JanetByteView bytes = janet_getbytes(argv, 1); - janet_sched_write_stringlike(stream->fd, bytes.bytes); + janet_sched_write_stringlike(stream, bytes.bytes); } }