Add timeouts to net functions.

Further debugging of the general timeout system, as well
as having a single fiber wait on multiple state machines (select).
This commit is contained in:
Calvin Rose 2020-07-19 19:41:12 -05:00
parent df145f4bc9
commit 553b4d9428
9 changed files with 66 additions and 36 deletions

View File

@ -14,4 +14,7 @@
# Run server.
(let [server (net/server "127.0.0.1" "8000")]
(print "Starting echo server on 127.0.0.1:8000")
(while true (ev/call handler (:accept server))))
(forever
(if-let [conn (:accept server 2.8)]
(ev/call handler conn)
(print "no new connections"))))

View File

@ -27,6 +27,7 @@
#include "gc.h"
#include "state.h"
#include "vector.h"
#include "fiber.h"
#endif
#ifdef JANET_EV
@ -240,12 +241,8 @@ void janet_pollable_deinit(JanetPollable *pollable) {
pollable->state = NULL;
}
/* In order to avoid unexpected wakeups on a fiber, prior to
* resuming a fiber after and event is triggered, we need to
* cancel all listeners that also want to wakeup this fiber.
* Otherwise, these listeners make wakeup the fiber on an unexpected
* await point. */
void janet_unschedule_others(JanetFiber *fiber) {
/* Cancel any state machines waiting on this fiber. */
void janet_cancel(JanetFiber *fiber) {
int32_t lcount = janet_v_count(fiber->waiting);
janet_v_empty(fiber->waiting);
for (int32_t index = 0; index < lcount; index++) {
@ -260,11 +257,8 @@ void janet_unschedule_others(JanetFiber *fiber) {
/* Register a fiber to resume with value */
void janet_schedule(JanetFiber *fiber, Janet value) {
if (fiber->gc.flags & 0x10000) {
/* already scheduled to run, do nothing */
return;
}
fiber->gc.flags |= 0x10000;
if (fiber->flags & JANET_FIBER_FLAG_SCHEDULED) return;
fiber->flags |= JANET_FIBER_FLAG_SCHEDULED;
size_t oldcount = janet_vm_spawn_count;
size_t newcount = oldcount + 1;
if (newcount > janet_vm_spawn_capacity) {
@ -294,9 +288,7 @@ void janet_ev_mark(void) {
/* Run a top level task */
static void run_one(JanetFiber *fiber, Janet value) {
/* Use a gc flag bit to indicate (is this fiber scheduled?) */
fiber->gc.flags &= ~0x10000;
janet_unschedule_others(fiber);
fiber->flags &= ~JANET_FIBER_FLAG_SCHEDULED;
Janet res;
JanetSignal sig = janet_continue(fiber, value, &res);
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {

View File

@ -81,6 +81,7 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
}
if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
fiber->waiting = NULL;
return fiber;
}

View File

@ -46,7 +46,8 @@
#define JANET_FIBER_MASK_USERN(N) (16 << (N))
#define JANET_FIBER_MASK_USER 0x3FF0
#define JANET_FIBER_STATUS_MASK 0xFF0000
#define JANET_FIBER_STATUS_MASK 0x7F0000
#define JANET_FIBER_FLAG_SCHEDULED 0x800000
#define JANET_FIBER_STATUS_OFFSET 16
#define JANET_FIBER_BREAKPOINT 0x1000000

View File

@ -285,8 +285,8 @@ static void marshal_one_def(MarshalState *st, JanetFuncDef *def, int flags) {
}
#define JANET_FIBER_FLAG_HASCHILD (1 << 29)
#define JANET_FIBER_FLAG_HASENV (1 << 28)
#define JANET_STACKFRAME_HASENV (1 << 30)
#define JANET_FIBER_FLAG_HASENV (1 << 30)
#define JANET_STACKFRAME_HASENV (1 << 31)
/* Marshal a fiber */
static void marshal_one_fiber(MarshalState *st, JanetFiber *fiber, int flags) {
@ -934,6 +934,8 @@ static const uint8_t *unmarshal_one_fiber(
fiber->data = NULL;
fiber->child = NULL;
fiber->env = NULL;
fiber->waiting = NULL;
fiber->timeout_index = -1;
/* Push fiber to seen stack */
janet_v_push(st->lookup, janet_wrap_fiber(fiber));
@ -1048,6 +1050,11 @@ static const uint8_t *unmarshal_one_fiber(
fiber->maxstack = fiber_maxstack;
fiber->env = fiber_env;
int status = janet_fiber_status(fiber);
if (status < 0 || status > JANET_STATUS_ALIVE) {
janet_panic("invalid fiber status");
}
/* Return data */
*out = fiber;
return data;

View File

@ -364,7 +364,6 @@ JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEven
janet_mark(janet_wrap_function(state->function));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
janet_gcunroot(janet_wrap_abstract(s->pollable));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: {
@ -650,7 +649,7 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Server with handler */
JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_LISTEN_READ, sizeof(NetStateSimpleServer));
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer));
ss->function = fun;
return janet_wrap_abstract(stream);
}
@ -669,36 +668,44 @@ static void check_stream_flag(JanetStream *stream, int flag) {
}
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
janet_arity(argc, 1, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream);
}
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_read(stream, buffer, n);
}
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_READABLE);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_chunk(stream, buffer, n);
}
static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) {
janet_fixarity(argc, 3);
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
int32_t n = janet_getnat(argv, 1);
JanetBuffer *buffer = janet_getbuffer(argv, 2);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_recv_from(stream, buffer, n);
}
@ -710,26 +717,32 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
}
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_WRITABLE);
double to = janet_optnumber(argv, argc, 2, INFINITY);
if (janet_checktype(argv[1], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL);
} else {
JanetByteView bytes = janet_getbytes(argv, 1);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, NULL);
}
}
static Janet cfun_stream_send_to(int32_t argc, Janet *argv) {
janet_fixarity(argc, 3);
janet_arity(argc, 3, 4);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_UDPSERVER);
void *dest = janet_getabstract(argv, 1, &AddressAT);
double to = janet_optnumber(argv, argc, 3, INFINITY);
if (janet_checktype(argv[2], JANET_BUFFER)) {
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest);
} else {
JanetByteView bytes = janet_getbytes(argv, 2);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_write_stringlike(stream, bytes.bytes, dest);
}
}
@ -783,39 +796,44 @@ static const JanetReg net_cfuns[] = {
},
{
"net/accept", cfun_stream_accept,
JDOC("(net/accept stream)\n\n"
JDOC("(net/accept stream &opt timeout)\n\n"
"Get the next connection on a server stream. This would usually be called in a loop in a dedicated fiber. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns a new duplex stream which represents a connection to the client.")
},
{
"net/read", cfun_stream_read,
JDOC("(net/read stream nbytes &opt buf)\n\n"
JDOC("(net/read stream nbytes &opt buf timeout)\n\n"
"Read up to n bytes from a stream, suspending the current fiber until the bytes are available. "
"If less than n bytes are available (and more than 0), will push those bytes and return early. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns a buffer with up to n more bytes in it.")
},
{
"net/chunk", cfun_stream_chunk,
JDOC("(net/chunk stream nbytes &opt buf)\n\n"
"Same a net/read, but will wait for all n bytes to arrive rather than return early.")
JDOC("(net/chunk stream nbytes &opt buf timeout)\n\n"
"Same a net/read, but will wait for all n bytes to arrive rather than return early. "
"Takes an optional timeout in seconds, after which will return nil.")
},
{
"net/write", cfun_stream_write,
JDOC("(net/write stream data)\n\n"
JDOC("(net/write stream data &opt timeout)\n\n"
"Write data to a stream, suspending the current fiber until the write "
"completes. Returns stream.")
"completes. Takes an optional timeout in seconds, after which will return nil. "
"Returns stream.")
},
{
"net/send-to", cfun_stream_send_to,
JDOC("(net/send-to stream dest data)\n\n"
JDOC("(net/send-to stream dest data &opt timeout)\n\n"
"Writes a datagram to a server stream. dest is a the destination address of the packet. "
"Takes an optional timeout in seconds, after which will return nil. "
"Returns stream.")
},
{
"net/recv-from", cfun_stream_recv_from,
JDOC("(net/recv-from stream nbytes buf)\n\n"
JDOC("(net/recv-from stream nbytes buf &opt timoeut)\n\n"
"Receives data from a server stream and puts it into a buffer. Returns the socket-address the "
"packet came from.")
"packet came from. Takes an optional timeout in seconds, after which will return nil.")
},
{
"net/flush", cfun_stream_flush,

View File

@ -1286,6 +1286,10 @@ static JanetSignal janet_continue_no_check(JanetFiber *fiber, Janet in, Janet *o
JanetFiberStatus old_status = janet_fiber_status(fiber);
#ifdef JANET_EV
janet_cancel(fiber);
#endif
/* Continue child fiber if it exists */
if (fiber->child) {
if (janet_vm_root_fiber == NULL) janet_vm_root_fiber = fiber;

View File

@ -1218,6 +1218,10 @@ JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListene
/* Shorthand for yielding to event loop in C */
JANET_NO_RETURN JANET_API void janet_await(void);
/* Cancel a waiting fiber. Will notify the canceled state machines, but will not
* unwind the fiber. */
void janet_cancel(JanetFiber *fiber);
/* For use inside listeners - adds a timeout to the current fiber, such that
* it will be resumed after sec seconds if no other event schedules the current fiber. */
void janet_addtimeout(double sec);

View File

@ -224,7 +224,7 @@ neldb\0\0\0\xD8\x05printG\x01\0\xDE\xDE\xDE'\x03\0marshal_tes/\x02
# No segfault, valgrind clean.
(def x @"\xCC\xCD.nd\x80\0\r\x1C\xCDg!\0\x07\xCC\xCD\r\x1Ce\x10\0\r;\xCDb\x04\xFF9\xFF\x80\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04uu\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\0\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04}\x04\x04\x04\x04\x04\x04\x04\x04#\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\0\x01\0\0\x03\x04\x04\x04\xE2\x03\x04\x04\x04\x04\x04\x04\x04\x04\x04\x14\x1A\x04\x04\x04\x04\x04\x18\x04\x04!\x04\xE2\x03\x04\x04\x04\x04\x04\x04$\x04\x04\x04\x04\x04\x04\x04\x04\x04\x80\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04A\0\0\0\x03\0\0!\xBF\xFF")
(unmarshal x load-image-dict)
(assert-error "bad fiber status" (unmarshal x load-image-dict))
(gccollect)
(marshal x make-image-dict)