Disallow mutlitple state machines waiting for a single fiber.

A 'select' operator will be channel based, not state machine based.
This commit is contained in:
Calvin Rose 2020-08-08 07:51:46 -05:00
parent 1213990b7d
commit 78ffb63429
3 changed files with 8 additions and 21 deletions

View File

@ -26,7 +26,6 @@
#include "util.h" #include "util.h"
#include "gc.h" #include "gc.h"
#include "state.h" #include "state.h"
#include "vector.h"
#include "fiber.h" #include "fiber.h"
#endif #endif
@ -212,6 +211,9 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
if (pollable->_mask & mask) { if (pollable->_mask & mask) {
janet_panic("cannot listen for duplicate event on pollable"); janet_panic("cannot listen for duplicate event on pollable");
} }
if (janet_vm_root_fiber->waiting != NULL) {
janet_panic("current fiber is already waiting for event");
}
if (size < sizeof(JanetListenerState)) if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState); size = sizeof(JanetListenerState);
JanetListenerState *state = malloc(size); JanetListenerState *state = malloc(size);
@ -223,7 +225,7 @@ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListe
state->fiber = NULL; state->fiber = NULL;
} else { } else {
state->fiber = janet_vm_root_fiber; state->fiber = janet_vm_root_fiber;
janet_v_push(janet_vm_root_fiber->waiting, state); janet_vm_root_fiber->waiting = state;
} }
mask |= JANET_ASYNC_LISTEN_SPAWNER; mask |= JANET_ASYNC_LISTEN_SPAWNER;
state->pollable = pollable; state->pollable = pollable;
@ -253,15 +255,8 @@ static void janet_unlisten_impl(JanetListenerState *state) {
state->pollable->_mask &= ~(state->_mask); state->pollable->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */ /* Ensure fiber does not reference this state */
JanetFiber *fiber = state->fiber; JanetFiber *fiber = state->fiber;
if (NULL != fiber) { if (NULL != fiber && fiber->waiting == state) {
int32_t count = janet_v_count(fiber->waiting); fiber->waiting = NULL;
for (int32_t i = 0; i < count; i++) {
if (fiber->waiting[i] == state) {
fiber->waiting[i] = janet_v_last(fiber->waiting);
janet_v_pop(fiber->waiting);
break;
}
}
} }
free(state); free(state);
} }
@ -302,12 +297,7 @@ void janet_pollable_deinit(JanetPollable *pollable) {
/* Cancel any state machines waiting on this fiber. */ /* Cancel any state machines waiting on this fiber. */
void janet_cancel(JanetFiber *fiber) { void janet_cancel(JanetFiber *fiber) {
int32_t lcount = janet_v_count(fiber->waiting); if (fiber->waiting) janet_unlisten(fiber->waiting);
janet_v_empty(fiber->waiting);
for (int32_t index = 0; index < lcount; index++) {
janet_unlisten(fiber->waiting[index]);
}
/* Clear timeout on the current fiber */
if (fiber->timeout_index >= 0) { if (fiber->timeout_index >= 0) {
pop_timeout(fiber->timeout_index); pop_timeout(fiber->timeout_index);
fiber->timeout_index = -1; fiber->timeout_index = -1;

View File

@ -286,9 +286,6 @@ static void janet_deinit_block(JanetGCObject *mem) {
break; break;
case JANET_MEMORY_FIBER: case JANET_MEMORY_FIBER:
free(((JanetFiber *)mem)->data); free(((JanetFiber *)mem)->data);
#ifdef JANET_EV
janet_v_free(((JanetFiber *)mem)->waiting);
#endif
break; break;
case JANET_MEMORY_BUFFER: case JANET_MEMORY_BUFFER:
janet_buffer_deinit((JanetBuffer *) mem); janet_buffer_deinit((JanetBuffer *) mem);

View File

@ -804,7 +804,7 @@ struct JanetFiber {
Janet *data; /* Dynamically resized stack memory */ Janet *data; /* Dynamically resized stack memory */
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */ JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
#ifdef JANET_EV #ifdef JANET_EV
JanetListenerState **waiting; JanetListenerState *waiting;
int32_t timeout_index; int32_t timeout_index;
#endif #endif
}; };