From 76be8006a43f916c31fcc471e6915be11bb769a3 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Thu, 10 Nov 2022 16:32:54 -0600 Subject: [PATCH] Add channel marshalling. --- CHANGELOG.md | 1 + src/core/ev.c | 38 ++++++++++++++++++++++++++++++++++++-- test/suite0009.janet | 11 +++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 16f9261b..37b36cc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ All notable changes to this project will be documented in this file. ## ??? - Unreleased +- Channels can now be marshalled. Pending state is not saved, only items in the channel. - Use the new `.length` function pointer on abstract types for lengths. Adding a `length` method will still work as well. - Support byte views on abstract types with the `bytes` function pointer. diff --git a/src/core/ev.c b/src/core/ev.c index d7747a97..32b31ab4 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1175,14 +1175,48 @@ static Janet janet_chanat_next(void *p, Janet key) { return janet_nextmethod(ev_chanat_methods, key); } +static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx) { + JanetChannel *channel = (JanetChannel *)p; + janet_marshal_byte(ctx, channel->closed); + janet_marshal_int(ctx, channel->limit); + int32_t count = janet_q_count(&channel->items); + janet_marshal_int(ctx, count); + JanetQueue *items = &channel->items; + Janet *data = channel->items.data; + if (items->head <= items->tail) { + for (int32_t i = items->head; i < items->tail; i++) + janet_marshal_janet(ctx, data[i]); + } else { + for (int32_t i = items->head; i < items->capacity; i++) + janet_marshal_janet(ctx, data[i]); + for (int32_t i = 0; i < items->tail; i++) + janet_marshal_janet(ctx, data[i]); + } +} + +static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) { + JanetChannel *abst = janet_unmarshal_abstract(ctx, sizeof(JanetChannel)); + uint8_t is_closed = janet_unmarshal_byte(ctx); + int32_t limit = janet_unmarshal_int(ctx); + int32_t count = janet_unmarshal_int(ctx); + if (count < 0) janet_panic("invalid negative channel count"); + janet_chan_init(abst, limit, 0); + abst->closed = !!is_closed; + for (int32_t i = 0; i < count; i++) { + Janet item = janet_unmarshal_janet(ctx); + janet_q_push(&abst->items, &item, sizeof(item)); + } + return abst; +} + const JanetAbstractType janet_channel_type = { "core/channel", janet_chanat_gc, janet_chanat_mark, janet_chanat_get, NULL, /* put */ - NULL, /* marshal */ - NULL, /* unmarshal */ + janet_chanat_marshal, + janet_chanat_unmarshal, NULL, /* tostring */ NULL, /* compare */ NULL, /* hash */ diff --git a/test/suite0009.janet b/test/suite0009.janet index 018a7052..d75db543 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -265,4 +265,15 @@ (ev/do-thread (assert (ev/take ch) "channel packing bug for threaded abstracts on threaded channels.")) +# marshal channels + +(def ch (ev/chan 10)) +(ev/give ch "hello") +(ev/give ch "world") +(def ch2 (-> ch marshal unmarshal)) +(def item1 (ev/take ch2)) +(def item2 (ev/take ch2)) +(assert (= item1 "hello")) +(assert (= item2 "world")) + (end-suite)