mirror of
https://github.com/janet-lang/janet
synced 2025-01-13 17:10:27 +00:00
Working TCP echo server and client.
Required a few changes to APIs, namely janet_root_fiber() to get topmost fiber that is active in the current scheduler. This is distinct from janet_current_fiber(), which gets the bottom most fiber in the fiber stack - it might have a parent, and so cannot be reliably resumed. This is the kind of situation that makes symmetric coroutines more attractive.
This commit is contained in:
parent
7f1f684b21
commit
f4d7fd97f6
7
examples/tcpclient.janet
Normal file
7
examples/tcpclient.janet
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
(def conn (net/connect "127.0.0.1" "8000"))
|
||||||
|
(printf "Connected to %q!" conn)
|
||||||
|
(net/write conn "Echo...")
|
||||||
|
(print "Wrote to connection...")
|
||||||
|
(def res (net/read conn 1024))
|
||||||
|
(pp res)
|
||||||
|
(net/close conn)
|
13
examples/tcpserver.janet
Normal file
13
examples/tcpserver.janet
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
(defn handler
|
||||||
|
"Simple handler for connections."
|
||||||
|
[stream]
|
||||||
|
(def id (gensym))
|
||||||
|
(def b @"")
|
||||||
|
(print "Connection " id "!")
|
||||||
|
(while (net/read stream 1024 b)
|
||||||
|
(net/write stream b)
|
||||||
|
(buffer/clear b))
|
||||||
|
(printf "Done %v!" id)
|
||||||
|
(net/close stream))
|
||||||
|
|
||||||
|
(net/server "127.0.0.1" "8000" handler)
|
@ -1825,6 +1825,7 @@
|
|||||||
(default on-parse-error bad-parse)
|
(default on-parse-error bad-parse)
|
||||||
(default evaluator (fn evaluate [x &] (x)))
|
(default evaluator (fn evaluate [x &] (x)))
|
||||||
(default where "<anonymous>")
|
(default where "<anonymous>")
|
||||||
|
(default guard :yed)
|
||||||
|
|
||||||
# Are we done yet?
|
# Are we done yet?
|
||||||
(var going true)
|
(var going true)
|
||||||
@ -1851,7 +1852,7 @@
|
|||||||
(string err " on line " line ", column " column)
|
(string err " on line " line ", column " column)
|
||||||
err))
|
err))
|
||||||
(on-compile-error msg errf where))))
|
(on-compile-error msg errf where))))
|
||||||
(or guard :a)))
|
guard))
|
||||||
(fiber/setenv f env)
|
(fiber/setenv f env)
|
||||||
(while (let [fs (fiber/status f)]
|
(while (let [fs (fiber/status f)]
|
||||||
(and (not= :dead fs) (not= :error fs)))
|
(and (not= :dead fs) (not= :error fs)))
|
||||||
@ -2290,6 +2291,7 @@
|
|||||||
(def h (in handlers n))
|
(def h (in handlers n))
|
||||||
(if h (h i) (do (print "unknown flag -" n) ((in handlers "h")))))
|
(if h (h i) (do (print "unknown flag -" n) ((in handlers "h")))))
|
||||||
|
|
||||||
|
# Use special evaulator for fly checking (-k option)
|
||||||
(def- safe-forms {'defn true 'defn- true 'defmacro true 'defmacro- true})
|
(def- safe-forms {'defn true 'defn- true 'defmacro true 'defmacro- true})
|
||||||
(def- importers {'import true 'import* true 'use true 'dofile true 'require true})
|
(def- importers {'import true 'import* true 'use true 'dofile true 'require true})
|
||||||
(defn- evaluator
|
(defn- evaluator
|
||||||
|
@ -337,6 +337,10 @@ JanetFiber *janet_current_fiber(void) {
|
|||||||
return janet_vm_fiber;
|
return janet_vm_fiber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JanetFiber *janet_root_fiber(void) {
|
||||||
|
return janet_vm_root_fiber;
|
||||||
|
}
|
||||||
|
|
||||||
/* CFuns */
|
/* CFuns */
|
||||||
|
|
||||||
static Janet cfun_fiber_getenv(int32_t argc, Janet *argv) {
|
static Janet cfun_fiber_getenv(int32_t argc, Janet *argv) {
|
||||||
|
352
src/core/net.c
352
src/core/net.c
@ -39,39 +39,72 @@
|
|||||||
* Event loops
|
* Event loops
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define JANET_STREAM_CLOSED 1
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fd;
|
||||||
|
int flags;
|
||||||
|
} JanetStream;
|
||||||
|
|
||||||
|
static int janet_stream_close(void *p, size_t s);
|
||||||
|
|
||||||
|
static const JanetAbstractType StreamAT = {
|
||||||
|
"core/stream",
|
||||||
|
janet_stream_close,
|
||||||
|
JANET_ATEND_GC
|
||||||
|
};
|
||||||
|
|
||||||
|
static int janet_stream_close(void *p, size_t s) {
|
||||||
|
(void) s;
|
||||||
|
JanetStream *stream = p;
|
||||||
|
if (!(stream->flags & JANET_STREAM_CLOSED)) {
|
||||||
|
stream->flags |= JANET_STREAM_CLOSED;
|
||||||
|
close(stream->fd);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static JanetStream *make_stream(int fd) {
|
||||||
|
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
|
||||||
|
int flags = fcntl(fd, F_GETFL, 0);
|
||||||
|
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
stream->fd = fd;
|
||||||
|
stream->flags = 0;
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
/* This large struct describes a waiting file descriptor, as well
|
/* This large struct describes a waiting file descriptor, as well
|
||||||
* as what to do when we get an event for it. */
|
* as what to do when we get an event for it. It is a variant type, where
|
||||||
|
* each variant implements a simple state machine. */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
||||||
/* File descriptor to listen for events on. */
|
/* File descriptor to listen for events on. */
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
/* Fiber to resume when event finishes. Can be NULL. */
|
/* Fiber to resume when event finishes. Can be NULL, in which case,
|
||||||
|
* no fiber is resumed when event completes. */
|
||||||
JanetFiber *fiber;
|
JanetFiber *fiber;
|
||||||
|
|
||||||
/* We need to tell which fd_set to put in for select. */
|
|
||||||
enum {
|
|
||||||
JLFD_READ,
|
|
||||||
JLFD_WRITE
|
|
||||||
} select_mode;
|
|
||||||
|
|
||||||
/* What kind of event we are listening for.
|
/* What kind of event we are listening for.
|
||||||
* As more IO functionality get's added, we can
|
* As more IO functionality get's added, we can
|
||||||
* expand this. */
|
* expand this. */
|
||||||
enum {
|
enum {
|
||||||
JLE_READ_INTO_BUFFER,
|
JLE_READ_CHUNK,
|
||||||
|
JLE_READ_SOME,
|
||||||
JLE_READ_ACCEPT,
|
JLE_READ_ACCEPT,
|
||||||
|
JLE_CONNECT,
|
||||||
JLE_WRITE_FROM_BUFFER,
|
JLE_WRITE_FROM_BUFFER,
|
||||||
JLE_WRITE_FROM_STRINGLIKE
|
JLE_WRITE_FROM_STRINGLIKE
|
||||||
} event_type;
|
} event_type;
|
||||||
|
|
||||||
|
/* Each variant can have a different payload. */
|
||||||
union {
|
union {
|
||||||
|
|
||||||
/* JLE_READ_INTO_BUFFER */
|
/* JLE_READ_CHUNK/JLE_READ_SOME */
|
||||||
struct {
|
struct {
|
||||||
int32_t n;
|
int32_t bytes_left;
|
||||||
JanetBuffer *buf;
|
JanetBuffer *buf;
|
||||||
} read_into_buffer;
|
} read_chunk;
|
||||||
|
|
||||||
/* JLE_READ_ACCEPT */
|
/* JLE_READ_ACCEPT */
|
||||||
struct {
|
struct {
|
||||||
@ -81,12 +114,15 @@ typedef struct {
|
|||||||
/* JLE_WRITE_FROM_BUFFER */
|
/* JLE_WRITE_FROM_BUFFER */
|
||||||
struct {
|
struct {
|
||||||
JanetBuffer *buf;
|
JanetBuffer *buf;
|
||||||
|
int32_t start;
|
||||||
} write_from_buffer;
|
} write_from_buffer;
|
||||||
|
|
||||||
/* JLE_WRITE_FROM_STRINGLIKE */
|
/* JLE_WRITE_FROM_STRINGLIKE */
|
||||||
struct {
|
struct {
|
||||||
const uint8_t *str;
|
const uint8_t *str;
|
||||||
|
int32_t start;
|
||||||
} write_from_stringlike;
|
} write_from_stringlike;
|
||||||
|
|
||||||
} data;
|
} data;
|
||||||
|
|
||||||
} JanetLoopFD;
|
} JanetLoopFD;
|
||||||
@ -104,17 +140,20 @@ void janet_net_markloop(void) {
|
|||||||
switch (lfd.event_type) {
|
switch (lfd.event_type) {
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
case JLE_READ_INTO_BUFFER:
|
case JLE_READ_CHUNK:
|
||||||
janet_mark(janet_wrap_buffer(lfd.data.read_into_buffer.buf));
|
case JLE_READ_SOME:
|
||||||
|
janet_mark(janet_wrap_buffer(lfd.data.read_chunk.buf));
|
||||||
break;
|
break;
|
||||||
case JLE_READ_ACCEPT:
|
case JLE_READ_ACCEPT:
|
||||||
janet_mark(janet_wrap_function(lfd.data.read_accept.handler));
|
janet_mark(janet_wrap_function(lfd.data.read_accept.handler));
|
||||||
break;
|
break;
|
||||||
|
case JLE_CONNECT:
|
||||||
|
break;
|
||||||
case JLE_WRITE_FROM_BUFFER:
|
case JLE_WRITE_FROM_BUFFER:
|
||||||
janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf));
|
janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf));
|
||||||
break;
|
break;
|
||||||
case JLE_WRITE_FROM_STRINGLIKE:
|
case JLE_WRITE_FROM_STRINGLIKE:
|
||||||
janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf));
|
janet_mark(janet_wrap_string(lfd.data.write_from_stringlike.str));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -132,11 +171,6 @@ static int janet_loop_schedule(JanetLoopFD lfd) {
|
|||||||
return index;
|
return index;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Remove an event listener by the handle it returned when scheduled. */
|
|
||||||
static void janet_loop_unschedule(int index) {
|
|
||||||
janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count];
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Return delta in number of loop fds. Abstracted out so
|
/* Return delta in number of loop fds. Abstracted out so
|
||||||
* we can separate out the polling logic */
|
* we can separate out the polling logic */
|
||||||
static size_t janet_loop_event(size_t index) {
|
static size_t janet_loop_event(size_t index) {
|
||||||
@ -145,55 +179,122 @@ static size_t janet_loop_event(size_t index) {
|
|||||||
int should_resume = 0;
|
int should_resume = 0;
|
||||||
Janet resumeval = janet_wrap_nil();
|
Janet resumeval = janet_wrap_nil();
|
||||||
switch (jlfd->event_type) {
|
switch (jlfd->event_type) {
|
||||||
case JLE_READ_INTO_BUFFER:
|
case JLE_READ_CHUNK:
|
||||||
{
|
case JLE_READ_SOME: {
|
||||||
JanetBuffer *buffer = jlfd->data.read_into_buffer.buf;
|
JanetBuffer *buffer = jlfd->data.read_chunk.buf;
|
||||||
int32_t how_much = jlfd->data.read_into_buffer.n;
|
int32_t bytes_left = jlfd->data.read_chunk.bytes_left;
|
||||||
janet_buffer_extra(buffer, how_much);
|
janet_buffer_extra(buffer, bytes_left);
|
||||||
int status = read(jlfd->fd, buffer->data + buffer->count, how_much);
|
ssize_t nread;
|
||||||
if (status > 0) {
|
errno = 0;
|
||||||
buffer->count += how_much;
|
do {
|
||||||
}
|
nread = read(jlfd->fd, buffer->data + buffer->count, bytes_left);
|
||||||
should_resume = 1;
|
} while (errno == EINTR);
|
||||||
resumeval = janet_wrap_buffer(buffer);
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
/* Bag pop */
|
ret = 1;
|
||||||
janet_loop_unschedule(index);
|
|
||||||
ret = 0;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case JLE_READ_ACCEPT:
|
if (nread > 0) {
|
||||||
{
|
buffer->count += nread;
|
||||||
char addr[256]; /* Just make sure it is large enough for largest address type */
|
bytes_left -= nread;
|
||||||
socklen_t len;
|
} else {
|
||||||
|
bytes_left = 0;
|
||||||
|
}
|
||||||
|
if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) {
|
||||||
|
should_resume = 1;
|
||||||
|
if (nread > 0) {
|
||||||
|
resumeval = janet_wrap_buffer(buffer);
|
||||||
|
}
|
||||||
|
ret = 0;
|
||||||
|
} else {
|
||||||
|
jlfd->data.read_chunk.bytes_left = bytes_left;
|
||||||
|
ret = 1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case JLE_READ_ACCEPT: {
|
||||||
|
char addr[256] = {0}; /* Just make sure it is large enough for largest address type */
|
||||||
|
socklen_t len = 0;
|
||||||
int connfd = accept(jlfd->fd, (void *) &addr, &len);
|
int connfd = accept(jlfd->fd, (void *) &addr, &len);
|
||||||
if (connfd >= 0) {
|
if (connfd >= 0) {
|
||||||
/* Made a new connection socket */
|
/* Made a new connection socket */
|
||||||
int flags = fcntl(connfd, F_GETFL, 0);
|
JanetStream *stream = make_stream(connfd);
|
||||||
fcntl(connfd, F_SETFL, flags | O_NONBLOCK);
|
Janet streamv = janet_wrap_abstract(stream);
|
||||||
FILE *f = fdopen(connfd, "r+");
|
|
||||||
Janet filev = janet_makefile(f, JANET_FILE_WRITE | JANET_FILE_READ);
|
|
||||||
JanetFunction *handler = jlfd->data.read_accept.handler;
|
JanetFunction *handler = jlfd->data.read_accept.handler;
|
||||||
Janet out;
|
Janet out;
|
||||||
|
JanetFiber *fiberp = NULL;
|
||||||
/* Launch connection fiber */
|
/* Launch connection fiber */
|
||||||
janet_pcall(handler, 1, &filev, &out, NULL);
|
JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp);
|
||||||
|
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) {
|
||||||
|
janet_stacktrace(fiberp, out);
|
||||||
}
|
}
|
||||||
ret = 1;
|
}
|
||||||
|
ret = JANET_LOOPFD_MAX;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case JLE_WRITE_FROM_BUFFER:
|
case JLE_WRITE_FROM_BUFFER:
|
||||||
case JLE_WRITE_FROM_STRINGLIKE:
|
case JLE_WRITE_FROM_STRINGLIKE: {
|
||||||
|
int32_t start, len;
|
||||||
|
const uint8_t *bytes;
|
||||||
|
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
|
||||||
|
JanetBuffer *buffer = jlfd->data.write_from_buffer.buf;
|
||||||
|
bytes = buffer->data;
|
||||||
|
len = buffer->count;
|
||||||
|
start = jlfd->data.write_from_buffer.start;
|
||||||
|
} else {
|
||||||
|
bytes = jlfd->data.write_from_stringlike.str;
|
||||||
|
len = janet_string_length(bytes);
|
||||||
|
start = jlfd->data.write_from_stringlike.start;
|
||||||
|
}
|
||||||
|
if (start < len) {
|
||||||
|
int32_t nbytes = len - start;
|
||||||
|
ssize_t nwrote;
|
||||||
|
do {
|
||||||
|
nwrote = write(jlfd->fd, bytes + start, nbytes);
|
||||||
|
} while (nwrote == EINTR);
|
||||||
|
if (nwrote > 0) {
|
||||||
|
start += nwrote;
|
||||||
|
} else {
|
||||||
|
start = len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (start >= len) {
|
||||||
|
should_resume = 1;
|
||||||
|
ret = 0;
|
||||||
|
} else {
|
||||||
|
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
|
||||||
|
jlfd->data.write_from_buffer.start = start;
|
||||||
|
} else {
|
||||||
|
jlfd->data.write_from_stringlike.start = start;
|
||||||
|
}
|
||||||
ret = 1;
|
ret = 1;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case JLE_CONNECT: {
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Resume a fiber for some events */
|
||||||
if (NULL != jlfd->fiber && should_resume) {
|
if (NULL != jlfd->fiber && should_resume) {
|
||||||
/* Resume the fiber */
|
/* Resume the fiber */
|
||||||
Janet out;
|
Janet out;
|
||||||
janet_continue(jlfd->fiber, resumeval, &out);
|
JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out);
|
||||||
|
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_USER9) {
|
||||||
|
janet_stacktrace(jlfd->fiber, out);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Remove this handler from the handler pool. */
|
||||||
|
if (should_resume) {
|
||||||
|
janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count];
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void janet_loop1(void) {
|
static void janet_loop1(void) {
|
||||||
/* Set up fd_sets */
|
/* Set up fd_sets */
|
||||||
fd_set readfds;
|
fd_set readfds;
|
||||||
fd_set writefds;
|
fd_set writefds;
|
||||||
@ -203,20 +304,17 @@ void janet_loop1(void) {
|
|||||||
for (int i = 0; i < janet_vm_loop_count; i++) {
|
for (int i = 0; i < janet_vm_loop_count; i++) {
|
||||||
JanetLoopFD *jlfd = janet_vm_loopfds + i;
|
JanetLoopFD *jlfd = janet_vm_loopfds + i;
|
||||||
if (jlfd->fd > fd_max) fd_max = jlfd->fd;
|
if (jlfd->fd > fd_max) fd_max = jlfd->fd;
|
||||||
fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds;
|
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
|
||||||
FD_SET(jlfd->fd, set);
|
FD_SET(jlfd->fd, set);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Blocking call - we should add timeout functionality */
|
/* Blocking call - we should add timeout functionality */
|
||||||
printf("selecting %d!\n", janet_vm_loop_count);
|
select(fd_max + 1, &readfds, &writefds, NULL, NULL);
|
||||||
int status = select(fd_max, &readfds, &writefds, NULL, NULL);
|
|
||||||
(void) status;
|
|
||||||
printf("selected!\n");
|
|
||||||
|
|
||||||
/* Now handle all events */
|
/* Now handle all events */
|
||||||
for (int i = 0; i < janet_vm_loop_count;) {
|
for (int i = 0; i < janet_vm_loop_count;) {
|
||||||
JanetLoopFD *jlfd = janet_vm_loopfds + i;
|
JanetLoopFD *jlfd = janet_vm_loopfds + i;
|
||||||
fd_set *set = (jlfd->select_mode == JLFD_READ) ? &readfds : &writefds;
|
fd_set *set = (jlfd->event_type <= JLE_READ_ACCEPT) ? &readfds : &writefds;
|
||||||
if (FD_ISSET(jlfd->fd, set)) {
|
if (FD_ISSET(jlfd->fd, set)) {
|
||||||
size_t delta = janet_loop_event(i);
|
size_t delta = janet_loop_event(i);
|
||||||
i += delta;
|
i += delta;
|
||||||
@ -227,23 +325,57 @@ void janet_loop1(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void janet_loop(void) {
|
void janet_loop(void) {
|
||||||
while (janet_vm_loop_count) janet_loop1();
|
while (janet_vm_loop_count) {
|
||||||
|
janet_loop1();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* C Funs
|
* Scheduling Helpers
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
#define JANET_SCHED_FSOME 1
|
||||||
janet_fixarity(argc, 3);
|
|
||||||
|
|
||||||
/* Get host, port, and handler*/
|
JANET_NO_RETURN static void janet_sched_read(int fd, JanetBuffer *buf, int32_t nbytes, int flags) {
|
||||||
const char *host = janet_getcstring(argv, 0);
|
JanetLoopFD lfd = {0};
|
||||||
const char *port = janet_getcstring(argv, 1);
|
lfd.fd = fd;
|
||||||
JanetFunction *fun = janet_getfunction(argv, 2);
|
lfd.fiber = janet_root_fiber();
|
||||||
|
lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK;
|
||||||
|
lfd.data.read_chunk.buf = buf;
|
||||||
|
lfd.data.read_chunk.bytes_left = nbytes;
|
||||||
|
janet_loop_schedule(lfd);
|
||||||
|
janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil());
|
||||||
|
}
|
||||||
|
|
||||||
|
JANET_NO_RETURN static void janet_sched_write_buffer(int fd, JanetBuffer *buf) {
|
||||||
|
JanetLoopFD lfd = {0};
|
||||||
|
lfd.fd = fd;
|
||||||
|
lfd.fiber = janet_root_fiber();
|
||||||
|
lfd.event_type = JLE_WRITE_FROM_BUFFER;
|
||||||
|
lfd.data.write_from_buffer.buf = buf;
|
||||||
|
lfd.data.write_from_buffer.start = 0;
|
||||||
|
janet_loop_schedule(lfd);
|
||||||
|
janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil());
|
||||||
|
}
|
||||||
|
|
||||||
|
JANET_NO_RETURN static void janet_sched_write_stringlike(int fd, const uint8_t *str) {
|
||||||
|
JanetLoopFD lfd = {0};
|
||||||
|
lfd.fd = fd;
|
||||||
|
lfd.fiber = janet_root_fiber();
|
||||||
|
lfd.event_type = JLE_WRITE_FROM_STRINGLIKE;
|
||||||
|
lfd.data.write_from_stringlike.str = str;
|
||||||
|
lfd.data.write_from_stringlike.start = 0;
|
||||||
|
janet_loop_schedule(lfd);
|
||||||
|
janet_signalv(JANET_SIGNAL_USER9, janet_wrap_nil());
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Needs argc >= offset + 2 */
|
||||||
|
static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) {
|
||||||
|
/* Get host and port */
|
||||||
|
const char *host = janet_getcstring(argv, offset);
|
||||||
|
const char *port = janet_getcstring(argv, offset + 1);
|
||||||
/* getaddrinfo */
|
/* getaddrinfo */
|
||||||
struct addrinfo *ai;
|
struct addrinfo *ai = NULL;
|
||||||
struct addrinfo hints = {0};
|
struct addrinfo hints = {0};
|
||||||
hints.ai_family = AF_UNSPEC;
|
hints.ai_family = AF_UNSPEC;
|
||||||
hints.ai_socktype = SOCK_STREAM;
|
hints.ai_socktype = SOCK_STREAM;
|
||||||
@ -253,8 +385,46 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
|||||||
if (status) {
|
if (status) {
|
||||||
janet_panicf("could not get address info: %s", gai_strerror(status));
|
janet_panicf("could not get address info: %s", gai_strerror(status));
|
||||||
}
|
}
|
||||||
|
return ai;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C Funs
|
||||||
|
*/
|
||||||
|
|
||||||
|
static Janet cfun_net_connect(int32_t argc, Janet *argv) {
|
||||||
|
janet_fixarity(argc, 2);
|
||||||
|
|
||||||
|
struct addrinfo *ai = janet_get_addrinfo(argv, 0);
|
||||||
|
|
||||||
|
/* Create socket */
|
||||||
|
int sock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
|
||||||
|
if (sock < 0) {
|
||||||
|
freeaddrinfo(ai);
|
||||||
|
janet_panic("could not create socket");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Connect to socket */
|
||||||
|
int status = connect(sock, ai->ai_addr, ai->ai_addrlen);
|
||||||
|
freeaddrinfo(ai);
|
||||||
|
if (status < 0) {
|
||||||
|
close(sock);
|
||||||
|
janet_panic("could not connect to socket");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Wrap socket in abstract type JanetStream */
|
||||||
|
JanetStream *stream = make_stream(sock);
|
||||||
|
return janet_wrap_abstract(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||||
|
janet_fixarity(argc, 3);
|
||||||
|
|
||||||
|
/* Get host, port, and handler*/
|
||||||
|
JanetFunction *fun = janet_getfunction(argv, 2);
|
||||||
|
|
||||||
|
struct addrinfo *ai = janet_get_addrinfo(argv, 0);
|
||||||
|
|
||||||
/* bind */
|
|
||||||
/* Check all addrinfos in a loop for the first that we can bind to. */
|
/* Check all addrinfos in a loop for the first that we can bind to. */
|
||||||
int sfd = 0;
|
int sfd = 0;
|
||||||
struct addrinfo *rp = NULL;
|
struct addrinfo *rp = NULL;
|
||||||
@ -270,9 +440,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* listen */
|
/* listen */
|
||||||
status = listen(sfd, 1024);
|
int status = listen(sfd, 1024);
|
||||||
if (status) {
|
|
||||||
freeaddrinfo(ai);
|
freeaddrinfo(ai);
|
||||||
|
if (status) {
|
||||||
close(sfd);
|
close(sfd);
|
||||||
janet_panic("could not listen on file descriptor");
|
janet_panic("could not listen on file descriptor");
|
||||||
}
|
}
|
||||||
@ -282,13 +452,9 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
|||||||
* We don't want to blow up the whole application. */
|
* We don't want to blow up the whole application. */
|
||||||
signal(SIGPIPE, SIG_IGN);
|
signal(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
/* cleanup */
|
|
||||||
freeaddrinfo(ai);
|
|
||||||
|
|
||||||
/* Put sfd on our loop */
|
/* Put sfd on our loop */
|
||||||
JanetLoopFD lfd = {0};
|
JanetLoopFD lfd = {0};
|
||||||
lfd.fd = sfd;
|
lfd.fd = sfd;
|
||||||
lfd.select_mode = JLFD_READ;
|
|
||||||
lfd.event_type = JLE_READ_ACCEPT;
|
lfd.event_type = JLE_READ_ACCEPT;
|
||||||
lfd.data.read_accept.handler = fun;
|
lfd.data.read_accept.handler = fun;
|
||||||
janet_loop_schedule(lfd);
|
janet_loop_schedule(lfd);
|
||||||
@ -296,18 +462,50 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
|||||||
return janet_wrap_nil();
|
return janet_wrap_nil();
|
||||||
}
|
}
|
||||||
|
|
||||||
static Janet cfun_net_loop(int32_t argc, Janet *argv) {
|
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
|
||||||
janet_fixarity(argc, 0);
|
janet_arity(argc, 2, 3);
|
||||||
(void) argv;
|
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||||
printf("starting loop...\n");
|
int32_t n = janet_getnat(argv, 1);
|
||||||
janet_loop();
|
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||||
|
janet_sched_read(stream->fd, buffer, n, JANET_SCHED_FSOME);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
|
||||||
|
janet_arity(argc, 2, 3);
|
||||||
|
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||||
|
int32_t n = janet_getnat(argv, 1);
|
||||||
|
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||||
|
janet_sched_read(stream->fd, buffer, n, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||||
|
janet_fixarity(argc, 1);
|
||||||
|
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||||
|
janet_stream_close(stream, 0);
|
||||||
return janet_wrap_nil();
|
return janet_wrap_nil();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
|
||||||
|
janet_fixarity(argc, 2);
|
||||||
|
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||||
|
if (janet_checktype(argv[1], JANET_BUFFER)) {
|
||||||
|
janet_sched_write_buffer(stream->fd, janet_getbuffer(argv, 1));
|
||||||
|
} else {
|
||||||
|
JanetByteView bytes = janet_getbytes(argv, 1);
|
||||||
|
janet_sched_write_stringlike(stream->fd, bytes.bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static const JanetReg net_cfuns[] = {
|
static const JanetReg net_cfuns[] = {
|
||||||
{"net/server", cfun_net_server,
|
{
|
||||||
JDOC("(net/server host port)\n\nStart a simple TCP echo server.")},
|
"net/server", cfun_net_server,
|
||||||
{"net/loop", cfun_net_loop, NULL},
|
JDOC("(net/server host port)\n\nStart a TCP server.")
|
||||||
|
},
|
||||||
|
{"net/read", cfun_stream_read, NULL},
|
||||||
|
{"net/chunk", cfun_stream_chunk, NULL},
|
||||||
|
{"net/write", cfun_stream_write, NULL},
|
||||||
|
{"net/close", cfun_stream_close, NULL},
|
||||||
|
{"net/connect", cfun_net_connect, NULL},
|
||||||
{NULL, NULL, NULL}
|
{NULL, NULL, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ int janet_dobytes(JanetTable *env, const uint8_t *bytes, int32_t len, const char
|
|||||||
JanetFiber *fiber = janet_fiber(f, 64, 0, NULL);
|
JanetFiber *fiber = janet_fiber(f, 64, 0, NULL);
|
||||||
fiber->env = env;
|
fiber->env = env;
|
||||||
JanetSignal status = janet_continue(fiber, janet_wrap_nil(), &ret);
|
JanetSignal status = janet_continue(fiber, janet_wrap_nil(), &ret);
|
||||||
if (status != JANET_SIGNAL_OK) {
|
if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) {
|
||||||
janet_stacktrace(fiber, ret);
|
janet_stacktrace(fiber, ret);
|
||||||
errflags |= 0x01;
|
errflags |= 0x01;
|
||||||
done = 1;
|
done = 1;
|
||||||
|
@ -43,6 +43,7 @@ extern JANET_THREAD_LOCAL int janet_vm_stackn;
|
|||||||
/* The current running fiber on the current thread.
|
/* The current running fiber on the current thread.
|
||||||
* Set and unset by janet_run. */
|
* Set and unset by janet_run. */
|
||||||
extern JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber;
|
extern JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber;
|
||||||
|
extern JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber;
|
||||||
|
|
||||||
/* The current pointer to the inner most jmp_buf. The current
|
/* The current pointer to the inner most jmp_buf. The current
|
||||||
* return point for panics. */
|
* return point for panics. */
|
||||||
|
@ -448,11 +448,15 @@ static int thread_worker(JanetMailbox *mailbox) {
|
|||||||
Janet argv[1] = { parentv };
|
Janet argv[1] = { parentv };
|
||||||
fiber = janet_fiber(func, 64, 1, argv);
|
fiber = janet_fiber(func, 64, 1, argv);
|
||||||
JanetSignal sig = janet_continue(fiber, janet_wrap_nil(), &out);
|
JanetSignal sig = janet_continue(fiber, janet_wrap_nil(), &out);
|
||||||
if (sig != JANET_SIGNAL_OK) {
|
if (sig != JANET_SIGNAL_OK && sig < JANET_SIGNAL_USER0) {
|
||||||
janet_eprintf("in thread %v: ", janet_wrap_abstract(janet_make_thread(mailbox, encode)));
|
janet_eprintf("in thread %v: ", janet_wrap_abstract(janet_make_thread(mailbox, encode)));
|
||||||
janet_stacktrace(fiber, out);
|
janet_stacktrace(fiber, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef JANET_NET
|
||||||
|
janet_loop();
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Normal exit */
|
/* Normal exit */
|
||||||
janet_deinit();
|
janet_deinit();
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -35,6 +35,7 @@ JANET_THREAD_LOCAL JanetTable *janet_vm_core_env;
|
|||||||
JANET_THREAD_LOCAL JanetTable *janet_vm_registry;
|
JANET_THREAD_LOCAL JanetTable *janet_vm_registry;
|
||||||
JANET_THREAD_LOCAL int janet_vm_stackn = 0;
|
JANET_THREAD_LOCAL int janet_vm_stackn = 0;
|
||||||
JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL;
|
JANET_THREAD_LOCAL JanetFiber *janet_vm_fiber = NULL;
|
||||||
|
JANET_THREAD_LOCAL JanetFiber *janet_vm_root_fiber = NULL;
|
||||||
JANET_THREAD_LOCAL Janet *janet_vm_return_reg = NULL;
|
JANET_THREAD_LOCAL Janet *janet_vm_return_reg = NULL;
|
||||||
JANET_THREAD_LOCAL jmp_buf *janet_vm_jmp_buf = NULL;
|
JANET_THREAD_LOCAL jmp_buf *janet_vm_jmp_buf = NULL;
|
||||||
|
|
||||||
@ -1231,7 +1232,9 @@ Janet janet_call(JanetFunction *fun, int32_t argc, const Janet *argv) {
|
|||||||
janet_vm_stackn = oldn;
|
janet_vm_stackn = oldn;
|
||||||
janet_gcunlock(handle);
|
janet_gcunlock(handle);
|
||||||
|
|
||||||
if (signal != JANET_SIGNAL_OK) janet_panicv(*janet_vm_return_reg);
|
if (signal != JANET_SIGNAL_OK) {
|
||||||
|
janet_panicv(*janet_vm_return_reg);
|
||||||
|
}
|
||||||
|
|
||||||
return *janet_vm_return_reg;
|
return *janet_vm_return_reg;
|
||||||
}
|
}
|
||||||
@ -1277,6 +1280,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) {
|
|||||||
Janet *old_vm_return_reg = janet_vm_return_reg;
|
Janet *old_vm_return_reg = janet_vm_return_reg;
|
||||||
|
|
||||||
/* Setup fiber */
|
/* Setup fiber */
|
||||||
|
if (oldn == 0) janet_vm_root_fiber = fiber;
|
||||||
janet_vm_fiber = fiber;
|
janet_vm_fiber = fiber;
|
||||||
janet_gcroot(janet_wrap_fiber(fiber));
|
janet_gcroot(janet_wrap_fiber(fiber));
|
||||||
janet_fiber_set_status(fiber, JANET_STATUS_ALIVE);
|
janet_fiber_set_status(fiber, JANET_STATUS_ALIVE);
|
||||||
@ -1302,6 +1306,7 @@ JanetSignal janet_continue(JanetFiber *fiber, Janet in, Janet *out) {
|
|||||||
janet_gcunroot(janet_wrap_fiber(fiber));
|
janet_gcunroot(janet_wrap_fiber(fiber));
|
||||||
|
|
||||||
/* Restore global state */
|
/* Restore global state */
|
||||||
|
if (oldn == 0) janet_vm_root_fiber = NULL;
|
||||||
janet_vm_gc_suspend = handle;
|
janet_vm_gc_suspend = handle;
|
||||||
janet_vm_fiber = old_vm_fiber;
|
janet_vm_fiber = old_vm_fiber;
|
||||||
janet_vm_stackn = oldn;
|
janet_vm_stackn = oldn;
|
||||||
@ -1369,6 +1374,9 @@ int janet_init(void) {
|
|||||||
janet_vm_core_env = NULL;
|
janet_vm_core_env = NULL;
|
||||||
/* Seed RNG */
|
/* Seed RNG */
|
||||||
janet_rng_seed(janet_default_rng(), 0);
|
janet_rng_seed(janet_default_rng(), 0);
|
||||||
|
/* Fibers */
|
||||||
|
janet_vm_fiber = NULL;
|
||||||
|
janet_vm_root_fiber = NULL;
|
||||||
/* Threads */
|
/* Threads */
|
||||||
#ifdef JANET_THREADS
|
#ifdef JANET_THREADS
|
||||||
janet_threads_init();
|
janet_threads_init();
|
||||||
@ -1386,6 +1394,8 @@ void janet_deinit(void) {
|
|||||||
janet_vm_root_capacity = 0;
|
janet_vm_root_capacity = 0;
|
||||||
janet_vm_registry = NULL;
|
janet_vm_registry = NULL;
|
||||||
janet_vm_core_env = NULL;
|
janet_vm_core_env = NULL;
|
||||||
|
janet_vm_fiber = NULL;
|
||||||
|
janet_vm_root_fiber = NULL;
|
||||||
#ifdef JANET_THREADS
|
#ifdef JANET_THREADS
|
||||||
janet_threads_deinit();
|
janet_threads_deinit();
|
||||||
#endif
|
#endif
|
||||||
|
@ -1099,6 +1099,11 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
|
|||||||
|
|
||||||
/***** START SECTION MAIN *****/
|
/***** START SECTION MAIN *****/
|
||||||
|
|
||||||
|
/* Event Loop */
|
||||||
|
#ifdef JANET_NET
|
||||||
|
JANET_API void janet_loop(void);
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Parsing */
|
/* Parsing */
|
||||||
JANET_API void janet_parser_init(JanetParser *parser);
|
JANET_API void janet_parser_init(JanetParser *parser);
|
||||||
JANET_API void janet_parser_deinit(JanetParser *parser);
|
JANET_API void janet_parser_deinit(JanetParser *parser);
|
||||||
@ -1273,6 +1278,7 @@ JANET_API JanetFiber *janet_fiber(JanetFunction *callee, int32_t capacity, int32
|
|||||||
JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv);
|
JANET_API JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t argc, const Janet *argv);
|
||||||
JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber);
|
JANET_API JanetFiberStatus janet_fiber_status(JanetFiber *fiber);
|
||||||
JANET_API JanetFiber *janet_current_fiber(void);
|
JANET_API JanetFiber *janet_current_fiber(void);
|
||||||
|
JANET_API JanetFiber *janet_root_fiber(void);
|
||||||
|
|
||||||
/* Treat similar types through uniform interfaces for iteration */
|
/* Treat similar types through uniform interfaces for iteration */
|
||||||
JANET_API int janet_indexed_view(Janet seq, const Janet **data, int32_t *len);
|
JANET_API int janet_indexed_view(Janet seq, const Janet **data, int32_t *len);
|
||||||
|
@ -869,10 +869,14 @@ int main(int argc, char **argv) {
|
|||||||
JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs);
|
JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs);
|
||||||
fiber->env = env;
|
fiber->env = env;
|
||||||
status = janet_continue(fiber, janet_wrap_nil(), &out);
|
status = janet_continue(fiber, janet_wrap_nil(), &out);
|
||||||
if (status != JANET_SIGNAL_OK) {
|
if (status != JANET_SIGNAL_OK && status < JANET_SIGNAL_USER0) {
|
||||||
janet_stacktrace(fiber, out);
|
janet_stacktrace(fiber, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef JANET_NET
|
||||||
|
janet_loop();
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Deinitialize vm */
|
/* Deinitialize vm */
|
||||||
janet_deinit();
|
janet_deinit();
|
||||||
janet_line_deinit();
|
janet_line_deinit();
|
||||||
|
Loading…
Reference in New Issue
Block a user