1
0
mirror of https://github.com/janet-lang/janet synced 2024-11-25 09:47:17 +00:00

Remove direct references to file descriptors.

If a descriptor is freed by the Janet code, other
uses of that descriptor, say in the event loop, need
to know that it has been closed.
This commit is contained in:
Calvin Rose 2020-02-11 08:57:44 -06:00
parent 135aff9e17
commit 79bb9e54d5
2 changed files with 138 additions and 111 deletions

View File

@ -70,10 +70,10 @@ static JanetSlot genericSSI(JanetFopts opts, int op, JanetSlot s, int32_t imm) {
/* Emit an insruction that implements a form by itself. */ /* Emit an insruction that implements a form by itself. */
static JanetSlot opfunction( static JanetSlot opfunction(
JanetFopts opts, JanetFopts opts,
JanetSlot *args, JanetSlot *args,
int op, int op,
Janet defaultArg2) { Janet defaultArg2) {
JanetCompiler *c = opts.compiler; JanetCompiler *c = opts.compiler;
int32_t len; int32_t len;
len = janet_v_count(args); len = janet_v_count(args);

View File

@ -36,10 +36,12 @@
#include <netdb.h> #include <netdb.h>
/* /*
* Event loops * Streams
*/ */
#define JANET_STREAM_CLOSED 1 #define JANET_STREAM_CLOSED 1
#define JANET_STREAM_READABLE 2
#define JANET_STREAM_WRITABLE 4
typedef struct { typedef struct {
int fd; int fd;
@ -48,6 +50,8 @@ typedef struct {
static int janet_stream_close(void *p, size_t s); static int janet_stream_close(void *p, size_t s);
static int janet_stream_getter(void *p, size_t, Janet key);
static const JanetAbstractType StreamAT = { static const JanetAbstractType StreamAT = {
"core/stream", "core/stream",
janet_stream_close, janet_stream_close,
@ -64,22 +68,25 @@ static int janet_stream_close(void *p, size_t s) {
return 0; return 0;
} }
static JanetStream *make_stream(int fd) { static JanetStream *make_stream(int fd, int flags) {
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
stream->fd = fd; stream->fd = fd;
stream->flags = 0; stream->flags = flags;
return stream; return stream;
} }
/*
* Event loop
*/
/* This large struct describes a waiting file descriptor, as well /* This large struct describes a waiting file descriptor, as well
* as what to do when we get an event for it. It is a variant type, where * as what to do when we get an event for it. It is a variant type, where
* each variant implements a simple state machine. */ * each variant implements a simple state machine. */
typedef struct { typedef struct {
/* File descriptor to listen for events on. */ /* File descriptor to listen for events on. */
int fd; JanetStream *stream;
/* Fiber to resume when event finishes. Can be NULL, in which case, /* Fiber to resume when event finishes. Can be NULL, in which case,
* no fiber is resumed when event completes. */ * no fiber is resumed when event completes. */
@ -137,6 +144,7 @@ JANET_THREAD_LOCAL int janet_vm_loop_count;
void janet_net_markloop(void) { void janet_net_markloop(void) {
for (int i = 0; i < janet_vm_loop_count; i++) { for (int i = 0; i < janet_vm_loop_count; i++) {
JanetLoopFD lfd = janet_vm_loopfds[i]; JanetLoopFD lfd = janet_vm_loopfds[i];
janet_mark(janet_wrap_abstract(lfd.stream));
switch (lfd.event_type) { switch (lfd.event_type) {
default: default:
break; break;
@ -175,104 +183,121 @@ static int janet_loop_schedule(JanetLoopFD lfd) {
* we can separate out the polling logic */ * we can separate out the polling logic */
static size_t janet_loop_event(size_t index) { static size_t janet_loop_event(size_t index) {
JanetLoopFD *jlfd = janet_vm_loopfds + index; JanetLoopFD *jlfd = janet_vm_loopfds + index;
JanetStream *stream = jlfd->stream;
int fd = stream->fd;
int ret = 1; int ret = 1;
int should_resume = 0; int should_resume = 0;
Janet resumeval = janet_wrap_nil(); Janet resumeval = janet_wrap_nil();
switch (jlfd->event_type) { if (stream->flags & JANET_STREAM_CLOSED) {
case JLE_READ_CHUNK: should_resume = 1;
case JLE_READ_SOME: { ret = 0;
JanetBuffer *buffer = jlfd->data.read_chunk.buf; } else {
int32_t bytes_left = jlfd->data.read_chunk.bytes_left; switch (jlfd->event_type) {
janet_buffer_extra(buffer, bytes_left); case JLE_READ_CHUNK:
ssize_t nread; case JLE_READ_SOME: {
errno = 0; JanetBuffer *buffer = jlfd->data.read_chunk.buf;
do { int32_t bytes_left = jlfd->data.read_chunk.bytes_left;
nread = read(jlfd->fd, buffer->data + buffer->count, bytes_left); janet_buffer_extra(buffer, bytes_left);
} while (errno == EINTR); ssize_t nread;
if (errno == EAGAIN || errno == EWOULDBLOCK) { errno = 0;
ret = 1; if (!(stream->flags & JANET_STREAM_READABLE)) {
should_resume = 1;
ret = 0;
break;
}
do {
nread = read(fd, buffer->data + buffer->count, bytes_left);
} while (errno == EINTR);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
ret = 1;
break;
}
if (nread > 0) {
buffer->count += nread;
bytes_left -= nread;
} else {
bytes_left = 0;
}
if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) {
should_resume = 1;
if (nread > 0) {
resumeval = janet_wrap_buffer(buffer);
}
ret = 0;
} else {
jlfd->data.read_chunk.bytes_left = bytes_left;
ret = 1;
}
break; break;
} }
if (nread > 0) { case JLE_READ_ACCEPT: {
buffer->count += nread; char addr[256] = {0}; /* Just make sure it is large enough for largest address type */
bytes_left -= nread; socklen_t len = 0;
} else { int connfd = accept(fd, (void *) &addr, &len);
bytes_left = 0; if (connfd >= 0) {
} /* Made a new connection socket */
if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) { JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
should_resume = 1; Janet streamv = janet_wrap_abstract(stream);
if (nread > 0) { JanetFunction *handler = jlfd->data.read_accept.handler;
resumeval = janet_wrap_buffer(buffer); Janet out;
JanetFiber *fiberp = NULL;
/* Launch connection fiber */
JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) {
janet_stacktrace(fiberp, out);
}
} }
ret = 0; ret = JANET_LOOPFD_MAX;
} else { break;
jlfd->data.read_chunk.bytes_left = bytes_left;
ret = 1;
} }
break; case JLE_WRITE_FROM_BUFFER:
} case JLE_WRITE_FROM_STRINGLIKE: {
case JLE_READ_ACCEPT: { int32_t start, len;
char addr[256] = {0}; /* Just make sure it is large enough for largest address type */ const uint8_t *bytes;
socklen_t len = 0; if (!(stream->flags & JANET_STREAM_WRITABLE)) {
int connfd = accept(jlfd->fd, (void *) &addr, &len); should_resume = 1;
if (connfd >= 0) { ret = 0;
/* Made a new connection socket */ break;
JanetStream *stream = make_stream(connfd);
Janet streamv = janet_wrap_abstract(stream);
JanetFunction *handler = jlfd->data.read_accept.handler;
Janet out;
JanetFiber *fiberp = NULL;
/* Launch connection fiber */
JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) {
janet_stacktrace(fiberp, out);
} }
}
ret = JANET_LOOPFD_MAX;
break;
}
case JLE_WRITE_FROM_BUFFER:
case JLE_WRITE_FROM_STRINGLIKE: {
int32_t start, len;
const uint8_t *bytes;
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
JanetBuffer *buffer = jlfd->data.write_from_buffer.buf;
bytes = buffer->data;
len = buffer->count;
start = jlfd->data.write_from_buffer.start;
} else {
bytes = jlfd->data.write_from_stringlike.str;
len = janet_string_length(bytes);
start = jlfd->data.write_from_stringlike.start;
}
if (start < len) {
int32_t nbytes = len - start;
ssize_t nwrote;
do {
nwrote = write(jlfd->fd, bytes + start, nbytes);
} while (nwrote == EINTR);
if (nwrote > 0) {
start += nwrote;
} else {
start = len;
}
}
if (start >= len) {
should_resume = 1;
ret = 0;
} else {
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) { if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
jlfd->data.write_from_buffer.start = start; JanetBuffer *buffer = jlfd->data.write_from_buffer.buf;
bytes = buffer->data;
len = buffer->count;
start = jlfd->data.write_from_buffer.start;
} else { } else {
jlfd->data.write_from_stringlike.start = start; bytes = jlfd->data.write_from_stringlike.str;
len = janet_string_length(bytes);
start = jlfd->data.write_from_stringlike.start;
} }
ret = 1; if (start < len) {
int32_t nbytes = len - start;
ssize_t nwrote;
do {
nwrote = write(fd, bytes + start, nbytes);
} while (nwrote == EINTR);
if (nwrote > 0) {
start += nwrote;
} else {
start = len;
}
}
if (start >= len) {
should_resume = 1;
ret = 0;
} else {
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
jlfd->data.write_from_buffer.start = start;
} else {
jlfd->data.write_from_stringlike.start = start;
}
ret = 1;
}
break;
} }
break; case JLE_CONNECT: {
}
case JLE_CONNECT: {
break; break;
}
} }
} }
@ -303,9 +328,10 @@ static void janet_loop1(void) {
int fd_max = 0; int fd_max = 0;
for (int i = 0; i < janet_vm_loop_count; i++) { for (int i = 0; i < janet_vm_loop_count; i++) {
JanetLoopFD *jlfd = janet_vm_loopfds + i; JanetLoopFD *jlfd = janet_vm_loopfds + i;
if (jlfd->fd > fd_max) fd_max = jlfd->fd; int fd = jlfd->stream->fd;
if (fd > fd_max) fd_max = fd;
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
FD_SET(jlfd->fd, set); FD_SET(fd, set);
} }
/* Blocking call - we should add timeout functionality */ /* Blocking call - we should add timeout functionality */
@ -314,8 +340,9 @@ static void janet_loop1(void) {
/* Now handle all events */ /* Now handle all events */
for (int i = 0; i < janet_vm_loop_count;) { for (int i = 0; i < janet_vm_loop_count;) {
JanetLoopFD *jlfd = janet_vm_loopfds + i; JanetLoopFD *jlfd = janet_vm_loopfds + i;
int fd = jlfd->stream->fd;
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds; fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
if (FD_ISSET(jlfd->fd, set)) { if (FD_ISSET(fd, set)) {
size_t delta = janet_loop_event(i); size_t delta = janet_loop_event(i);
i += delta; i += delta;
} else { } else {
@ -336,9 +363,9 @@ void janet_loop(void) {
#define JANET_SCHED_FSOME 1 #define JANET_SCHED_FSOME 1
JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t nbytes, int flags) { JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
lfd.fd = fd; lfd.stream = stream;
lfd.fiber = janet_root_fiber(); lfd.fiber = janet_root_fiber();
lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK; lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK;
lfd.data.read_chunk.buf = buf; lfd.data.read_chunk.buf = buf;
@ -347,9 +374,9 @@ JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t n
janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil());
} }
JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) { JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) {
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
lfd.fd = fd; lfd.stream = stream;
lfd.fiber = janet_root_fiber(); lfd.fiber = janet_root_fiber();
lfd.event_type = JLE_WRITE_FROM_BUFFER; lfd.event_type = JLE_WRITE_FROM_BUFFER;
lfd.data.write_from_buffer.buf = buf; lfd.data.write_from_buffer.buf = buf;
@ -358,9 +385,9 @@ JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) {
janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil()); janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil());
} }
JANET_NO_RETURN static void janet_sched_write_stringlike(int fd, const uint8_t *str) { JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) {
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
lfd.fd = fd; lfd.stream = stream;
lfd.fiber = janet_root_fiber(); lfd.fiber = janet_root_fiber();
lfd.event_type = JLE_WRITE_FROM_STRINGLIKE; lfd.event_type = JLE_WRITE_FROM_STRINGLIKE;
lfd.data.write_from_stringlike.str = str; lfd.data.write_from_stringlike.str = str;
@ -413,7 +440,7 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
} }
/* Wrap socket in abstract type JanetStream */ /* Wrap socket in abstract type JanetStream */
JanetStream *stream = make_stream(sock); JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
return janet_wrap_abstract(stream); return janet_wrap_abstract(stream);
} }
@ -454,12 +481,12 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Put sfd on our loop */ /* Put sfd on our loop */
JanetLoopFD lfd = {0}; JanetLoopFD lfd = {0};
lfd.fd = sfd; lfd.stream = make_stream(sfd, 0);
lfd.event_type = JLE_READ_ACCEPT; lfd.event_type = JLE_READ_ACCEPT;
lfd.data.read_accept.handler = fun; lfd.data.read_accept.handler = fun;
janet_loop_schedule(lfd); janet_loop_schedule(lfd);
return janet_wrap_nil(); return janet_wrap_abstract(lfd.stream);
} }
static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_read(int32_t argc, Janet *argv) {
@ -467,7 +494,7 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) {
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
int32_t n = janet_getnat(argv, 1); int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
janet_sched_read(stream->fd, buffer, n, JANET_SCHED_FSOME); janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME);
} }
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
@ -475,7 +502,7 @@ static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
int32_t n = janet_getnat(argv, 1); int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
janet_sched_read(stream->fd, buffer, n, 0); janet_sched_read(stream, buffer, n, 0);
} }
static Janet cfun_stream_close(int32_t argc, Janet *argv) { static Janet cfun_stream_close(int32_t argc, Janet *argv) {
@ -489,10 +516,10 @@ static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2); janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
if (janet_checktype(argv[1], JANET_BUFFER)) { if (janet_checktype(argv[1], JANET_BUFFER)) {
janet_sched_write_buffer(stream->fd, janet_getbuffer(argv, 1)); janet_sched_write_buffer(stream, janet_getbuffer(argv, 1));
} else { } else {
JanetByteView bytes = janet_getbytes(argv, 1); JanetByteView bytes = janet_getbytes(argv, 1);
janet_sched_write_stringlike(stream->fd, bytes.bytes); janet_sched_write_stringlike(stream, bytes.bytes);
} }
} }