1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-06 22:00:27 +00:00

Go back to ReadDirectoryChangesExW since it is better.

This commit is contained in:
Calvin Rose 2024-08-18 05:29:08 -07:00
parent 7e6aad2221
commit 33f55dc32f
6 changed files with 124 additions and 27 deletions

View File

@ -25,6 +25,7 @@
#include <janet.h> #include <janet.h>
#include "state.h" #include "state.h"
#include "fiber.h" #include "fiber.h"
#include "util.h"
#endif #endif
#ifndef JANET_SINGLE_THREADED #ifndef JANET_SINGLE_THREADED
@ -463,6 +464,33 @@ void janet_setdyn(const char *name, Janet value) {
} }
} }
/* Create a function that when called, returns X. Trivial in Janet, a pain in C. */
JanetFunction *janet_thunk_delay(Janet x) {
static const uint32_t bytecode[] = {
JOP_LOAD_CONSTANT,
JOP_RETURN
};
JanetFuncDef *def = janet_funcdef_alloc();
def->arity = 0;
def->min_arity = 0;
def->max_arity = INT32_MAX;
def->flags = JANET_FUNCDEF_FLAG_VARARG;
def->slotcount = 1;
def->bytecode = janet_malloc(sizeof(bytecode));
def->bytecode_length = (int32_t)(sizeof(bytecode) / sizeof(uint32_t));
def->constants = janet_malloc(sizeof(Janet));
def->constants_length = 1;
def->name = NULL;
if (!def->bytecode || !def->constants) {
JANET_OUT_OF_MEMORY;
}
def->constants[0] = x;
memcpy(def->bytecode, bytecode, sizeof(bytecode));
janet_def_addflags(def);
/* janet_verify(def); */
return janet_thunk(def);
}
uint64_t janet_getflags(const Janet *argv, int32_t n, const char *flags) { uint64_t janet_getflags(const Janet *argv, int32_t n, const char *flags) {
uint64_t ret = 0; uint64_t ret = 0;
const uint8_t *keyw = janet_getkeyword(argv, n); const uint8_t *keyw = janet_getkeyword(argv, n);

View File

@ -276,8 +276,7 @@ void janet_async_in_flight(JanetFiber *fiber) {
#endif #endif
} }
void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) { void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) {
JanetFiber *fiber = janet_vm.root_fiber;
janet_assert(!fiber->ev_callback, "double async on fiber"); janet_assert(!fiber->ev_callback, "double async on fiber");
if (mode & JANET_ASYNC_LISTEN_READ) { if (mode & JANET_ASYNC_LISTEN_READ) {
stream->read_fiber = fiber; stream->read_fiber = fiber;
@ -291,6 +290,10 @@ void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback
janet_gcroot(janet_wrap_abstract(stream)); janet_gcroot(janet_wrap_abstract(stream));
fiber->ev_state = state; fiber->ev_state = state;
callback(fiber, JANET_ASYNC_EVENT_INIT); callback(fiber, JANET_ASYNC_EVENT_INIT);
}
void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state) {
janet_async_start_fiber(janet_vm.root_fiber, stream, mode, callback, state);
janet_await(); janet_await();
} }

View File

