Update ev.c to use the new binding style that provides source information

This commit is contained in:
Andrew Owen 2021-07-25 23:25:38 -06:00
parent 030dd747e9
commit 714ba808dd
1 changed files with 118 additions and 138 deletions

View File

@ -721,7 +721,10 @@ static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice)
/* Channel Methods */
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_push,
"(ev/give channel value)",
"Write a value to a channel, suspending the current fiber if the channel is full."
) {
janet_fixarity(argc, 2);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
if (janet_channel_push(channel, argv[1], 0)) {
@ -730,7 +733,10 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) {
return argv[0];
}
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_pop,
"(ev/take channel)",
"Write a value to a channel, suspending the current fiber if the channel is full."
) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
Janet item;
@ -740,7 +746,13 @@ static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
janet_await();
}
static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_choice,
"(ev/select & clauses)",
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
"clauses will take precedence over later clauses."
) {
janet_arity(argc, 1, -1);
int32_t len;
const Janet *data;
@ -782,19 +794,28 @@ static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
janet_await();
}
static Janet cfun_channel_full(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_full,
"(ev/full channel)",
"Check if a channel is full or not."
) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_boolean(janet_q_count(&channel->items) >= channel->limit);
}
static Janet cfun_channel_capacity(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_capacity,
"(ev/capacity channel)",
"Get the number of items a channel will store before blocking writers."
) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_integer(channel->limit);
}
static Janet cfun_channel_count(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_count,
"(ev/count channel)",
"Get the number of items currently waiting in a channel."
) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
return janet_wrap_integer(janet_q_count(&channel->items));
@ -810,12 +831,19 @@ static void fisher_yates_args(int32_t argc, Janet *argv) {
}
}
static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_rchoice,
"ev/rselect",
"Similar to ev/select, but will try clauses in a random order for fairness."
) {
fisher_yates_args(argc, argv);
return cfun_channel_choice(argc, argv);
}
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_channel_new,
"(ev/chan &opt capacity)",
"Create a new channel. capacity is the number of values to queue before "
"blocking writers, defaults to 0 if not provided. Returns a new channel."
) {
janet_arity(argc, 0, 1);
int32_t limit = janet_optnat(argv, argc, 0, 0);
JanetChannel *channel = janet_abstract(&ChannelAT, sizeof(JanetChannel));
@ -2082,7 +2110,14 @@ error:
/* C functions */
static Janet cfun_ev_go(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_go,
"(ev/go fiber &opt value supervisor)",
"Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil. Returns the fiber. "
"An optional `core/channel` can be provided as well as a supervisor. When various "
"events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
"If not provided, the new fiber will inherit the current supervisor."
) {
janet_arity(argc, 1, 3);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
@ -2134,7 +2169,14 @@ static JanetEVGenericMessage janet_go_thread_subr(JanetEVGenericMessage args) {
return args;
}
static Janet cfun_ev_thread(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_thread,
"(ev/thread fiber &opt value flags)",
"Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
"to resume with. "
"Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
"If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
"Otherwise, returns (a copy of) the final result from the fiber on the new thread."
) {
janet_arity(argc, 1, 3);
janet_getfiber(argv, 0);
Janet value = argc >= 2 ? argv[1] : janet_wrap_nil();
@ -2166,7 +2208,12 @@ static Janet cfun_ev_thread(int32_t argc, Janet *argv) {
}
}
static Janet cfun_ev_give_supervisor(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_give_supervisor,
"(ev/give-supervisor tag & payload)",
"Send a message to the current supervior channel if there is one. The message will be a "
"tuple of all of the arguments combined into a single message, where the first element is tag. "
"By convention, tag should be a keyword indicating the type of message. Returns nil."
) {
janet_arity(argc, 1, -1);
JanetChannel *chan = janet_vm.root_fiber->supervisor_channel;
if (NULL != chan) {
@ -2188,13 +2235,22 @@ JANET_NO_RETURN void janet_sleep_await(double sec) {
janet_await();
}
static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_sleep,
"(ev/sleep sec)",
"Suspend the current fiber for sec seconds without blocking the event loop."
) {
janet_fixarity(argc, 1);
double sec = janet_getnumber(argv, 0);
janet_sleep_await(sec);
}
static Janet cfun_ev_deadline(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_deadline,
"(ev/deadline sec &opt tocancel tocheck)",
"Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
"`tocancel` will be canceled as with `ev/cancel`. "
"If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
"`(fiber/current)` respectively. Returns `tocancel`."
) {
janet_arity(argc, 1, 3);
double sec = janet_getnumber(argv, 0);
JanetFiber *tocancel = janet_optfiber(argv, argc, 1, janet_vm.root_fiber);
@ -2209,7 +2265,10 @@ static Janet cfun_ev_deadline(int32_t argc, Janet *argv) {
return janet_wrap_fiber(tocancel);
}
static Janet cfun_ev_cancel(int32_t argc, Janet *argv) {
JANET_CORE_FN(cfun_ev_cancel,
"(ev/cancel fiber err)",
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately"
) {
janet_fixarity(argc, 2);
JanetFiber *fiber = janet_getfiber(argv, 0);
Janet err = argv[1];
@ -2217,14 +2276,25 @@ static Janet cfun_ev_cancel(int32_t argc, Janet *argv) {
return argv[0];
}
Janet janet_cfun_stream_close(int32_t argc, Janet *argv) {
JANET_CORE_FN(janet_cfun_stream_close,
"(ev/close stream)",
"Close a stream. This should be the same as calling (:close stream) for all streams."
) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_close(stream);
return argv[0];
}
Janet janet_cfun_stream_read(int32_t argc, Janet *argv) {
JANET_CORE_FN(janet_cfun_stream_read,
"(ev/read stream n &opt buffer timeout)",
"Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
"`:all` to read into the buffer until end of stream. "
"Optionally provide a buffer to write into "
"as well as a timeout in seconds after which to cancel the operation and raise an error. "
"Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
"error if there are problems with the IO operation."
) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE);
@ -2241,7 +2311,11 @@ Janet janet_cfun_stream_read(int32_t argc, Janet *argv) {
janet_await();
}
Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) {
JANET_CORE_FN(janet_cfun_stream_chunk,
"(ev/chunk stream n &opt buffer timeout)",
"Same as ev/read, but will not return early if less than n bytes are available. If an end of "
"stream is reached, will also return early with the collected bytes."
) {
janet_arity(argc, 2, 4);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_READABLE);
@ -2253,7 +2327,11 @@ Janet janet_cfun_stream_chunk(int32_t argc, Janet *argv) {
janet_await();
}
Janet janet_cfun_stream_write(int32_t argc, Janet *argv) {
JANET_CORE_FN(janet_cfun_stream_write,
"(ev/write stream data &opt timeout)",
"Write data to a stream, suspending the current fiber until the write "
"completes. Takes an optional timeout in seconds, after which will return nil. "
"Returns nil, or raises an error if the write failed.") {
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type);
janet_stream_flags(stream, JANET_STREAM_WRITABLE);
@ -2269,127 +2347,29 @@ Janet janet_cfun_stream_write(int32_t argc, Janet *argv) {
janet_await();
}
static const JanetReg ev_cfuns[] = {
{
"ev/go", cfun_ev_go,
JDOC("(ev/go fiber &opt value supervisor)\n\n"
"Put a fiber on the event loop to be resumed later. Optionally pass "
"a value to resume with, otherwise resumes with nil. Returns the fiber. "
"An optional `core/channel` can be provided as well as a supervisor. When various "
"events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
"If not provided, the new fiber will inherit the current supervisor.")
},
{
"ev/thread", cfun_ev_thread,
JDOC("(ev/thread fiber &opt value flags)\n\n"
"Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
"to resume with. "
"Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
"If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
"Otherwise, returns (a copy of) the final result from the fiber on the new thread.")
},
{
"ev/give-supervisor", cfun_ev_give_supervisor,
JDOC("(ev/give-supervsior tag & payload)\n\n"
"Send a message to the current supervior channel if there is one. The message will be a "
"tuple of all of the arguments combined into a single message, where the first element is tag. "
"By convention, tag should be a keyword indicating the type of message. Returns nil.")
},
{
"ev/sleep", cfun_ev_sleep,
JDOC("(ev/sleep sec)\n\n"
"Suspend the current fiber for sec seconds without blocking the event loop.")
},
{
"ev/deadline", cfun_ev_deadline,
JDOC("(ev/deadline sec &opt tocancel tocheck)\n\n"
"Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
"`tocancel` will be canceled as with `ev/cancel`. "
"If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
"`(fiber/current)` respectively. Returns `tocancel`.")
},
{
"ev/chan", cfun_channel_new,
JDOC("(ev/chan &opt capacity)\n\n"
"Create a new channel. capacity is the number of values to queue before "
"blocking writers, defaults to 0 if not provided. Returns a new channel.")
},
{
"ev/give", cfun_channel_push,
JDOC("(ev/give channel value)\n\n"
"Write a value to a channel, suspending the current fiber if the channel is full.")
},
{
"ev/take", cfun_channel_pop,
JDOC("(ev/take channel)\n\n"
"Read from a channel, suspending the current fiber if no value is available.")
},
{
"ev/full", cfun_channel_full,
JDOC("(ev/full channel)\n\n"
"Check if a channel is full or not.")
},
{
"ev/capacity", cfun_channel_capacity,
JDOC("(ev/capacity channel)\n\n"
"Get the number of items a channel will store before blocking writers.")
},
{
"ev/count", cfun_channel_count,
JDOC("(ev/count channel)\n\n"
"Get the number of items currently waiting in a channel.")
},
{
"ev/cancel", cfun_ev_cancel,
JDOC("(ev/cancel fiber err)\n\n"
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately")
},
{
"ev/select", cfun_channel_choice,
JDOC("(ev/select & clauses)\n\n"
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
"clauses will take precedence over later clauses.")
},
{
"ev/rselect", cfun_channel_rchoice,
JDOC("(ev/rselect & clauses)\n\n"
"Similar to ev/select, but will try clauses in a random order for fairness.")
},
{
"ev/close", janet_cfun_stream_close,
JDOC("(ev/close stream)\n\n"
"Close a stream. This should be the same as calling (:close stream) for all streams.")
},
{
"ev/read", janet_cfun_stream_read,
JDOC("(ev/read stream n &opt buffer timeout)\n\n"
"Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
"`:all` to read into the buffer until end of stream. "
"Optionally provide a buffer to write into "
"as well as a timeout in seconds after which to cancel the operation and raise an error. "
"Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
"error if there are problems with the IO operation.")
},
{
"ev/chunk", janet_cfun_stream_chunk,
JDOC("(ev/chunk stream n &opt buffer timeout)\n\n"
"Same as ev/read, but will not return early if less than n bytes are available. If an end of "
"stream is reached, will also return early with the collected bytes.")
},
{
"ev/write", janet_cfun_stream_write,
JDOC("(ev/write stream data &opt timeout)\n\n"
"Write data to a stream, suspending the current fiber until the write "
"completes. Takes an optional timeout in seconds, after which will return nil. "
"Returns nil, or raises an error if the write failed.")
},
{NULL, NULL, NULL}
};
void janet_lib_ev(JanetTable *env) {
janet_core_cfuns(env, NULL, ev_cfuns);
JanetRegExt ev_cfuns_ext[] = {
JANET_CORE_REG("ev/give", cfun_channel_push),
JANET_CORE_REG("ev/take", cfun_channel_pop),
JANET_CORE_REG("ev/full", cfun_channel_full),
JANET_CORE_REG("ev/capacity", cfun_channel_capacity),
JANET_CORE_REG("ev/count", cfun_channel_count),
JANET_CORE_REG("ev/select", cfun_channel_choice),
JANET_CORE_REG("ev/chan", cfun_channel_new),
JANET_CORE_REG("ev/go", cfun_ev_go),
JANET_CORE_REG("ev/thread", cfun_ev_thread),
JANET_CORE_REG("ev/give-supervisor", cfun_ev_give_supervisor),
JANET_CORE_REG("ev/sleep", cfun_ev_sleep),
JANET_CORE_REG("ev/deadline", cfun_ev_deadline),
JANET_CORE_REG("ev/cancel", cfun_ev_cancel),
JANET_CORE_REG("ev/close", janet_cfun_stream_close),
JANET_CORE_REG("ev/read", janet_cfun_stream_read),
JANET_CORE_REG("ev/chunk", janet_cfun_stream_chunk),
JANET_CORE_REG("ev/write", janet_cfun_stream_write),
JANET_REG_END
};
janet_core_cfuns_ext(env, NULL, ev_cfuns_ext);
janet_register_abstract_type(&janet_stream_type);
}