diff --git a/src/core/ev.c b/src/core/ev.c index 27f64e7c..652a56d1 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -326,6 +326,72 @@ void janet_addtimeout(double sec) { add_timeout(to); } +/* Channels */ + +typedef struct { + int32_t capacity; + int32_t head; + int32_t tail; + JanetFiber **fibers; +} JanetFiberQueue; + +#define JANET_MAX_FQ_CAPACITY 0xFFFFFF + +static void janet_fq_init(JanetFiberQueue *fq) { + fq->fibers = NULL; + fq->head = 0; + fq->tail = 0; + fq->capacity = 0; +} + +static int32_t janet_fq_count(JanetFiberQueue *fq) { + return (fq->head > fq->tail) + ? (fq->tail + fq->capacity - fq->head) + : (fq->tail - fq->head); +} + +static int janet_fq_push(JanetFiberQueue *fq, JanetFiber *fiber) { + int32_t count = janet_fq_count(fq); + if (count + 1 == JANET_MAX_FQ_CAPACITY) return 1; + /* Resize if needed */ + if (count + 1 > fq->capacity) { + int32_t newcap = (count + 1) * 2; + if (newcap > JANET_MAX_FQ_CAPACITY) newcap = JANET_MAX_FQ_CAPACITY; + if (fq->head > fq->tail) { + /* Two segments, we need to allocate and memcpy x2 */ + JanetFiber **newfibers = malloc(sizeof(JanetFiber *) * newcap); + if (NULL == newfibers) { + JANET_OUT_OF_MEMORY; + } + int32_t seg1 = fq->capacity - fq->head; + int32_t seq2 = fq->tail; + memcpy(newfibers, fq->fibers + fq->head, seg1 * sizeof(JanetFiber *)); + memcpy(newfibers + seg1, fq->fibers, seq2 * sizeof(JanetFiber *)); + free(fq->fibers); + fq->fibers = newfibers; + fq->head = 0; + fq->tail = count; + } else { + /* One segment, we can just realloc */ + fq->fibers = realloc(fq->fibers, sizeof(JanetFiber *) * newcap); + if (NULL == fq->fibers) { + JANET_OUT_OF_MEMORY; + } + } + fq->capacity = newcap; + } + fq->fibers[fq->tail++] = fiber; + if (fq->tail >= fq->capacity) fq->tail = 0; + return 0; +} + +static int janet_fq_pop(JanetFiberQueue *fq, JanetFiber **out) { + if (fq->head == fq->tail) return 1; + *out = fq->fibers[fq->head++]; + if (fq->head >= fq->capacity) fq->head = 0; + return 0; +} + /* Main event loop */ void janet_loop1_impl(void);