@ -252,12 +252,14 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
static void janet_watcher_listen(JanetWatcher *watcher) { static void janet_watcher_listen(JanetWatcher *watcher) {
if (watcher->is_watching) janet_panic("already watching"); if (watcher->is_watching) janet_panic("already watching");
watcher->is_watching = 1; watcher->is_watching = 1;
janet_async_start(watcher->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, watcher); JanetFunction *thunk = janet_thunk_delay(janet_wrap_nil());
JanetFiber *fiber = janet_fiber(thunk, 64, 0, NULL);
janet_async_start_fiber(fiber, watcher->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, watcher);
} }
#elif JANET_WINDOWS #elif JANET_WINDOWS
static const uint32_t WATCHFLAG_RECURSIVE = 0x100000; #define WATCHFLAG_RECURSIVE 0x100000u
static const JanetWatchFlagName watcher_flags_windows[] = { static const JanetWatchFlagName watcher_flags_windows[] = {
{"attributes", FILE_NOTIFY_CHANGE_ATTRIBUTES}, {"attributes", FILE_NOTIFY_CHANGE_ATTRIBUTES},
@ -266,9 +268,9 @@ static const JanetWatchFlagName watcher_flags_windows[] = {
{"file-name", FILE_NOTIFY_CHANGE_FILE_NAME}, {"file-name", FILE_NOTIFY_CHANGE_FILE_NAME},
{"last-access", FILE_NOTIFY_CHANGE_LAST_ACCESS}, {"last-access", FILE_NOTIFY_CHANGE_LAST_ACCESS},
{"last-write", FILE_NOTIFY_CHANGE_LAST_WRITE}, {"last-write", FILE_NOTIFY_CHANGE_LAST_WRITE},
{"recursive", WATCHFLAG_RECURSIVE},
{"security", FILE_NOTIFY_CHANGE_SECURITY}, {"security", FILE_NOTIFY_CHANGE_SECURITY},
{"size", FILE_NOTIFY_CHANGE_SIZE}, {"size", FILE_NOTIFY_CHANGE_SIZE},
{"recursive", WATCHFLAG_RECURSIVE},
}; };
static uint32_t decode_watch_flags(const Janet *options, int32_t n) { static uint32_t decode_watch_flags(const Janet *options, int32_t n) {
@ -294,6 +296,7 @@ static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uin
janet_table_init_raw(&watcher->watch_descriptors, 0); janet_table_init_raw(&watcher->watch_descriptors, 0);
watcher->channel = channel; watcher->channel = channel;
watcher->default_flags = default_flags; watcher->default_flags = default_flags;
watcher->is_watching = 0;
} }
/* Since the file info padding includes embedded file names, we want to include more space for data. /* Since the file info padding includes embedded file names, we want to include more space for data.
@ -305,19 +308,22 @@ typedef struct {
OVERLAPPED overlapped; OVERLAPPED overlapped;
JanetStream *stream; JanetStream *stream;
JanetWatcher *watcher; JanetWatcher *watcher;
JanetFiber *fiber;
JanetString dir_path;
uint32_t flags; uint32_t flags;
uint64_t buf[FILE_INFO_PADDING / sizeof(uint64_t)]; /* Ensure alignment */ uint64_t buf[FILE_INFO_PADDING / sizeof(uint64_t)]; /* Ensure alignment */
} OverlappedWatch; } OverlappedWatch;
static void read_dir_changes(OverlappedWatch *ow) { static void read_dir_changes(OverlappedWatch *ow) {
BOOL result = ReadDirectoryChangesW(ow->stream->handle, BOOL result = ReadDirectoryChangesExW(ow->stream->handle,
(FILE_NOTIFY_INFORMATION *) ow->buf, (FILE_NOTIFY_EXTENDED_INFORMATION *) ow->buf,
FILE_INFO_PADDING, FILE_INFO_PADDING,
(ow->flags & WATCHFLAG_RECURSIVE) ? TRUE : FALSE, (ow->flags & WATCHFLAG_RECURSIVE) ? TRUE : FALSE,
ow->flags & ~WATCHFLAG_RECURSIVE, ow->flags & ~WATCHFLAG_RECURSIVE,
NULL, NULL,
(OVERLAPPED *) ow, (OVERLAPPED *) ow,
NULL); NULL,
ReadDirectoryNotifyExtendedInformation);
if (!result) { if (!result) {
janet_panicv(janet_ev_lasterr()); janet_panicv(janet_ev_lasterr());
} }
@ -338,21 +344,30 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
switch (event) { switch (event) {
default: default:
break; break;
case JANET_ASYNC_EVENT_INIT:
janet_async_in_flight(fiber);
break;
case JANET_ASYNC_EVENT_MARK: case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_abstract(ow->stream)); janet_mark(janet_wrap_abstract(ow->stream));
janet_mark(janet_wrap_fiber(ow->fiber));
janet_mark(janet_wrap_abstract(watcher)); janet_mark(janet_wrap_abstract(watcher));
janet_mark(janet_wrap_string(ow->dir_path));
break; break;
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(fiber, janet_wrap_nil()); janet_table_remove(&ow->watcher->watch_descriptors, janet_wrap_string(ow->dir_path));
janet_async_end(fiber);
break; break;
case JANET_ASYNC_EVENT_ERR: case JANET_ASYNC_EVENT_ERR:
janet_schedule(fiber, janet_wrap_nil()); case JANET_ASYNC_EVENT_FAILED:
janet_async_end(fiber); janet_stream_close(ow->stream);
break; break;
case JANET_ASYNC_EVENT_COMPLETE: case JANET_ASYNC_EVENT_COMPLETE:
{ {
FILE_NOTIFY_INFORMATION *fni = (FILE_NOTIFY_INFORMATION *) ow->buf; if (!watcher->is_watching) {
janet_stream_close(ow->stream);
break;
}
FILE_NOTIFY_EXTENDED_INFORMATION *fni = (FILE_NOTIFY_EXTENDED_INFORMATION *) ow->buf;
while (1) { while (1) {
/* Got an event */ /* Got an event */
@ -362,28 +377,40 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
int32_t nbytes = (int32_t) WideCharToMultiByte(CP_UTF8, WC_SEPCHARS, fni->FileName, fni->FileNameLength / 2, tempbuf, sizeof(tempbuf), NULL, NULL); int32_t nbytes = (int32_t) WideCharToMultiByte(CP_UTF8, WC_SEPCHARS, fni->FileName, fni->FileNameLength / 2, tempbuf, sizeof(tempbuf), NULL, NULL);
JanetString filename = janet_string(tempbuf, nbytes); JanetString filename = janet_string(tempbuf, nbytes);
JanetKV *event = janet_struct_begin(2); JanetKV *event = janet_struct_begin(3);
janet_struct_put(event, janet_ckeywordv("action"), janet_ckeywordv(watcher_actions_windows[fni->Action])); janet_struct_put(event, janet_ckeywordv("action"), janet_ckeywordv(watcher_actions_windows[fni->Action]));
janet_struct_put(event, janet_ckeywordv("file-name"), janet_wrap_string(filename)); janet_struct_put(event, janet_ckeywordv("file-name"), janet_wrap_string(filename));
janet_struct_put(event, janet_ckeywordv("dir"), janet_wrap_string(ow->dir_path));
Janet eventv = janet_wrap_struct(janet_struct_end(event)); Janet eventv = janet_wrap_struct(janet_struct_end(event));
janet_channel_give(watcher->channel, eventv); janet_channel_give(watcher->channel, eventv);
/* Next event */ /* Next event */
if (!fni->NextEntryOffset) break; if (!fni->NextEntryOffset) break;
fni = (FILE_NOTIFY_INFORMATION *) ((char *)fni + fni->NextEntryOffset); fni = (FILE_NOTIFY_EXTENDED_INFORMATION *) ((char *)fni + fni->NextEntryOffset);
} }
/* Make another call to read directory changes */ /* Make another call to read directory changes */
read_dir_changes(ow); read_dir_changes(ow);
janet_async_in_flight(fiber);
} }
break; break;
} }
} }
static void start_listening_ow(OverlappedWatch *ow) {
read_dir_changes(ow);
JanetStream *stream = ow->stream;
JanetFunction *thunk = janet_thunk_delay(janet_wrap_nil());
JanetFiber *fiber = janet_fiber(thunk, 64, 0, NULL);
fiber->supervisor_channel = janet_root_fiber()->supervisor_channel;
ow->fiber = fiber;
janet_async_start_fiber(fiber, stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow);
}
static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) { static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) {
HANDLE handle = CreateFileA(path, HANDLE handle = CreateFileA(path,
FILE_LIST_DIRECTORY, FILE_LIST_DIRECTORY | GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL, NULL,
OPEN_EXISTING, OPEN_EXISTING,
@ -396,16 +423,16 @@ static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t
OverlappedWatch *ow = janet_malloc(sizeof(OverlappedWatch)); OverlappedWatch *ow = janet_malloc(sizeof(OverlappedWatch));
memset(ow, 0, sizeof(OverlappedWatch)); memset(ow, 0, sizeof(OverlappedWatch));
ow->stream = stream; ow->stream = stream;
Janet pathv = janet_cstringv(path); ow->dir_path = janet_cstring(path);
ow->fiber = NULL;
Janet pathv = janet_wrap_string(ow->dir_path);
ow->flags = flags | watcher->default_flags; ow->flags = flags | watcher->default_flags;
ow->watcher = watcher; ow->watcher = watcher;
ow->overlapped.hEvent = CreateEvent(NULL, FALSE, 0, NULL); /* Do we need this */ ow->overlapped.hEvent = CreateEvent(NULL, FALSE, 0, NULL); /* Do we need this */
Janet streamv = janet_wrap_pointer(ow); Janet streamv = janet_wrap_pointer(ow);
janet_table_put(&watcher->watch_descriptors, pathv, streamv); janet_table_put(&watcher->watch_descriptors, pathv, streamv);
janet_table_put(&watcher->watch_descriptors, streamv, pathv);
if (watcher->is_watching) { if (watcher->is_watching) {
read_dir_changes(ow); start_listening_ow(ow);
janet_async_start(stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow);
} }
} }
@ -416,7 +443,6 @@ static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
janet_panicf("path %v is not being watched", pathv); janet_panicf("path %v is not being watched", pathv);
} }
janet_table_remove(&watcher->watch_descriptors, pathv); janet_table_remove(&watcher->watch_descriptors, pathv);
janet_table_remove(&watcher->watch_descriptors, streamv);
OverlappedWatch *ow = janet_unwrap_pointer(streamv); OverlappedWatch *ow = janet_unwrap_pointer(streamv);
janet_stream_close(ow->stream); janet_stream_close(ow->stream);
} }
@ -426,13 +452,24 @@ static void janet_watcher_listen(JanetWatcher *watcher) {
watcher->is_watching = 1; watcher->is_watching = 1;
for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) { for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) {
const JanetKV *kv = watcher->watch_descriptors.data + i; const JanetKV *kv = watcher->watch_descriptors.data + i;
if (!janet_checktype(kv->key, JANET_ABSTRACT)) continue; if (!janet_checktype(kv->value, JANET_POINTER)) continue;
OverlappedWatch *ow = janet_unwrap_pointer(kv->key); OverlappedWatch *ow = janet_unwrap_pointer(kv->value);
read_dir_changes(ow); start_listening_ow(ow);
janet_async_start(ow->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, ow);
} }
} }
static void janet_watcher_unlisten(JanetWatcher *watcher) {
if (!watcher->is_watching) return;
watcher->is_watching = 0;
for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) {
const JanetKV *kv = watcher->watch_descriptors.data + i;
if (!janet_checktype(kv->value, JANET_POINTER)) continue;
OverlappedWatch *ow = janet_unwrap_pointer(kv->value);
janet_stream_close(ow->stream);
}
janet_table_clear(&watcher->watch_descriptors);
}
#else #else
/* Default implementation */ /* Default implementation */
@ -468,6 +505,11 @@ static void janet_watcher_listen(JanetWatcher *watcher) {
janet_panic("nyi"); janet_panic("nyi");
} }
static void janet_watcher_unlisten(JanetWatcher *watcher) {
(void) watcher;
janet_panic("nyi");
}
#endif #endif
/* C Functions */ /* C Functions */
@ -476,7 +518,16 @@ static int janet_filewatch_mark(void *p, size_t s) {
JanetWatcher *watcher = (JanetWatcher *) p; JanetWatcher *watcher = (JanetWatcher *) p;
(void) s; (void) s;
if (watcher->channel == NULL) return 0; /* Incomplete initialization */ if (watcher->channel == NULL) return 0; /* Incomplete initialization */
#ifndef JANET_WINDOWS #ifdef JANET_WINDOWS
for (int32_t i = 0; i < watcher->watch_descriptors.capacity; i++) {
const JanetKV *kv = watcher->watch_descriptors.data + i;
if (!janet_checktype(kv->value, JANET_POINTER)) continue;
OverlappedWatch *ow = janet_unwrap_pointer(kv->value);
janet_mark(janet_wrap_fiber(ow->fiber));
janet_mark(janet_wrap_abstract(ow->stream));
janet_mark(janet_wrap_string(ow->dir_path));
}
#else
janet_mark(janet_wrap_abstract(watcher->stream)); janet_mark(janet_wrap_abstract(watcher->stream));
#endif #endif
janet_mark(janet_wrap_abstract(watcher->channel)); janet_mark(janet_wrap_abstract(watcher->channel));
@ -540,6 +591,15 @@ JANET_CORE_FN(cfun_filewatch_listen,
return janet_wrap_nil(); return janet_wrap_nil();
} }
JANET_CORE_FN(cfun_filewatch_unlisten,
"(filewatch/unlisten watcher)",
"Stop listening for changes on a given watcher.") {
janet_fixarity(argc, 1);
JanetWatcher *watcher = janet_getabstract(argv, 0, &janet_filewatch_at);
janet_watcher_unlisten(watcher);
return janet_wrap_nil();
}
/* Module entry point */ /* Module entry point */
void janet_lib_filewatch(JanetTable *env) { void janet_lib_filewatch(JanetTable *env) {
JanetRegExt cfuns[] = { JanetRegExt cfuns[] = {
@ -547,6 +607,7 @@ void janet_lib_filewatch(JanetTable *env) {
JANET_CORE_REG("filewatch/add", cfun_filewatch_add), JANET_CORE_REG("filewatch/add", cfun_filewatch_add),
JANET_CORE_REG("filewatch/remove", cfun_filewatch_remove), JANET_CORE_REG("filewatch/remove", cfun_filewatch_remove),
JANET_CORE_REG("filewatch/listen", cfun_filewatch_listen), JANET_CORE_REG("filewatch/listen", cfun_filewatch_listen),
JANET_CORE_REG("filewatch/unlisten", cfun_filewatch_unlisten),
JANET_REG_END JANET_REG_END
}; };
janet_core_cfuns_ext(env, NULL, cfuns); janet_core_cfuns_ext(env, NULL, cfuns);

View File

@ -200,6 +200,7 @@ extern const JanetAbstractType janet_address_type;
#ifdef JANET_EV #ifdef JANET_EV
void janet_lib_ev(JanetTable *env); void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void); void janet_ev_mark(void);
void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state);
int janet_make_pipe(JanetHandle handles[2], int mode); int janet_make_pipe(JanetHandle handles[2], int mode);
#ifdef JANET_FILEWATCH #ifdef JANET_FILEWATCH
void janet_lib_filewatch(JanetTable *env); void janet_lib_filewatch(JanetTable *env);

View File

@ -633,7 +633,9 @@ typedef void (*JanetEVCallback)(JanetFiber *fiber, JanetAsyncEvent event);
* call when ever an event is sent from the event loop. state is an optional (can be NULL) * call when ever an event is sent from the event loop. state is an optional (can be NULL)
* pointer to data allocated with janet_malloc. This pointer will be passed to callback as * pointer to data allocated with janet_malloc. This pointer will be passed to callback as
* fiber->ev_state. It will also be freed for you by the runtime when the event loop determines * fiber->ev_state. It will also be freed for you by the runtime when the event loop determines
* it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct. */ * it can no longer be referenced. On windows, the contents of state MUST contained an OVERLAPPED struct at the 0 offset. */
JANET_API void janet_async_start_fiber(JanetFiber *fiber, JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state);
JANET_API JANET_NO_RETURN void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state); JANET_API JANET_NO_RETURN void janet_async_start(JanetStream *stream, JanetAsyncMode mode, JanetEVCallback callback, void *state);
/* Do not send any more events to the given callback. Call this after scheduling fiber to be resume /* Do not send any more events to the given callback. Call this after scheduling fiber to be resume
@ -1803,6 +1805,7 @@ JANET_API void janet_gcpressure(size_t s);
/* Functions */ /* Functions */
JANET_API JanetFuncDef *janet_funcdef_alloc(void); JANET_API JanetFuncDef *janet_funcdef_alloc(void);
JANET_API JanetFunction *janet_thunk(JanetFuncDef *def); JANET_API JanetFunction *janet_thunk(JanetFuncDef *def);
JANET_API JanetFunction *janet_thunk_delay(Janet x);
JANET_API int janet_verify(JanetFuncDef *def); JANET_API int janet_verify(JanetFuncDef *def);
/* Pretty printing */ /* Pretty printing */

View File

@ -1177,6 +1177,7 @@ int main(int argc, char **argv) {
janet_resolve(env, janet_csymbol("cli-main"), &mainfun); janet_resolve(env, janet_csymbol("cli-main"), &mainfun);
Janet mainargs[1] = { janet_wrap_array(args) }; Janet mainargs[1] = { janet_wrap_array(args) };
JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs); JanetFiber *fiber = janet_fiber(janet_unwrap_function(mainfun), 64, 1, mainargs);
janet_gcroot(janet_wrap_fiber(fiber));
fiber->env = env; fiber->env = env;
/* Run the fiber in an event loop */ /* Run the fiber in an event loop */