diff --git a/src/core/ev.c b/src/core/ev.c index ecc945ad..5a39e7c5 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -721,7 +721,10 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) /* Channel Methods */ -static Janet cfun_channel_push(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_push, + "(ev/give channel value)", + "Write a value to a channel, suspending the current fiber if the channel is full." + ) { janet_fixarity(argc, 2); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); if (janet_channel_push(channel, argv[1], 0)) { @@ -730,7 +733,10 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) { return argv[0]; } -static Janet cfun_channel_pop(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_pop, + "(ev/take channel)", + "Write a value to a channel, suspending the current fiber if the channel is full." + ) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); Janet item; @@ -740,7 +746,13 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) { janet_await(); } -static Janet cfun_channel_choice(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_choice, + "(ev/select & clauses)", + "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where " + "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for " + "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first " + "clauses will take precedence over later clauses." + ) { janet_arity(argc, 1, -1); int32_t len; const Janet *data; @@ -782,19 +794,28 @@ static Janet cfun_channel_choice(int32_t argc, Janet *argv) { janet_await(); } -static Janet cfun_channel_full(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_full, + "(ev/full channel)", + "Check if a channel is full or not." + ) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); return janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit); } -static Janet cfun_channel_capacity(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_capacity, + "(ev/capacity channel)", + "Get the number of items a channel will store before blocking writers." + ) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); return janet_wrap_integer(channel->limit); } -static Janet cfun_channel_count(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_count, + "(ev/count channel)", + "Get the number of items currently waiting in a channel." + ) { janet_fixarity(argc, 1); JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT); return janet_wrap_integer(janet_q_count(&channel->items)); @@ -810,12 +831,19 @@ static void fisher_yates_args(int32_t argc, Janet *argv) { } } -static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_rchoice, + "ev/rselect", + "Similar to ev/select, but will try clauses in a random order for fairness." + ) { fisher_yates_args(argc, argv); return cfun_channel_choice(argc, argv); } -static Janet cfun_channel_new(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_channel_new, + "(ev/chan &opt capacity)", + "Create a new channel. capacity is the number of values to queue before " + "blocking writers, defaults to 0 if not provided. Returns a new channel." + ) { janet_arity(argc, 0, 1); int32_t limit = janet_optnat(argv, argc, 0, 0); JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel)); @@ -2082,7 +2110,14 @@ error: /* C functions */ -static Janet cfun_ev_go(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_go, + "(ev/go fiber &opt value supervisor)", + "Put a fiber on the event loop to be resumed later. Optionally pass " + "a value to resume with, otherwise resumes with nil. Returns the fiber. " + "An optional `core/channel` can be provided as well as a supervisor. When various " + "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. " + "If not provided, the new fiber will inherit the current supervisor." + ) { janet_arity(argc, 1, 3); JanetFiber *fiber = janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); @@ -2134,7 +2169,14 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) { return args; } -static Janet cfun_ev_thread(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_thread, + "(ev/thread fiber &opt value flags)", + "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` " + "to resume with. " + "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. " + "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. " + "Otherwise, returns (a copy of) the final result from the fiber on the new thread." + ) { janet_arity(argc, 1, 3); janet_getfiber(argv, 0); Janet value = argc >= 2 ? argv[1] : janet_wrap_nil(); @@ -2166,7 +2208,12 @@ static Janet cfun_ev_thread(int32_t argc, Janet *argv) { } } -static Janet cfun_ev_give_supervisor(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_give_supervisor, + "(ev/give-supervisor tag & payload)", + "Send a message to the current supervior channel if there is one. The message will be a " + "tuple of all of the arguments combined into a single message, where the first element is tag. " + "By convention, tag should be a keyword indicating the type of message. Returns nil." + ) { janet_arity(argc, 1, -1); JanetChannel *chan = janet_vm.root_fiber->supervisor_channel; if (NULL != chan) { @@ -2188,13 +2235,22 @@ JANET_NO_RETURN void janet_sleep_await(double sec) { janet_await(); } -static Janet cfun_ev_sleep(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_sleep, + "(ev/sleep sec)", + "Suspend the current fiber for sec seconds without blocking the event loop." + ) { janet_fixarity(argc, 1); double sec = janet_getnumber(argv, 0); janet_sleep_await(sec); } -static Janet cfun_ev_deadline(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_deadline, + "(ev/deadline sec &opt tocancel tocheck)", + "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, " + "`tocancel` will be canceled as with `ev/cancel`. " + "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and " + "`(fiber/current)` respectively. Returns `tocancel`." + ) { janet_arity(argc, 1, 3); double sec = janet_getnumber(argv, 0); JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber); @@ -2209,7 +2265,10 @@ static Janet cfun_ev_deadline(int32_t argc, Janet *argv) { return janet_wrap_fiber(tocancel); } -static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { +JANET_CORE_FN(cfun_ev_cancel, + "(ev/cancel fiber err)", + "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately" + ) { janet_fixarity(argc, 2); JanetFiber *fiber = janet_getfiber(argv, 0); Janet err = argv[1]; @@ -2217,14 +2276,25 @@ static Janet cfun_ev_cancel(int32_t argc, Janet *argv) { return argv[0]; } -Janet janet_cfun_stream_close(int32_t argc, Janet *argv) { +JANET_CORE_FN(janet_cfun_stream_close, + "(ev/close stream)", + "Close a stream. This should be the same as calling (:close stream) for all streams." + ) { janet_fixarity(argc, 1); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_close(stream); return argv[0]; } -Janet janet_cfun_stream_read(int32_t argc, Janet *argv) { +JANET_CORE_FN(janet_cfun_stream_read, + "(ev/read stream n &opt buffer timeout)", + "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword " + "`:all` to read into the buffer until end of stream. " + "Optionally provide a buffer to write into " + "as well as a timeout in seconds after which to cancel the operation and raise an error. " + "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an " + "error if there are problems with the IO operation." + ) { janet_arity(argc, 2, 4); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_flags(stream, JANET_STREAM_READABLE); @@ -2241,7 +2311,11 @@ Janet janet_cfun_stream_read(int32_t argc, Janet *argv) { janet_await(); } -Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) { +JANET_CORE_FN(janet_cfun_stream_chunk, + "(ev/chunk stream n &opt buffer timeout)", + "Same as ev/read, but will not return early if less than n bytes are available. If an end of " + "stream is reached, will also return early with the collected bytes." + ) { janet_arity(argc, 2, 4); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_flags(stream, JANET_STREAM_READABLE); @@ -2253,7 +2327,11 @@ Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) { janet_await(); } -Janet janet_cfun_stream_write(int32_t argc, Janet *argv) { +JANET_CORE_FN(janet_cfun_stream_write, + "(ev/write stream data &opt timeout)", + "Write data to a stream, suspending the current fiber until the write " + "completes. Takes an optional timeout in seconds, after which will return nil. " + "Returns nil, or raises an error if the write failed.") { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); janet_stream_flags(stream, JANET_STREAM_WRITABLE); @@ -2269,127 +2347,29 @@ Janet janet_cfun_stream_write(int32_t argc, Janet *argv) { janet_await(); } -static const JanetReg ev_cfuns[] = { - { - "ev/go", cfun_ev_go, - JDOC("(ev/go fiber &opt value supervisor)\n\n" - "Put a fiber on the event loop to be resumed later. Optionally pass " - "a value to resume with, otherwise resumes with nil. Returns the fiber. " - "An optional `core/channel` can be provided as well as a supervisor. When various " - "events occur in the newly scheduled fiber, an event will be pushed to the supervisor. " - "If not provided, the new fiber will inherit the current supervisor.") - }, - { - "ev/thread", cfun_ev_thread, - JDOC("(ev/thread fiber &opt value flags)\n\n" - "Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` " - "to resume with. " - "Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. " - "If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. " - "Otherwise, returns (a copy of) the final result from the fiber on the new thread.") - }, - { - "ev/give-supervisor", cfun_ev_give_supervisor, - JDOC("(ev/give-supervsior tag & payload)\n\n" - "Send a message to the current supervior channel if there is one. The message will be a " - "tuple of all of the arguments combined into a single message, where the first element is tag. " - "By convention, tag should be a keyword indicating the type of message. Returns nil.") - }, - { - "ev/sleep", cfun_ev_sleep, - JDOC("(ev/sleep sec)\n\n" - "Suspend the current fiber for sec seconds without blocking the event loop.") - }, - { - "ev/deadline", cfun_ev_deadline, - JDOC("(ev/deadline sec &opt tocancel tocheck)\n\n" - "Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, " - "`tocancel` will be canceled as with `ev/cancel`. " - "If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and " - "`(fiber/current)` respectively. Returns `tocancel`.") - }, - { - "ev/chan", cfun_channel_new, - JDOC("(ev/chan &opt capacity)\n\n" - "Create a new channel. capacity is the number of values to queue before " - "blocking writers, defaults to 0 if not provided. Returns a new channel.") - }, - { - "ev/give", cfun_channel_push, - JDOC("(ev/give channel value)\n\n" - "Write a value to a channel, suspending the current fiber if the channel is full.") - }, - { - "ev/take", cfun_channel_pop, - JDOC("(ev/take channel)\n\n" - "Read from a channel, suspending the current fiber if no value is available.") - }, - { - "ev/full", cfun_channel_full, - JDOC("(ev/full channel)\n\n" - "Check if a channel is full or not.") - }, - { - "ev/capacity", cfun_channel_capacity, - JDOC("(ev/capacity channel)\n\n" - "Get the number of items a channel will store before blocking writers.") - }, - { - "ev/count", cfun_channel_count, - JDOC("(ev/count channel)\n\n" - "Get the number of items currently waiting in a channel.") - }, - { - "ev/cancel", cfun_ev_cancel, - JDOC("(ev/cancel fiber err)\n\n" - "Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately") - }, - { - "ev/select", cfun_channel_choice, - JDOC("(ev/select & clauses)\n\n" - "Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where " - "a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for " - "a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first " - "clauses will take precedence over later clauses.") - }, - { - "ev/rselect", cfun_channel_rchoice, - JDOC("(ev/rselect & clauses)\n\n" - "Similar to ev/select, but will try clauses in a random order for fairness.") - }, - { - "ev/close", janet_cfun_stream_close, - JDOC("(ev/close stream)\n\n" - "Close a stream. This should be the same as calling (:close stream) for all streams.") - }, - { - "ev/read", janet_cfun_stream_read, - JDOC("(ev/read stream n &opt buffer timeout)\n\n" - "Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword " - "`:all` to read into the buffer until end of stream. " - "Optionally provide a buffer to write into " - "as well as a timeout in seconds after which to cancel the operation and raise an error. " - "Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an " - "error if there are problems with the IO operation.") - }, - { - "ev/chunk", janet_cfun_stream_chunk, - JDOC("(ev/chunk stream n &opt buffer timeout)\n\n" - "Same as ev/read, but will not return early if less than n bytes are available. If an end of " - "stream is reached, will also return early with the collected bytes.") - }, - { - "ev/write", janet_cfun_stream_write, - JDOC("(ev/write stream data &opt timeout)\n\n" - "Write data to a stream, suspending the current fiber until the write " - "completes. Takes an optional timeout in seconds, after which will return nil. " - "Returns nil, or raises an error if the write failed.") - }, - {NULL, NULL, NULL} -}; - void janet_lib_ev(JanetTable *env) { - janet_core_cfuns(env, NULL, ev_cfuns); + JanetRegExt ev_cfuns_ext[] = { + JANET_CORE_REG("ev/give", cfun_channel_push), + JANET_CORE_REG("ev/take", cfun_channel_pop), + JANET_CORE_REG("ev/full", cfun_channel_full), + JANET_CORE_REG("ev/capacity", cfun_channel_capacity), + JANET_CORE_REG("ev/count", cfun_channel_count), + JANET_CORE_REG("ev/select", cfun_channel_choice), + JANET_CORE_REG("ev/chan", cfun_channel_new), + JANET_CORE_REG("ev/go", cfun_ev_go), + JANET_CORE_REG("ev/thread", cfun_ev_thread), + JANET_CORE_REG("ev/give-supervisor", cfun_ev_give_supervisor), + JANET_CORE_REG("ev/sleep", cfun_ev_sleep), + JANET_CORE_REG("ev/deadline", cfun_ev_deadline), + JANET_CORE_REG("ev/cancel", cfun_ev_cancel), + JANET_CORE_REG("ev/close", janet_cfun_stream_close), + JANET_CORE_REG("ev/read", janet_cfun_stream_read), + JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk), + JANET_CORE_REG("ev/write", janet_cfun_stream_write), + JANET_REG_END + }; + + janet_core_cfuns_ext(env, NULL, ev_cfuns_ext); janet_register_abstract_type(&janet_stream_type); }