1
0
mirror of https://github.com/janet-lang/janet synced 2025-11-27 20:45:13 +00:00

Change fiber signal model to add user signals. This

should allow easier implementations of eventloops,
threadpools, or custom data flows with fibers.
This commit is contained in:
Calvin Rose
2018-05-16 22:09:36 -04:00
parent 0fd9224e4a
commit 51bdc41014
14 changed files with 258 additions and 204 deletions

View File

@@ -36,7 +36,7 @@ DST_THREAD_LOCAL DstFiber *dst_vm_fiber = NULL;
if (dst_vm_next_collection >= dst_vm_gc_interval) dst_collect(); } while (0)
/* Start running the VM from where it left off. */
Dst dst_run(DstFiber *fiber) {
DstSignal dst_continue(DstFiber *fiber, Dst in, Dst *out) {
/* Save old fiber to reset */
DstFiber *old_vm_fiber = dst_vm_fiber;
@@ -50,25 +50,56 @@ Dst dst_run(DstFiber *fiber) {
* Values stored here should be used immediately */
Dst retreg;
/* Signal to return when done */
DstSignal signal = DST_SIGNAL_OK;
/* Ensure fiber is not alive, dead, or error */
DstFiberStatus startstatus = dst_fiber_status(fiber);
if (startstatus == DST_STATUS_ALIVE ||
startstatus == DST_STATUS_DEAD ||
startstatus == DST_STATUS_ERROR) {
*out = dst_cstringv("cannot resume alive, dead, or errored fiber");
return DST_SIGNAL_ERROR;
}
/* Increment the stackn */
if (dst_vm_stackn >= DST_RECURSION_GUARD) {
fiber->status = DST_FIBER_ERROR;
return dst_cstringv("C stack recursed too deeply");
dst_fiber_set_status(fiber, DST_STATUS_ERROR);
*out = dst_cstringv("C stack recursed too deeply");
return DST_SIGNAL_ERROR;
}
dst_vm_stackn++;
/* Setup fiber state */
dst_vm_fiber = fiber;
dst_gcroot(dst_wrap_fiber(fiber));
if (fiber->flags & DST_FIBER_FLAG_NEW) {
dst_gcroot(in);
if (startstatus == DST_STATUS_NEW) {
dst_fiber_push(fiber, in);
dst_fiber_funcframe(fiber, fiber->root);
fiber->flags &= ~DST_FIBER_FLAG_NEW;
}
fiber->status = DST_FIBER_ALIVE;
dst_fiber_set_status(fiber, DST_STATUS_ALIVE);
stack = fiber->data + fiber->frame;
pc = dst_stack_frame(stack)->pc;
func = dst_stack_frame(stack)->func;
/* Used to extract bits from the opcode that correspond to arguments.
* Pulls out unsigned integers */
#define oparg(shift, mask) (((*pc) >> ((shift) << 3)) & (mask))
/* Check for child fiber. If there is a child, run child before self.
* This should only be hit when the current fiber is pending on a RESUME
* instruction. */
if (fiber->child) {
retreg = in;
goto vm_resume_child;
} else if (fiber->flags & DST_FIBER_FLAG_SIGNAL_WAITING) {
/* If waiting for response to signal, use input and increment pc */
stack[oparg(1, 0xFF)] = in;
pc++;
fiber->flags &= ~DST_FIBER_FLAG_SIGNAL_WAITING;
}
/* Use computed gotos for GCC and clang, otherwise use switch */
#ifdef __GNUC__
#define VM_START() {vm_next();
@@ -143,13 +174,12 @@ static void *op_lookup[255] = {
&&label_DOP_CALL,
&&label_DOP_TAILCALL,
&&label_DOP_RESUME,
&&label_DOP_YIELD,
&&label_DOP_SIGNAL,
&&label_DOP_GET,
&&label_DOP_PUT,
&&label_DOP_GET_INDEX,
&&label_DOP_PUT_INDEX,
&&label_DOP_LENGTH,
&&label_DOP_DEBUG,
&&label_unknown_op
};
#else
@@ -162,10 +192,6 @@ static void *op_lookup[255] = {
#define vm_checkgc_next() dst_maybe_collect(); vm_next()
/* Used to extract bits from the opcode that correspond to arguments.
* Pulls out unsigned integers */
#define oparg(shift, mask) (((*pc) >> ((shift) << 3)) & (mask))
#define vm_throw(e) do { retreg = dst_cstringv(e); goto vm_error; } while (0)
#define vm_assert(cond, e) do {if (!(cond)) vm_throw((e)); } while (0)
@@ -212,8 +238,8 @@ static void *op_lookup[255] = {
VM_START();
VM_DEFAULT();
VM_OP(DOP_DEBUG)
goto vm_debug;
retreg = dst_wrap_nil();
goto vm_exit;
VM_OP(DOP_NOOP)
pc++;
@@ -565,9 +591,9 @@ static void *op_lookup[255] = {
int32_t eindex = oparg(2, 0xFF);
int32_t vindex = oparg(3, 0xFF);
DstFuncEnv *env;
vm_assert(func->def->environments_length > eindex, "invalid upvalue");
vm_assert(func->def->environments_length > eindex, "invalid upvalue environment");
env = func->envs[eindex];
vm_assert(env->length > vindex, "invalid upvalue");
vm_assert(env->length > vindex, "invalid upvalue index");
if (env->offset) {
/* On stack */
stack[oparg(1, 0xFF)] = env->as.fiber->data[env->offset + vindex];
@@ -584,9 +610,9 @@ static void *op_lookup[255] = {
int32_t eindex = oparg(2, 0xFF);
int32_t vindex = oparg(3, 0xFF);
DstFuncEnv *env;
vm_assert(func->def->environments_length > eindex, "invalid upvalue");
vm_assert(func->def->environments_length > eindex, "invalid upvalue environment");
env = func->envs[eindex];
vm_assert(env->length > vindex, "invalid upvalue");
vm_assert(env->length > vindex, "invalid upvalue index");
if (env->offset) {
env->as.fiber->data[env->offset + vindex] = stack[oparg(1, 0xFF)];
} else {
@@ -723,71 +749,21 @@ static void *op_lookup[255] = {
VM_OP(DOP_RESUME)
{
DstFiber *nextfiber;
Dst fiberval = stack[oparg(2, 0xFF)];
Dst val = stack[oparg(3, 0xFF)];
vm_assert(dst_checktype(fiberval, DST_FIBER), "expected fiber");
nextfiber = dst_unwrap_fiber(fiberval);
switch (nextfiber->status) {
default:
vm_throw("expected pending, new, or debug fiber");
case DST_FIBER_NEW:
{
dst_fiber_push(nextfiber, val);
dst_fiber_funcframe(nextfiber, nextfiber->root);
nextfiber->flags &= ~DST_FIBER_FLAG_NEW;
break;
}
case DST_FIBER_DEBUG:
{
if (!nextfiber->child) {
DstStackFrame *nextframe = dst_fiber_frame(nextfiber);
nextframe->pc++;
}
break;
}
case DST_FIBER_PENDING:
{
if (!nextfiber->child) {
DstStackFrame *nextframe = dst_fiber_frame(nextfiber);
nextfiber->data[nextfiber->frame + ((*nextframe->pc >> 8) & 0xFF)] = val;
nextframe->pc++;
}
break;
}
}
fiber->child = nextfiber;
retreg = dst_run(nextfiber);
dst_vm_fiber = fiber;
switch (nextfiber->status) {
case DST_FIBER_DEBUG:
if (nextfiber->flags & DST_FIBER_MASK_DEBUG) goto vm_debug;
fiber->child = NULL;
break;
case DST_FIBER_ERROR:
if (nextfiber->flags & DST_FIBER_MASK_ERROR) goto vm_error;
fiber->child = NULL;
break;
case DST_FIBER_PENDING:
if (nextfiber->flags & DST_FIBER_MASK_YIELD) {
fiber->status = DST_FIBER_PENDING;
goto vm_exit;
}
fiber->child = NULL;
break;
default:
fiber->child = NULL;
break;
}
stack[oparg(1, 0xFF)] = retreg;
pc++;
vm_checkgc_next();
retreg = stack[oparg(3, 0xFF)];
fiber->child = dst_unwrap_fiber(fiberval);
goto vm_resume_child;
}
VM_OP(DOP_YIELD)
VM_OP(DOP_SIGNAL)
{
retreg = stack[oparg(2, 0xFFFF)];
fiber->status = DST_FIBER_PENDING;
int32_t s = oparg(3, 0xFF);
if (s > DST_SIGNAL_USER9) s = DST_SIGNAL_USER9;
if (s < 0) s = 0;
signal = s;
retreg = stack[oparg(2, 0xFF)];
fiber->flags |= DST_FIBER_FLAG_SIGNAL_WAITING;
goto vm_exit;
}
@@ -828,7 +804,7 @@ static void *op_lookup[255] = {
vm_return_cfunc:
{
dst_fiber_popframe(fiber);
if (fiber->frame == 0) goto vm_return_root;
if (fiber->frame == 0) goto vm_exit;
stack = fiber->data + fiber->frame;
stack[oparg(1, 0xFF)] = retreg;
pc++;
@@ -840,7 +816,7 @@ static void *op_lookup[255] = {
{
dst_fiber_popframe(fiber);
dst_fiber_popframe(fiber);
if (fiber->frame == 0) goto vm_return_root;
if (fiber->frame == 0) goto vm_exit;
goto vm_reset;
}
@@ -848,40 +824,59 @@ static void *op_lookup[255] = {
vm_return:
{
dst_fiber_popframe(fiber);
if (fiber->frame == 0) goto vm_return_root;
if (fiber->frame == 0) goto vm_exit;
goto vm_reset;
}
/* Exit loop with return value */
vm_return_root:
/* Resume a child fiber */
vm_resume_child:
{
fiber->status = DST_FIBER_DEAD;
goto vm_exit;
DstFiber *child = fiber->child;
DstFiberStatus status = dst_fiber_status(child);
if (status == DST_STATUS_ALIVE ||
status == DST_STATUS_DEAD ||
status == DST_STATUS_ERROR) {
vm_throw("cannot resume alive, dead, or errored fiber");
}
signal = dst_continue(child, retreg, &retreg);
if (signal != DST_SIGNAL_OK) {
if (child->flags & (1 << signal)) {
/* Intercept signal */
signal = DST_SIGNAL_OK;
fiber->child = NULL;
} else {
/* Propogate signal */
goto vm_exit;
}
}
stack[oparg(1, 0xFF)] = retreg;
pc++;
vm_checkgc_next();
}
/* Handle errors from c functions and vm opcodes */
vm_error:
{
fiber->status = DST_FIBER_ERROR;
signal = DST_SIGNAL_ERROR;
goto vm_exit;
}
/* Handle debugger interrupts */
vm_debug:
{
fiber->status = DST_FIBER_DEBUG;
retreg = dst_wrap_nil();
goto vm_exit;
}
/* Exit from vm loop */
/* Exit from vm loop. If signal is not set explicitely, does
* a successful return (DST_SIGNAL_OK). */
vm_exit:
{
dst_stack_frame(stack)->pc = pc;
dst_vm_stackn--;
dst_gcunroot(in);
dst_gcunroot(dst_wrap_fiber(fiber));
dst_vm_fiber = old_vm_fiber;
return retreg;
*out = retreg;
/* All statuses correspond to signals except new and alive,
* which cannot be entered when exiting the vm loop.
* DST_SIGNAL_OK -> DST_STATUS_DEAD
* DST_SIGNAL_YIELD -> DST_STATUS_PENDING */
dst_fiber_set_status(fiber, signal);
return signal;
}
/* Reset state of machine */
@@ -907,33 +902,17 @@ static void *op_lookup[255] = {
#undef vm_binop_immediate
}
Dst dst_resume(DstFiber *fiber, int32_t argn, const Dst *argv) {
switch (fiber->status) {
default:
dst_exit("expected new, pending or debug fiber");
case DST_FIBER_DEBUG:
break;
case DST_FIBER_NEW:
{
int32_t i;
for (i = 0; i < argn; i++)
dst_fiber_push(fiber, argv[i]);
dst_fiber_funcframe(fiber, fiber->root);
fiber->flags &= ~DST_FIBER_FLAG_NEW;
break;
}
case DST_FIBER_PENDING:
{
DstStackFrame *frame = dst_fiber_frame(fiber);
fiber->data[fiber->frame + ((*frame->pc >> 8) & 0xFF)] = argn > 0
? argv[0]
: dst_wrap_nil();
frame->pc++;
break;
}
DstSignal dst_call(DstFunction *fun, int32_t argn, const Dst *argv, Dst *out) {
int32_t i;
DstFiber *fiber = dst_fiber(fun, 64);
for (i = 0; i < argn; i++) {
dst_fiber_push(fiber, argv[i]);
}
return dst_run(fiber);
dst_fiber_funcframe(fiber, fiber->root);
/* Prevent push an extra value on the stack */
dst_fiber_set_status(fiber, DST_STATUS_PENDING);
return dst_continue(fiber, dst_wrap_nil(), out);
}
/* Setup VM */