diff --git a/src/core/ev.c b/src/core/ev.c index b39e4a97..83a117c6 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -46,10 +46,12 @@ #endif typedef struct { + JanetVM *thread; JanetFiber *fiber; uint32_t sched_id; enum { - JANET_CP_MODE_ITEM, + JANET_CP_MODE_READ, + JANET_CP_MODE_WRITE, JANET_CP_MODE_CHOICE_READ, JANET_CP_MODE_CHOICE_WRITE } mode; @@ -61,6 +63,12 @@ typedef struct { JanetQueue write_pending; int32_t limit; int closed; + int ref_count; /* refcount < 0 means no threading */ +#ifdef JANET_WINDOWS + CRITICAL_SECTION lock; +#else + pthread_mutex_t lock; +#endif } JanetChannel; typedef struct { @@ -503,6 +511,7 @@ void janet_ev_mark(void) { } static int janet_channel_push(JanetChannel *channel, Janet x, int mode); +static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice); static Janet make_supervisor_event(const char *name, JanetFiber *fiber) { Janet tup[2]; @@ -520,6 +529,7 @@ void janet_ev_init_common(void) { janet_vm.tq = NULL; janet_vm.tq_count = 0; janet_vm.tq_capacity = 0; + janet_table_init_raw(&janet_vm.channel_map, 0); janet_rng_seed(&janet_vm.ev_rng, 0); } @@ -529,6 +539,7 @@ void janet_ev_deinit_common(void) { janet_free(janet_vm.tq); janet_free(janet_vm.listeners); janet_vm.listeners = NULL; + janet_table_deinit(&janet_vm.channel_map); } /* Short hand to yield to event loop */ @@ -560,18 +571,87 @@ void janet_ev_dec_refcount(void) { #define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF -static void janet_chan_init(JanetChannel *chan, int32_t limit) { +static inline int janet_chan_is_threaded(JanetChannel *chan) { + return chan->ref_count >= 0; +} + +static int janet_chan_pack(JanetChannel *chan, Janet *x) { + if (chan->ref_count < 0) return 0; + switch (janet_type(*x)) { + default: + return 1; + case JANET_NIL: + case JANET_NUMBER: + case JANET_POINTER: + case JANET_BOOLEAN: + case JANET_CFUNCTION: + return 0; + } +} + +static int janet_chan_unpack(JanetChannel *chan, Janet *x) { + if (!janet_chan_is_threaded(chan)) return 0; + switch (janet_type(*x)) { + default: + return 1; + case JANET_NIL: + case JANET_NUMBER: + case JANET_POINTER: + case JANET_BOOLEAN: + case JANET_CFUNCTION: + return 0; + } +} + +static void janet_chan_init(JanetChannel *chan, int32_t limit, int threaded) { chan->limit = limit; chan->closed = 0; + chan->ref_count = threaded ? 1 : -1; janet_q_init(&chan->items); janet_q_init(&chan->read_pending); janet_q_init(&chan->write_pending); +#ifdef JANET_WINDOWS + InitializeCriticalSection(&chan->lock); +#else + pthread_mutex_init(&chan->lock, NULL); +#endif } static void janet_chan_deinit(JanetChannel *chan) { janet_q_deinit(&chan->read_pending); janet_q_deinit(&chan->write_pending); + if (janet_chan_is_threaded(chan)) { + /* Unpack everything in the queue so that memory isn't leaked + * and destructors are called for everything in the queue. */ + Janet item; + while (!janet_q_pop(&chan->items, &item, sizeof(item))) { + janet_chan_unpack(chan, &item); + } + } janet_q_deinit(&chan->items); +#ifdef JANET_WINDOWS + DeleteCriticalSection(&chan->lock); +#else + pthread_mutex_destroy(&chan->lock); +#endif +} + +static void janet_chan_lock(JanetChannel *chan) { + if (!janet_chan_is_threaded(chan)) return; +#ifdef JANET_WINDOWS + EnterCriticalSection(&chan->lock); +#else + pthread_mutex_lock(&chan->lock); +#endif +} + +static void janet_chan_unlock(JanetChannel *chan) { + if (!janet_chan_is_threaded(chan)) return; +#ifdef JANET_WINDOWS + LeaveCriticalSection(&chan->lock); +#else + pthread_mutex_unlock(&chan->lock); +#endif } /* @@ -582,6 +662,8 @@ static int janet_chanat_mark(void *p, size_t s); static int janet_chanat_gc(void *p, size_t s); static Janet janet_chanat_next(void *p, Janet key); static int janet_chanat_get(void *p, Janet key, Janet *out); +static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx); +static void *janet_chanat_unmarshal(JanetMarshalContext *ctx); static const JanetAbstractType ChannelAT = { "core/channel", @@ -589,8 +671,8 @@ static const JanetAbstractType ChannelAT = { janet_chanat_mark, janet_chanat_get, NULL, /* put */ - NULL, /* marshal */ - NULL, /* unmarshal */ + janet_chanat_marshal, + janet_chanat_unmarshal, NULL, /* tostring */ NULL, /* compare */ NULL, /* hash */ @@ -598,10 +680,101 @@ static const JanetAbstractType ChannelAT = { JANET_ATEND_NEXT }; +static Janet janet_wrap_channel(JanetChannel *channel) { + if (janet_chan_is_threaded(channel)) { + return janet_table_get(&janet_vm.channel_map, janet_wrap_pointer(channel)); + } else { + return janet_wrap_abstract(channel); + } +} + +static void janet_chanat_marshal(void *p, JanetMarshalContext *ctx) { + size_t s = janet_abstract_size(p); + int is_threaded = s == sizeof(JanetChannel *); + janet_marshal_int(ctx, is_threaded); + janet_marshal_abstract(ctx, p); + if (is_threaded) { + if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) { + janet_panic("can only marshal threaded channel with unsafe flag"); + } + JanetChannel *channel = *((JanetChannel **) p); + + janet_marshal_int64(ctx, (int64_t) channel); + + /* Increment refcount BEFORE sending to thread - this prevents the channel from being GCed while in transit. + * The runtime will ensure that the channel will be unmarshalled eventually, even if no thread can receive it to + * run the gc finailizer and eventually decrement the refcount. */ + janet_chan_lock(channel); + channel->ref_count++; + janet_chan_unlock(channel); + + } else { + JanetChannel *channel = (JanetChannel *)p; + janet_marshal_int(ctx, channel->closed); + janet_marshal_int(ctx, channel->limit); + janet_marshal_int(ctx, janet_q_count(&channel->items)); + Janet *items = channel->items.data; + for (int32_t i = channel->items.head; i < channel->items.tail; i++) { + janet_marshal_janet(ctx, items[i]); + } + } +} + +static void *janet_chanat_unmarshal(JanetMarshalContext *ctx) { + int is_threaded = janet_unmarshal_int(ctx); + if (is_threaded) { + if (!(ctx->flags & JANET_MARSHAL_UNSAFE)) { + janet_panic("can only unmarshal threaded channel with unsafe flag"); + } + JanetChannel **pchan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel *)); + int64_t intptr = janet_unmarshal_int64(ctx); + JanetChannel *chan = (JanetChannel *) intptr; + *pchan = chan; + return pchan; + } else { + JanetChannel *chan = janet_unmarshal_abstract(ctx, sizeof(JanetChannel)); + chan->limit = 0; + chan->closed = 1; + chan->ref_count = -1; + janet_q_init(&chan->items); + janet_q_init(&chan->read_pending); + janet_q_init(&chan->write_pending); + int closed = janet_unmarshal_int(ctx); + int32_t limit = janet_unmarshal_int(ctx); + if (limit < 0) { + janet_panicf("invalid channel limit found: %d", limit); + } + int32_t count = janet_unmarshal_int(ctx); + if (count < 0) { + janet_panicf("invalid channel count found: %d", count); + } + chan->limit = limit; + chan->closed = closed; + for (int32_t i = 0; i < count; i++) { + Janet item = janet_unmarshal_janet(ctx); + janet_q_push(&chan->items, &item, sizeof(item)); + } + return chan; + } +} + static int janet_chanat_gc(void *p, size_t s) { - (void) s; - JanetChannel *channel = p; - janet_chan_deinit(channel); + JanetChannel *channel; + if (s == sizeof(JanetChannel *)) { + /* threaded */ + channel = *((JanetChannel **)p); + janet_table_remove(&janet_vm.channel_map, janet_wrap_pointer(channel)); + janet_chan_lock(channel); + if (--channel->ref_count == 0) { + janet_chan_deinit(channel); + } else { + janet_chan_unlock(channel); + } + } else { + /* not threaded */ + channel = p; + janet_chan_deinit(channel); + } return 0; } @@ -619,7 +792,7 @@ static void janet_chanat_mark_fq(JanetQueue *fq) { } static int janet_chanat_mark(void *p, size_t s) { - (void) s; + if (s == sizeof(JanetChannel *)) return 0; JanetChannel *chan = p; janet_chanat_mark_fq(&chan->read_pending); janet_chanat_mark_fq(&chan->write_pending); @@ -640,14 +813,14 @@ static int janet_chanat_mark(void *p, size_t s) { static Janet make_write_result(JanetChannel *channel) { Janet *tup = janet_tuple_begin(2); tup[0] = janet_ckeywordv("give"); - tup[1] = janet_wrap_abstract(channel); + tup[1] = janet_wrap_channel(channel); return janet_wrap_tuple(janet_tuple_end(tup)); } static Janet make_read_result(JanetChannel *channel, Janet x) { Janet *tup = janet_tuple_begin(3); tup[0] = janet_ckeywordv("take"); - tup[1] = janet_wrap_abstract(channel); + tup[1] = janet_wrap_channel(channel); tup[2] = x; return janet_wrap_tuple(janet_tuple_end(tup)); } @@ -655,41 +828,127 @@ static Janet make_read_result(JanetChannel *channel, Janet x) { static Janet make_close_result(JanetChannel *channel) { Janet *tup = janet_tuple_begin(2); tup[0] = janet_ckeywordv("close"); - tup[1] = janet_wrap_abstract(channel); + tup[1] = janet_wrap_channel(channel); return janet_wrap_tuple(janet_tuple_end(tup)); } +/* Callback to use for scheduling a fiber from another thread. */ +static void janet_thread_chan_cb(JanetEVGenericMessage msg) { + uint32_t sched_id = (uint32_t) msg.argi; + JanetFiber *fiber = msg.fiber; + int mode = msg.tag; + JanetChannel *channel = (JanetChannel *) msg.argp; + Janet x = msg.argj; + if (fiber->sched_id == sched_id) { + if (mode == JANET_CP_MODE_CHOICE_READ) { + janet_assert(!janet_chan_unpack(channel, &x), "packing error"); + janet_schedule(fiber, make_read_result(channel, x)); + } else if (mode == JANET_CP_MODE_CHOICE_WRITE) { + janet_schedule(fiber, make_write_result(channel)); + } else if (mode == JANET_CP_MODE_READ) { + janet_assert(!janet_chan_unpack(channel, &x), "packing error"); + janet_schedule(fiber, x); + } else { /* MODE_WRITE */ + janet_schedule(fiber, janet_wrap_channel(channel)); + } + } else { + /* Fiber has already been cancelled or resumed. */ + /* Resend event to another waiting thread, depending on mode */ + int is_read = (mode == JANET_CP_MODE_CHOICE_READ) || (mode == JANET_CP_MODE_READ); + if (is_read) { + JanetChannelPending reader; + janet_chan_lock(channel); + if (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { + JanetVM *vm = reader.thread; + JanetEVGenericMessage msg; + msg.tag = reader.mode; + msg.fiber = reader.fiber; + msg.argi = (int32_t) reader.sched_id; + msg.argp = channel; + msg.argj = x; + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } + janet_chan_unlock(channel); + } else { + JanetChannelPending writer; + janet_chan_lock(channel); + if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { + JanetVM *vm = writer.thread; + JanetEVGenericMessage msg; + msg.tag = writer.mode; + msg.fiber = writer.fiber; + msg.argi = (int32_t) writer.sched_id; + msg.argp = channel; + msg.argj = janet_wrap_nil(); + janet_ev_post_event(vm, janet_thread_chan_cb, msg); + } + janet_chan_unlock(channel); + } + } +} + /* Push a value to a channel, and return 1 if channel should block, zero otherwise. - * If the push would block, will add to the write_pending queue in the channel. */ + * If the push would block, will add to the write_pending queue in the channel. + * Handles both threaded and unthreaded channels. */ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { JanetChannelPending reader; int is_empty; + if (janet_chan_pack(channel, &x)) { + janet_panicf("failed to pack value for channel: %v", x); + } + janet_chan_lock(channel); + if (channel->closed) { + janet_chan_unlock(channel); + janet_panic("cannot write to closed channel"); + } + int is_threaded = janet_chan_is_threaded(channel); do { is_empty = janet_q_pop(&channel->read_pending, &reader, sizeof(reader)); } while (!is_empty && (reader.sched_id != reader.fiber->sched_id)); if (is_empty) { /* No pending reader */ if (janet_q_push(&channel->items, &x, sizeof(Janet))) { + janet_chan_unlock(channel); janet_panicf("channel overflow: %v", x); } else if (janet_q_count(&channel->items) > channel->limit) { /* No root fiber, we are in completion on a root fiber. Don't block. */ - if (mode == 2) return 0; + if (mode == 2) { + janet_chan_unlock(channel); + return 0; + } /* Pushed successfully, but should block. */ JanetChannelPending pending; + pending.thread = &janet_vm; pending.fiber = janet_vm.root_fiber, pending.sched_id = janet_vm.root_fiber->sched_id, - pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM; + pending.mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_WRITE; janet_q_push(&channel->write_pending, &pending, sizeof(pending)); + janet_chan_unlock(channel); + if (is_threaded) { + janet_gcroot(janet_wrap_fiber(pending.fiber)); + } return 1; } } else { /* Pending reader */ - if (reader.mode == JANET_CP_MODE_CHOICE_READ) { - janet_schedule(reader.fiber, make_read_result(channel, x)); + if (is_threaded) { + JanetVM *vm = reader.thread; + JanetEVGenericMessage msg; + msg.tag = reader.mode; + msg.fiber = reader.fiber; + msg.argi = (int32_t) reader.sched_id; + msg.argp = channel; + msg.argj = x; + janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_schedule(reader.fiber, x); + if (reader.mode == JANET_CP_MODE_CHOICE_READ) { + janet_schedule(reader.fiber, make_read_result(channel, x)); + } else { + janet_schedule(reader.fiber, x); + } } } + janet_chan_unlock(channel); return 0; } @@ -698,26 +957,62 @@ static int janet_channel_push(JanetChannel *channel, Janet x, int mode) { * queue in the channel. */ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) { JanetChannelPending writer; + janet_chan_lock(channel); + if (channel->closed) { + janet_chan_unlock(channel); + *item = janet_wrap_nil(); + return 1; + } + int is_threaded = channel->ref_count >= 0; if (janet_q_pop(&channel->items, item, sizeof(Janet))) { /* Queue empty */ JanetChannelPending pending; + pending.thread = &janet_vm; pending.fiber = janet_vm.root_fiber, pending.sched_id = janet_vm.root_fiber->sched_id; - pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM; + pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_READ; janet_q_push(&channel->read_pending, &pending, sizeof(pending)); + janet_chan_unlock(channel); + if (is_threaded) { + janet_gcroot(janet_wrap_fiber(pending.fiber)); + } return 0; } + janet_assert(!janet_chan_unpack(channel, item), "bad channel packing"); if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { - /* pending writer */ - if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { - janet_schedule(writer.fiber, make_write_result(channel)); + /* Pending writer */ + if (is_threaded) { + JanetVM *vm = writer.thread; + JanetEVGenericMessage msg; + msg.tag = writer.mode; + msg.fiber = writer.fiber; + msg.argi = (int32_t) writer.sched_id; + msg.argp = channel; + msg.argj = janet_wrap_nil(); + janet_ev_post_event(vm, janet_thread_chan_cb, msg); } else { - janet_schedule(writer.fiber, janet_wrap_abstract(channel)); + if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { + janet_schedule(writer.fiber, make_write_result(channel)); + } else { + janet_schedule(writer.fiber, janet_wrap_abstract(channel)); + } } } + janet_chan_unlock(channel); return 1; } +static JanetChannel *janet_getchannel(const Janet *argv, int32_t n) { + void *p = janet_getabstract(argv, n, &ChannelAT); + /* Rely on Janet's abstract type size tracking and the fact that a channel will + * surely be bigger than a pointer to a channel */ + if (janet_abstract_size(p) == sizeof(JanetChannel *)) { + return *((JanetChannel **)p); + } else { + return (JanetChannel *)p; + } +} + /* Channel Methods */ JANET_CORE_FN(cfun_channel_push, @@ -725,10 +1020,7 @@ JANET_CORE_FN(cfun_channel_push, "Write a value to a channel, suspending the current fiber if the channel is full. " "Returns the channel if the write succeeded, nil otherwise.") { janet_fixarity(argc, 2); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - if (channel->closed) { - janet_panic("cannot write to closed channel"); - } + JanetChannel *channel = janet_getchannel(argv, 0); if (janet_channel_push(channel, argv[1], 0)) { janet_await(); } @@ -739,9 +1031,8 @@ JANET_CORE_FN(cfun_channel_pop, "(ev/take channel)", "Read from a channel, suspending the current fiber if no value is available.") { janet_fixarity(argc, 1); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + JanetChannel *channel = janet_getchannel(argv, 0); Janet item; - if (channel->closed) return janet_wrap_nil(); if (janet_channel_pop(channel, &item, 0)) { janet_schedule(janet_vm.root_fiber, item); } @@ -753,7 +1044,8 @@ JANET_CORE_FN(cfun_channel_choice, "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where " "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for " "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first " - "clauses will take precedence over later clauses. Both and give and take operation can return a [:close chan] tuple, which indicates that the specified channel was closed.") { + "clauses will take precedence over later clauses. Both and give and take operations can return a [:close chan] tuple, which indicates that " + "the specified channel was closed while waiting, or that the channel was already closed.") { janet_arity(argc, 1, -1); int32_t len; const Janet *data; @@ -762,16 +1054,21 @@ JANET_CORE_FN(cfun_channel_choice, for (int32_t i = 0; i < argc; i++) { if (janet_indexed_view(argv[i], &data, &len) && len == 2) { /* Write */ - JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); - if (chan->closed) continue; + JanetChannel *chan = janet_getchannel(data, 0); + janet_chan_lock(chan); + if (chan->closed) { + return make_close_result(chan); + } if (janet_q_count(&chan->items) < chan->limit) { janet_channel_push(chan, data[1], 1); return make_write_result(chan); } } else { /* Read */ - JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); - if (chan->closed) continue; + JanetChannel *chan = janet_getchannel(argv, i); + if (chan->closed) { + return make_close_result(chan); + } if (chan->items.head != chan->items.tail) { Janet item; janet_channel_pop(chan, &item, 1); @@ -784,13 +1081,13 @@ JANET_CORE_FN(cfun_channel_choice, for (int32_t i = 0; i < argc; i++) { if (janet_indexed_view(argv[i], &data, &len) && len == 2) { /* Write */ - JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT); + JanetChannel *chan = janet_getchannel(data, 0); if (chan->closed) continue; janet_channel_push(chan, data[1], 1); } else { /* Read */ Janet item; - JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT); + JanetChannel *chan = janet_getchannel(argv, i); if (chan->closed) continue; janet_channel_pop(chan, &item, 1); } @@ -803,24 +1100,33 @@ JANET_CORE_FN(cfun_channel_full, "(ev/full channel)", "Check if a channel is full or not.") { janet_fixarity(argc, 1); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - return janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit); + JanetChannel *channel = janet_getchannel(argv, 0); + janet_chan_lock(channel); + Janet ret = janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit); + janet_chan_unlock(channel); + return ret; } JANET_CORE_FN(cfun_channel_capacity, "(ev/capacity channel)", "Get the number of items a channel will store before blocking writers.") { janet_fixarity(argc, 1); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - return janet_wrap_integer(channel->limit); + JanetChannel *channel = janet_getchannel(argv, 0); + janet_chan_lock(channel); + Janet ret = janet_wrap_integer(channel->limit); + janet_chan_unlock(channel); + return ret; } JANET_CORE_FN(cfun_channel_count, "(ev/count channel)", "Get the number of items currently waiting in a channel.") { janet_fixarity(argc, 1); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); - return janet_wrap_integer(janet_q_count(&channel->items)); + JanetChannel *channel = janet_getchannel(argv, 0); + janet_chan_lock(channel); + Janet ret = janet_wrap_integer(janet_q_count(&channel->items)); + janet_chan_unlock(channel); + return ret; } /* Fisher yates shuffle of arguments to get fairness */ @@ -847,36 +1153,61 @@ JANET_CORE_FN(cfun_channel_new, janet_arity(argc, 0, 1); int32_t limit = janet_optnat(argv, argc, 0, 0); JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel)); - janet_chan_init(channel, limit); + janet_chan_init(channel, limit, 0); return janet_wrap_abstract(channel); } +JANET_CORE_FN(cfun_channel_new_threaded, + "(ev/thread-chan &opt limit)", + "Create a threaded channel. A threaded channel is a channel that can be shared between threads and " + "used to communicate between any number of operating system threads.") { + janet_arity(argc, 0, 1); + int32_t limit = janet_optnat(argv, argc, 0, 0); + JanetChannel *tchan = janet_malloc(sizeof(JanetChannel)); + janet_chan_init(tchan, limit, 1); + JanetChannel **wrap = janet_abstract(&ChannelAT, sizeof(JanetChannel *)); + *wrap = tchan; + Janet ret = janet_wrap_abstract(wrap); + janet_table_put(&janet_vm.channel_map, janet_wrap_pointer(tchan), ret); + return ret; +} + JANET_CORE_FN(cfun_channel_close, "(ev/chan-close chan)", "Close a channel. A closed channel will cause all pending reads and writes to return nil. " "Returns the channel.") { janet_fixarity(argc, 1); - JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); + JanetChannel *channel = janet_getchannel(argv, 0); + janet_chan_lock(channel); if (!channel->closed) { channel->closed = 1; JanetChannelPending writer; while (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) { - if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { - janet_schedule(writer.fiber, janet_wrap_nil()); + if (writer.thread != &janet_vm) { + /* TODO - post message */ } else { - janet_schedule(writer.fiber, make_close_result(channel)); + if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) { + janet_schedule(writer.fiber, janet_wrap_nil()); + } else { + janet_schedule(writer.fiber, make_close_result(channel)); + } } } JanetChannelPending reader; while (!janet_q_pop(&channel->read_pending, &reader, sizeof(reader))) { - if (reader.mode == JANET_CP_MODE_CHOICE_READ) { - janet_schedule(reader.fiber, janet_wrap_nil()); + if (reader.thread != &janet_vm) { + /* TODO - post message */ } else { - janet_schedule(reader.fiber, make_close_result(channel)); + if (reader.mode == JANET_CP_MODE_CHOICE_READ) { + janet_schedule(reader.fiber, janet_wrap_nil()); + } else { + janet_schedule(reader.fiber, make_close_result(channel)); + } } } } - return janet_wrap_abstract(channel); + janet_chan_unlock(channel); + return argv[0]; } static const JanetMethod ev_chanat_methods[] = { @@ -2440,6 +2771,8 @@ void janet_lib_ev(JanetTable *env) { JANET_CORE_REG("ev/select", cfun_channel_choice), JANET_CORE_REG("ev/rselect", cfun_channel_rchoice), JANET_CORE_REG("ev/chan", cfun_channel_new), + JANET_CORE_REG("ev/thread-chan", cfun_channel_new_threaded), + JANET_CORE_REG("ev/chan-close", cfun_channel_close), JANET_CORE_REG("ev/go", cfun_ev_go), JANET_CORE_REG("ev/thread", cfun_ev_thread), JANET_CORE_REG("ev/give-supervisor", cfun_ev_give_supervisor), @@ -2450,7 +2783,6 @@ void janet_lib_ev(JanetTable *env) { JANET_CORE_REG("ev/read", janet_cfun_stream_read), JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk), JANET_CORE_REG("ev/write", janet_cfun_stream_write), - JANET_CORE_REG("ev/chan-close", cfun_channel_close), JANET_REG_END }; diff --git a/src/core/state.h b/src/core/state.h index b38f4505..6b36b022 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -160,6 +160,7 @@ struct JanetVM { size_t listener_count; size_t listener_cap; size_t extra_listeners; + JanetTable channel_map; /* threaded channel lookup table, no gc */ #ifdef JANET_WINDOWS void **iocp; #elif defined(JANET_EV_EPOLL) diff --git a/src/core/table.c b/src/core/table.c index d3cd43de..2c3aa2b0 100644 --- a/src/core/table.c +++ b/src/core/table.c @@ -67,14 +67,23 @@ static JanetTable *janet_table_init_impl(JanetTable *table, int32_t capacity, in return table; } -/* Initialize a table */ +/* Initialize a table (for use withs scratch memory) */ JanetTable *janet_table_init(JanetTable *table, int32_t capacity) { return janet_table_init_impl(table, capacity, 1); } +/* Initialize a table without using scratch memory */ +JanetTable *janet_table_init_raw(JanetTable *table, int32_t capacity) { + return janet_table_init_impl(table, capacity, 0); +} + /* Deinitialize a table */ void janet_table_deinit(JanetTable *table) { - janet_sfree(table->data); + if (table->gc.flags & JANET_TABLE_FLAG_STACK) { + janet_sfree(table->data); + } else { + janet_free(table->data); + } } /* Create a new table */ diff --git a/src/include/janet.h b/src/include/janet.h index 66322b2b..bf460484 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1343,7 +1343,7 @@ JANET_API void janet_addtimeout(double sec); JANET_API void janet_ev_inc_refcount(void); JANET_API void janet_ev_dec_refcount(void); -/* Get last error from a an IO operation */ +/* Get last error from an IO operation */ JANET_API Janet janet_ev_lasterr(void); /* Async service for calling a function or syscall in a background thread. This is not @@ -1357,6 +1357,7 @@ typedef struct { int tag; int argi; void *argp; + Janet argj; JanetFiber *fiber; } JanetEVGenericMessage; @@ -1586,6 +1587,7 @@ JANET_API const JanetKV *janet_struct_find(JanetStruct st, Janet key); /* Table functions */ JANET_API JanetTable *janet_table(int32_t capacity); JANET_API JanetTable *janet_table_init(JanetTable *table, int32_t capacity); +JANET_API JanetTable *janet_table_init_raw(JanetTable *table, int32_t capacity); JANET_API void janet_table_deinit(JanetTable *table); JANET_API Janet janet_table_get(JanetTable *t, Janet key); JANET_API Janet janet_table_get_ex(JanetTable *t, Janet key, JanetTable **which);