From 33f55dc32f26ba872e5b2ee34c43c9eb9f9e35f4 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Sun, 18 Aug 2024 05:29:08 -0700 Subject: [PATCH] Go back to ReadDirectoryChangesExW since it is better. --- src/core/capi.c | 28 +++++++++++ src/core/ev.c | 7 ++- src/core/filewatch.c | 109 ++++++++++++++++++++++++++++++++--------- src/core/util.h | 1 + src/include/janet.h | 5 +- src/mainclient/shell.c | 1 + 6 files changed, 124 insertions(+), 27 deletions(-) diff --git a/src/core/capi.c b/src/core/capi.c index 9dd5d29d..959ea985 100644 --- a/src/core/capi.c +++ b/src/core/capi.c @@ -25,6 +25,7 @@ #include #include "state.h" #include "fiber.h" +#include "util.h" #endif #ifndef JANET_SINGLE_THREADED @@ -463,6 +464,33 @@ void janet_setdyn(const char *name, Janet value) { } } +/* Create a function that when called, returns X. Trivial in Janet, a pain in C. */ +JanetFunction *janet_thunk_delay(Janet x) { + static const uint32_t bytecode[] = { + JOP_LOAD_CONSTANT, + JOP_RETURN + }; + JanetFuncDef *def = janet_funcdef_alloc(); + def->arity = 0; + def->min_arity = 0; + def->max_arity = INT32_MAX; + def->flags = JANET_FUNCDEF_FLAG_VARARG; + def->slotcount = 1; + def->bytecode = janet_malloc(sizeof(bytecode)); + def->bytecode_length = (int32_t)(sizeof(bytecode) / sizeof(uint32_t)); + def->constants = janet_malloc(sizeof(Janet)); + def->constants_length = 1; + def->name = NULL; + if (!def->bytecode || !def->constants) { + JANET_OUT_OF_MEMORY; + } + def->constants[0] = x; + memcpy(def->bytecode, bytecode, sizeof(bytecode)); + janet_def_addflags(def); + /* janet_verify(def); */ + return janet_thunk(def); +} + uint64_t janet_getflags(const Janet *argv, int32_t n, const char *flags) { uint64_t ret = 0; const uint8_t *keyw = janet_getkeyword(argv, n); diff --git a/src/core/ev.c b/src/core/ev.c index 452b2ff1..1ee0b207 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -276,8 +276,7 @@ void janet_async_in_flight(JanetFiber *fiber) { #endif } -void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) { - JanetFiber *fiber = janet_vm.root_fiber; +void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) { janet_assert(!fiber->ev_callback, "double async on fiber"); if (mode & JANET_ASYNC_LISTEN_READ) { stream->read_fiber = fiber; @@ -291,6 +290,10 @@ void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback janet_gcroot(janet_wrap_abstract(stream)); fiber->ev_state = state; callback(fiber, JANET_ASYNC_EVENT_INIT); +} + +void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) { + janet_async_start_fiber(janet_vm.root_fiber, stream, mode, callback, state); janet_await(); } diff --git a/src/core/filewatch.c b/src/core/filewatch.c index a556527b..c08e1684 100644 --- a/src/core/filewatch.c +++ b/src/core/filewatch.c @@ -252,12 +252,14 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { static void janet_watcher_listen(JanetWatcher *watcher) { if (watcher->is_watching) janet_panic("already watching"); watcher->is_watching = 1; - janet_async_start(watcher->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, watcher); + JanetFunction *thunk = janet_thunk_delay(janet_wrap_nil()); + JanetFiber *fiber = janet_fiber(thunk, 64, 0, NULL); + janet_async_start_fiber(fiber, watcher->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, watcher); } #elif JANET_WINDOWS -static const uint32_t WATCHFLAG_RECURSIVE = 0x100000; +#define WATCHFLAG_RECURSIVE 0x100000u static const JanetWatchFlagName watcher_flags_windows[] = { {"attributes", FILE_NOTIFY_CHANGE_ATTRIBUTES}, @@ -266,9 +268,9 @@ static const JanetWatchFlagName watcher_flags_windows[] = { {"file-name", FILE_NOTIFY_CHANGE_FILE_NAME}, {"last-access", FILE_NOTIFY_CHANGE_LAST_ACCESS}, {"last-write", FILE_NOTIFY_CHANGE_LAST_WRITE}, + {"recursive", WATCHFLAG_RECURSIVE}, {"security", FILE_NOTIFY_CHANGE_SECURITY}, {"size", FILE_NOTIFY_CHANGE_SIZE}, - {"recursive", WATCHFLAG_RECURSIVE}, }; static uint32_t decode_watch_flags(const Janet *options, int32_t n) { @@ -294,6 +296,7 @@ static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uin janet_table_init_raw(&watcher->watch_descriptors, 0); watcher->channel = channel; watcher->default_flags = default_flags; + watcher->is_watching = 0; } /* Since the file info padding includes embedded file names, we want to include more space for data. @@ -305,19 +308,22 @@ typedef struct { OVERLAPPED overlapped; JanetStream *stream; JanetWatcher *watcher; + JanetFiber *fiber; + JanetString dir_path; uint32_t flags; uint64_t buf[FILE_INFO_PADDING / sizeof(uint64_t)]; /* Ensure alignment */ } OverlappedWatch; static void read_dir_changes(OverlappedWatch *ow) { - BOOL result = ReadDirectoryChangesW(ow->stream->handle, - (FILE_NOTIFY_INFORMATION *) ow->buf, + BOOL result = ReadDirectoryChangesExW(ow->stream->handle, + (FILE_NOTIFY_EXTENDED_INFORMATION *) ow->buf, FILE_INFO_PADDING, (ow->flags & WATCHFLAG_RECURSIVE) ? TRUE : FALSE, ow->flags & ~WATCHFLAG_RECURSIVE, NULL, (OVERLAPPED *) ow, - NULL); + NULL, + ReadDirectoryNotifyExtendedInformation); if (!result) { janet_panicv(janet_ev_lasterr()); } @@ -338,21 +344,30 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { switch (event) { default: break; + case JANET_ASYNC_EVENT_INIT: + janet_async_in_flight(fiber); + break; case JANET_ASYNC_EVENT_MARK: janet_mark(janet_wrap_abstract(ow->stream)); + janet_mark(janet_wrap_fiber(ow->fiber)); janet_mark(janet_wrap_abstract(watcher)); + janet_mark(janet_wrap_string(ow->dir_path)); break; case JANET_ASYNC_EVENT_CLOSE: - janet_schedule(fiber, janet_wrap_nil()); - janet_async_end(fiber); + janet_table_remove(&ow->watcher->watch_descriptors, janet_wrap_string(ow->dir_path)); break; case JANET_ASYNC_EVENT_ERR: - janet_schedule(fiber, janet_wrap_nil()); - janet_async_end(fiber); + case JANET_ASYNC_EVENT_FAILED: + janet_stream_close(ow->stream); break; case JANET_ASYNC_EVENT_COMPLETE: { - FILE_NOTIFY_INFORMATION *fni = (FILE_NOTIFY_INFORMATION *) ow->buf; + if (!watcher->is_watching) { + janet_stream_close(ow->stream); + break; + } + + FILE_NOTIFY_EXTENDED_INFORMATION *fni = (FILE_NOTIFY_EXTENDED_INFORMATION *) ow->buf; while (1) { /* Got an event */ @@ -362,28 +377,40 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) { int32_t nbytes = (int32_t) WideCharToMultiByte(CP_UTF8, WC_SEPCHARS, fni->FileName, fni->FileNameLength / 2, tempbuf, sizeof(tempbuf), NULL, NULL); JanetString filename = janet_string(tempbuf, nbytes); - JanetKV *event = janet_struct_begin(2); + JanetKV *event = janet_struct_begin(3); janet_struct_put(event, janet_ckeywordv("action"), janet_ckeywordv(watcher_actions_windows[fni->Action])); janet_struct_put(event, janet_ckeywordv("file-name"), janet_wrap_string(filename)); + janet_struct_put(event, janet_ckeywordv("dir"), janet_wrap_string(ow->dir_path)); Janet eventv = janet_wrap_struct(janet_struct_end(event)); janet_channel_give(watcher->channel, eventv); /* Next event */ if (!fni->NextEntryOffset) break; - fni = (FILE_NOTIFY_INFORMATION *) ((char *)fni + fni->NextEntryOffset); + fni = (FILE_NOTIFY_EXTENDED_INFORMATION *) ((char *)fni + fni->NextEntryOffset); } /* Make another call to read directory changes */ read_dir_changes(ow); + janet_async_in_flight(fiber); } break; } } +static void start_listening_ow(OverlappedWatch *ow) { + read_dir_changes(ow); + JanetStream *stream = ow->stream; + JanetFunction *thunk = janet_thunk_delay(janet_wrap_nil()); + JanetFiber *fiber = janet_fiber(thunk, 64, 0, NULL); + fiber->supervisor_channel = janet_root_fiber()->supervisor_channel; + ow->fiber = fiber; + janet_async_start_fiber(fiber, stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow); +} + static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) { HANDLE handle = CreateFileA(path, - FILE_LIST_DIRECTORY, + FILE_LIST_DIRECTORY | GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, @@ -396,16 +423,16 @@ static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t OverlappedWatch *ow = janet_malloc(sizeof(OverlappedWatch)); memset(ow, 0, sizeof(OverlappedWatch)); ow->stream = stream; - Janet pathv = janet_cstringv(path); + ow->dir_path = janet_cstring(path); + ow->fiber = NULL; + Janet pathv = janet_wrap_string(ow->dir_path); ow->flags = flags | watcher->default_flags; ow->watcher = watcher; ow->overlapped.hEvent = CreateEvent(NULL, FALSE, 0, NULL); /* Do we need this */ Janet streamv = janet_wrap_pointer(ow); janet_table_put(&watcher->watch_descriptors, pathv, streamv); - janet_table_put(&watcher->watch_descriptors, streamv, pathv); if (watcher->is_watching) { - read_dir_changes(ow); - janet_async_start(stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow); + start_listening_ow(ow); } } @@ -416,7 +443,6 @@ static void janet_watcher_remove(JanetWatcher *watcher, const char *path) { janet_panicf("path %v is not being watched", pathv); } janet_table_remove(&watcher->watch_descriptors, pathv); - janet_table_remove(&watcher->watch_descriptors, streamv); OverlappedWatch *ow = janet_unwrap_pointer(streamv); janet_stream_close(ow->stream); } @@ -426,13 +452,24 @@ static void janet_watcher_listen(JanetWatcher *watcher) { watcher->is_watching = 1; for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) { const JanetKV *kv = watcher->watch_descriptors.data + i; - if (!janet_checktype(kv->key, JANET_ABSTRACT)) continue; - OverlappedWatch *ow = janet_unwrap_pointer(kv->key); - read_dir_changes(ow); - janet_async_start(ow->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow); + if (!janet_checktype(kv->value, JANET_POINTER)) continue; + OverlappedWatch *ow = janet_unwrap_pointer(kv->value); + start_listening_ow(ow); } } +static void janet_watcher_unlisten(JanetWatcher *watcher) { + if (!watcher->is_watching) return; + watcher->is_watching = 0; + for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) { + const JanetKV *kv = watcher->watch_descriptors.data + i; + if (!janet_checktype(kv->value, JANET_POINTER)) continue; + OverlappedWatch *ow = janet_unwrap_pointer(kv->value); + janet_stream_close(ow->stream); + } + janet_table_clear(&watcher->watch_descriptors); +} + #else /* Default implementation */ @@ -468,6 +505,11 @@ static void janet_watcher_listen(JanetWatcher *watcher) { janet_panic("nyi"); } +static void janet_watcher_unlisten(JanetWatcher *watcher) { + (void) watcher; + janet_panic("nyi"); +} + #endif /* C Functions */ @@ -476,7 +518,16 @@ static int janet_filewatch_mark(void *p, size_t s) { JanetWatcher *watcher = (JanetWatcher *) p; (void) s; if (watcher->channel == NULL) return 0; /* Incomplete initialization */ -#ifndef JANET_WINDOWS +#ifdef JANET_WINDOWS + for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) { + const JanetKV *kv = watcher->watch_descriptors.data + i; + if (!janet_checktype(kv->value, JANET_POINTER)) continue; + OverlappedWatch *ow = janet_unwrap_pointer(kv->value); + janet_mark(janet_wrap_fiber(ow->fiber)); + janet_mark(janet_wrap_abstract(ow->stream)); + janet_mark(janet_wrap_string(ow->dir_path)); + } +#else janet_mark(janet_wrap_abstract(watcher->stream)); #endif janet_mark(janet_wrap_abstract(watcher->channel)); @@ -540,6 +591,15 @@ JANET_CORE_FN(cfun_filewatch_listen, return janet_wrap_nil(); } +JANET_CORE_FN(cfun_filewatch_unlisten, + "(filewatch/unlisten watcher)", + "Stop listening for changes on a given watcher.") { + janet_fixarity(argc, 1); + JanetWatcher *watcher = janet_getabstract(argv, 0, &janet_filewatch_at); + janet_watcher_unlisten(watcher); + return janet_wrap_nil(); +} + /* Module entry point */ void janet_lib_filewatch(JanetTable *env) { JanetRegExt cfuns[] = { @@ -547,6 +607,7 @@ void janet_lib_filewatch(JanetTable *env) { JANET_CORE_REG("filewatch/add", cfun_filewatch_add), JANET_CORE_REG("filewatch/remove", cfun_filewatch_remove), JANET_CORE_REG("filewatch/listen", cfun_filewatch_listen), + JANET_CORE_REG("filewatch/unlisten", cfun_filewatch_unlisten), JANET_REG_END }; janet_core_cfuns_ext(env, NULL, cfuns); diff --git a/src/core/util.h b/src/core/util.h index 8f698e7a..7a0dbaff 100644 --- a/src/core/util.h +++ b/src/core/util.h @@ -200,6 +200,7 @@ extern const JanetAbstractType janet_address_type; #ifdef JANET_EV void janet_lib_ev(JanetTable *env); void janet_ev_mark(void); +void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state); int janet_make_pipe(JanetHandle handles[2], int mode); #ifdef JANET_FILEWATCH void janet_lib_filewatch(JanetTable *env); diff --git a/src/include/janet.h b/src/include/janet.h index 62a61153..11981b54 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -633,7 +633,9 @@ typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event); * call when ever an event is sent from the event loop. state is an optional (can be NULL) * pointer to data allocated with janet_malloc. This pointer will be passed to callback as * fiber->ev_state. It will also be freed for you by the runtime when the event loop determines - * it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct. */ + * it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct at the 0 offset. */ + +JANET_API void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state); JANET_API JANET_NO_RETURN void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state); /* Do not send any more events to the given callback. Call this after scheduling fiber to be resume @@ -1803,6 +1805,7 @@ JANET_API void janet_gcpressure(size_t s); /* Functions */ JANET_API JanetFuncDef *janet_funcdef_alloc(void); JANET_API JanetFunction *janet_thunk(JanetFuncDef *def); +JANET_API JanetFunction *janet_thunk_delay(Janet x); JANET_API int janet_verify(JanetFuncDef *def); /* Pretty printing */ diff --git a/src/mainclient/shell.c b/src/mainclient/shell.c index ec93df69..8643df54 100644 --- a/src/mainclient/shell.c +++ b/src/mainclient/shell.c @@ -1177,6 +1177,7 @@ int main(int argc, char **argv) { janet_resolve(env, janet_csymbol("cli-main"), &mainfun); Janet mainargs[1] = { janet_wrap_array(args) }; JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs); + janet_gcroot(janet_wrap_fiber(fiber)); fiber->env = env; /* Run the fiber in an event loop */