Fix linux issues with epoll on normal files.

We use the selfpipe trick if epoll fails with EPERM when trying to
register a file descriptor.
This commit is contained in:
Calvin Rose 2021-07-25 21:47:52 -05:00
parent 030dd747e9
commit eb84200f28
4 changed files with 95 additions and 60 deletions

View File

@ -943,11 +943,11 @@ Janet janet_disasm(JanetFuncDef *def) {
}
JANET_CORE_FN(cfun_asm,
"(asm assembly)",
"Returns a new function that is the compiled result of the assembly.\n"
"The syntax for the assembly can be found on the Janet website, and should correspond\n"
"to the return value of disasm. Will throw an\n"
"error on invalid assembly.") {
"(asm assembly)",
"Returns a new function that is the compiled result of the assembly.\n"
"The syntax for the assembly can be found on the Janet website, and should correspond\n"
"to the return value of disasm. Will throw an\n"
"error on invalid assembly.") {
janet_fixarity(argc, 1);
JanetAssembleResult res;
res = janet_asm(argv[0], 0);
@ -958,23 +958,23 @@ JANET_CORE_FN(cfun_asm,
}
JANET_CORE_FN(cfun_disasm,
"(disasm func &opt field)",
"Returns assembly that could be used to compile the given function. "
"func must be a function, not a c function. Will throw on error on a badly "
"typed argument. If given a field name, will only return that part of the function assembly. "
"Possible fields are:\n\n"
"* :arity - number of required and optional arguments.\n"
"* :min-arity - minimum number of arguments function can be called with.\n"
"* :max-arity - maximum number of arguments function can be called with.\n"
"* :vararg - true if function can take a variable number of arguments.\n"
"* :bytecode - array of parsed bytecode instructions. Each instruction is a tuple.\n"
"* :source - name of source file that this function was compiled from.\n"
"* :name - name of function.\n"
"* :slotcount - how many virtual registers, or slots, this function uses. Corresponds to stack space used by function.\n"
"* :constants - an array of constants referenced by this function.\n"
"* :sourcemap - a mapping of each bytecode instruction to a line and column in the source file.\n"
"* :environments - an internal mapping of which enclosing functions are referenced for bindings.\n"
"* :defs - other function definitions that this function may instantiate.\n") {
"(disasm func &opt field)",
"Returns assembly that could be used to compile the given function. "
"func must be a function, not a c function. Will throw on error on a badly "
"typed argument. If given a field name, will only return that part of the function assembly. "
"Possible fields are:\n\n"
"* :arity - number of required and optional arguments.\n"
"* :min-arity - minimum number of arguments function can be called with.\n"
"* :max-arity - maximum number of arguments function can be called with.\n"
"* :vararg - true if function can take a variable number of arguments.\n"
"* :bytecode - array of parsed bytecode instructions. Each instruction is a tuple.\n"
"* :source - name of source file that this function was compiled from.\n"
"* :name - name of function.\n"
"* :slotcount - how many virtual registers, or slots, this function uses. Corresponds to stack space used by function.\n"
"* :constants - an array of constants referenced by this function.\n"
"* :sourcemap - a mapping of each bytecode instruction to a line and column in the source file.\n"
"* :environments - an internal mapping of which enclosing functions are referenced for bindings.\n"
"* :defs - other function definitions that this function may instantiate.\n") {
janet_arity(argc, 1, 2);
JanetFunction *f = janet_getfunction(argv, 0);
if (argc == 2) {

View File

@ -851,9 +851,9 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
int janet_loop_done(void) {
return !(janet_vm.listener_count ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count ||
janet_vm.extra_listeners);
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count ||
janet_vm.extra_listeners);
}
JanetFiber *janet_loop1(void) {
@ -1082,6 +1082,23 @@ static int make_epoll_events(int mask) {
return events;
}
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
}
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(stream->state);
@ -1095,8 +1112,22 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
janet_unlisten_impl(state, 0);
janet_panicv(janet_ev_lasterr());
if (errno == EPERM) {
/* Couldn't add to event loop, so assume that it completes
* synchronously. In that case, fire the completion
* event manually, since this should be a read or write
* event to a file. So we just post a custom event to do the read/write
* asap. */
/* Use flag to indicate state is not registered in epoll */
state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE);
JanetEVGenericMessage msg = {0};
msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state, 0);
janet_panicv(janet_ev_lasterr());
}
}
return state;
}
@ -1105,17 +1136,20 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
static void janet_unlisten(JanetListenerState *state, int is_gc) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask);
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
janet_panicv(janet_ev_lasterr());
/* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask);
ev.data.ptr = stream;
int status;
do {
status = epoll_ctl(janet_vm.epoll, op, stream->handle, &ev);
} while (status == -1 && errno == EINTR);
if (status == -1) {
janet_panicv(janet_ev_lasterr());
}
}
}
/* Destroy state machine and free memory */
@ -1350,15 +1384,15 @@ void janet_ev_post_event(JanetVM *vm, JanetCallback cb, JanetEVGenericMessage ms
event->msg = msg;
event->cb = cb;
janet_assert(PostQueuedCompletionStatus(iocp,
sizeof(JanetSelfPipeEvent),
0,
(LPOVERLAPPED) event),
"failed to post completion event");
sizeof(JanetSelfPipeEvent),
0,
(LPOVERLAPPED) event),
"failed to post completion event");
#else
JanetSelfPipeEvent event;
event.msg = msg;
event.cb = cb;
int fd = vm->selfpipe;
int fd = vm->selfpipe[1];
/* handle a bit of back pressure before giving up. */
int tries = 4;
while (tries > 0) {

View File

@ -1542,10 +1542,10 @@ static JanetPeg *compile_peg(Janet x) {
*/
JANET_CORE_FN(cfun_peg_compile,
"(peg/compile peg)",
"Compiles a peg source data structure into a <core/peg>. This will speed up matching "
"if the same peg will be used multiple times. Will also use `(dyn :peg-grammar)` to suppliment "
"the grammar of the peg for otherwise undefined peg keywords.") {
"(peg/compile peg)",
"Compiles a peg source data structure into a <core/peg>. This will speed up matching "
"if the same peg will be used multiple times. Will also use `(dyn :peg-grammar)` to suppliment "
"the grammar of the peg for otherwise undefined peg keywords.") {
janet_fixarity(argc, 1);
JanetPeg *peg = compile_peg(argv[0]);
return janet_wrap_abstract(peg);
@ -1609,17 +1609,17 @@ static void peg_call_reset(PegCall *c) {
}
JANET_CORE_FN(cfun_peg_match,
"(peg/match peg text &opt start & args)",
"Match a Parsing Expression Grammar to a byte string and return an array of captured values. "
"Returns nil if text does not match the language defined by peg. The syntax of PEGs is documented on the Janet website.") {
"(peg/match peg text &opt start & args)",
"Match a Parsing Expression Grammar to a byte string and return an array of captured values. "
"Returns nil if text does not match the language defined by peg. The syntax of PEGs is documented on the Janet website.") {
PegCall c = peg_cfun_init(argc, argv, 0);
const uint8_t *result = peg_rule(&c.s, c.s.bytecode, c.bytes.bytes + c.start);
return result ? janet_wrap_array(c.s.captures) : janet_wrap_nil();
}
JANET_CORE_FN(cfun_peg_find,
"(peg/find peg text &opt start & args)",
"Find first index where the peg matches in text. Returns an integer, or nil if not found.") {
"(peg/find peg text &opt start & args)",
"Find first index where the peg matches in text. Returns an integer, or nil if not found.") {
PegCall c = peg_cfun_init(argc, argv, 0);
for (int32_t i = c.start; i < c.bytes.len; i++) {
peg_call_reset(&c);
@ -1630,8 +1630,8 @@ JANET_CORE_FN(cfun_peg_find,
}
JANET_CORE_FN(cfun_peg_find_all,
"(peg/find-all peg text &opt start & args)",
"Find all indexes where the peg matches in text. Returns an array of integers.") {
"(peg/find-all peg text &opt start & args)",
"Find all indexes where the peg matches in text. Returns an array of integers.") {
PegCall c = peg_cfun_init(argc, argv, 0);
JanetArray *ret = janet_array(0);
for (int32_t i = c.start; i < c.bytes.len; i++) {
@ -1671,15 +1671,15 @@ static Janet cfun_peg_replace_generic(int32_t argc, Janet *argv, int only_one) {
}
JANET_CORE_FN(cfun_peg_replace_all,
"(peg/replace-all peg repl text &opt start & args)",
"Replace all matches of peg in text with repl, returning a new buffer. The peg does not need to make captures to do replacement.") {
"(peg/replace-all peg repl text &opt start & args)",
"Replace all matches of peg in text with repl, returning a new buffer. The peg does not need to make captures to do replacement.") {
return cfun_peg_replace_generic(argc, argv, 0);
}
JANET_CORE_FN(cfun_peg_replace,
"(peg/replace peg repl text &opt start & args)",
"Replace first match of peg in text with repl, returning a new buffer. The peg does not need to make captures to do replacement. "
"If no matches are found, returns the input string in a new buffer.") {
"(peg/replace peg repl text &opt start & args)",
"Replace first match of peg in text with repl, returning a new buffer. The peg does not need to make captures to do replacement. "
"If no matches are found, returns the input string in a new buffer.") {
return cfun_peg_replace_generic(argc, argv, 1);
}

View File

@ -553,7 +553,8 @@ void janet_core_cfuns(JanetTable *env, const char *regprefix, const JanetReg *cf
}
void janet_core_def_sm(JanetTable *env, const char *name, Janet x, const void *p, const void *sf, int32_t sl) {
(void) sf, sl;
(void) sf;
(void) sl;
janet_core_def(env, name, x, p);
}