1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-21 04:36:52 +00:00

Merge branch 'filewatch'

This commit is contained in:
Calvin Rose 2024-08-17 10:08:37 -05:00
commit 5c67c1165d
9 changed files with 261 additions and 25 deletions

View File

@ -2,6 +2,7 @@
All notable changes to this project will be documented in this file.
## Unreleased - ???
- Add experimental `filewatch/` module for listening to file system changes.
- Add `bundle/who-is` to query which bundle a file on disk was installed by.
- Add `geomean` function
- Add `:R` and `:W` flags to `os/pipe` to create blocking pipes on Posix and Windows systems.

View File

@ -79,6 +79,7 @@ conf.set('JANET_EV_NO_KQUEUE', not get_option('kqueue'))
conf.set('JANET_NO_INTERPRETER_INTERRUPT', not get_option('interpreter_interrupt'))
conf.set('JANET_NO_FFI', not get_option('ffi'))
conf.set('JANET_NO_FFI_JIT', not get_option('ffi_jit'))
conf.set('JANET_NO_FILEWATCH', not get_option('filewatch'))
conf.set('JANET_NO_CRYPTORAND', not get_option('cryptorand'))
if get_option('os_name') != ''
conf.set('JANET_OS_NAME', get_option('os_name'))

View File

@ -22,6 +22,7 @@ option('kqueue', type : 'boolean', value : true)
option('interpreter_interrupt', type : 'boolean', value : true)
option('ffi', type : 'boolean', value : true)
option('ffi_jit', type : 'boolean', value : true)
option('filewatch', type : 'boolean', value : true)
option('recursion_guard', type : 'integer', min : 10, max : 8000, value : 1024)
option('max_proto_depth', type : 'integer', min : 10, max : 8000, value : 200)

View File

@ -29,6 +29,7 @@
/* #define JANET_NO_NET */
/* #define JANET_NO_INT_TYPES */
/* #define JANET_NO_EV */
/* #define JANET_NO_FILEWATCH */
/* #define JANET_NO_REALPATH */
/* #define JANET_NO_SYMLINKS */
/* #define JANET_NO_UMASK */

View File

@ -456,7 +456,7 @@ JANET_CORE_FN(janet_core_range,
}
JanetArray *array = janet_array(int_count);
for (int32_t i = 0; i < int_count; i++) {
array->data[i] = janet_wrap_number(start + (double) i * step);
array->data[i] = janet_wrap_number((double) start + (double) i * step);
}
array->count = int_count;
return janet_wrap_array(array);
@ -1127,6 +1127,9 @@ static void janet_load_libs(JanetTable *env) {
#endif
#ifdef JANET_EV
janet_lib_ev(env);
#ifdef JANET_FILEWATCH
janet_lib_filewatch(env);
#endif
#endif
#ifdef JANET_NET
janet_lib_net(env);

View File

@ -316,8 +316,9 @@ static const JanetMethod ev_default_stream_methods[] = {
};
/* Create a stream*/
JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) {
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
JanetStream *janet_stream_ext(JanetHandle handle, uint32_t flags, const JanetMethod *methods, size_t size) {
janet_assert(size >= sizeof(JanetStream), "bad size");
JanetStream *stream = janet_abstract(&janet_stream_type, size);
stream->handle = handle;
stream->flags = flags;
stream->read_fiber = NULL;
@ -329,6 +330,10 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
return stream;
}
JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) {
return janet_stream_ext(handle, flags, methods, sizeof(JanetStream));
}
static void janet_stream_close_impl(JanetStream *stream) {
stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS
@ -988,6 +993,20 @@ int janet_channel_take(JanetChannel *channel, Janet *out) {
return janet_channel_pop(channel, out, 2);
}
JanetChannel *janet_channel_make(uint32_t limit) {
janet_assert(limit <= INT32_MAX, "bad limit");
JanetChannel *channel = janet_abstract(&janet_channel_type, sizeof(JanetChannel));
janet_chan_init(channel, (int32_t) limit, 0);
return channel;
}
JanetChannel *janet_channel_make_threaded(uint32_t limit) {
janet_assert(limit <= INT32_MAX, "bad limit");
JanetChannel *channel = janet_abstract_threaded(&janet_channel_type, sizeof(JanetChannel));
janet_chan_init(channel, (int32_t) limit, 0);
return channel;
}
/* Channel Methods */
JANET_CORE_FN(cfun_channel_push,

View File

@ -27,24 +27,47 @@
#endif
#ifdef JANET_EV
typedef struct {
JanetTable *watch_descriptors;
JanetStream *stream;
JanetChannel *channel;
uint32_t default_flags;
} JanetWatcher;
#ifdef JANET_FILEWATCH
#ifdef JANET_LINUX
#include <sys/inotify.h>
#include <unistd.h>
#endif
#ifdef JANET_WINDOWS
#include <windows.h>
#endif
typedef struct {
const char *name;
uint32_t flag;
} JanetWatchFlagName;
typedef struct {
#ifndef JANET_WINDOWS
JanetStream *stream;
#endif
JanetTable watch_descriptors;
JanetChannel *channel;
uint32_t default_flags;
} JanetWatcher;
/* Reject certain filename events without sending anything to the channel
* to make things faster and not waste time and memory creating events. This
* should also let us watch only certain file names, patterns, etc. */
static int janet_watch_filter(JanetWatcher *watcher, Janet filename, int wd) {
/* TODO - add filtering */
(void) watcher;
(void) filename;
(void) wd;
return 0;
}
#ifdef JANET_LINUX
#include <sys/inotify.h>
#include <unistd.h>
static const JanetWatchFlagName watcher_flags_linux[] = {
{"access", IN_ACCESS},
{"all", IN_ALL_EVENTS},
@ -63,7 +86,8 @@ static const JanetWatchFlagName watcher_flags_linux[] = {
{"q-overflow", IN_Q_OVERFLOW},
{"unmount", IN_UNMOUNT},
};
static uint32_t decode_inotify_flags(const Janet *options, int32_t n) {
static uint32_t decode_watch_flags(const Janet *options, int32_t n) {
uint32_t flags = 0;
for (int32_t i = 0; i < n; i++) {
if (!(janet_checktype(options[i], JANET_KEYWORD))) {
@ -87,7 +111,10 @@ static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uin
do {
fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
} while (fd == -1 && errno == EINTR);
watcher->watch_descriptors = janet_table(0);
if (fd == -1) {
janet_panicv(janet_ev_lasterr());
}
janet_table_init_raw(&watcher->watch_descriptors, 0);
watcher->channel = channel;
watcher->default_flags = default_flags;
watcher->stream = janet_stream(fd, JANET_STREAM_READABLE, NULL);
@ -99,15 +126,18 @@ static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t
do {
result = inotify_add_watch(watcher->stream->handle, path, flags);
} while (result == -1 && errno == EINTR);
if (result == -1) {
janet_panicv(janet_ev_lasterr());
}
Janet name = janet_cstringv(path);
Janet wd = janet_wrap_integer(result);
janet_table_put(watcher->watch_descriptors, name, wd);
janet_table_put(watcher->watch_descriptors, wd, name);
janet_table_put(&watcher->watch_descriptors, name, wd);
janet_table_put(&watcher->watch_descriptors, wd, name);
}
static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
if (watcher->stream == NULL) janet_panic("watcher closed");
Janet check = janet_table_get(watcher->watch_descriptors, janet_cstringv(path));
Janet check = janet_table_get(&watcher->watch_descriptors, janet_cstringv(path));
janet_assert(janet_checktype(check, JANET_NUMBER), "bad watch descriptor");
int watch_handle = janet_unwrap_integer(check);
int result;
@ -115,7 +145,7 @@ static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
result = inotify_rm_watch(watcher->stream->handle, watch_handle);
} while (result != -1 && errno == EINTR);
if (result == -1) {
janet_panicf("%s", janet_strerror(errno));
janet_panicv(janet_ev_lasterr());
}
}
@ -189,8 +219,11 @@ static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
cursor += inevent.len;
}
/* Filter events by pattern */
if (!janet_watch_filter(watcher, name, inevent.wd)) continue;
/* Got an event */
Janet path = janet_table_get(watcher->watch_descriptors, janet_wrap_integer(inevent.wd));
Janet path = janet_table_get(&watcher->watch_descriptors, janet_wrap_integer(inevent.wd));
JanetKV *event = janet_struct_begin(6);
janet_struct_put(event, janet_ckeywordv("wd"), janet_wrap_integer(inevent.wd));
janet_struct_put(event, janet_ckeywordv("wd-path"), path);
@ -218,31 +251,200 @@ static void janet_watcher_listen(JanetWatcher *watcher) {
janet_async_start(watcher->stream, JANET_ASYNC_LISTEN_READ, watcher_callback_read, watcher);
}
#elif JANET_WINDOWS
static const JanetWatchFlagName watcher_flags_windows[] = {
{"file-name", FILE_NOTIFY_CHANGE_FILE_NAME},
{"dir-name", FILE_NOTIFY_CHANGE_DIR_NAME},
{"attributes", FILE_NOTIFY_CHANGE_ATTRIBUTES},
{"size", FILE_NOTIFY_CHANGE_SIZE},
{"last-write", FILE_NOTIFY_CHANGE_LAST_WRITE},
{"last-access", FILE_NOTIFY_CHANGE_LAST_ACCESS},
{"creation", FILE_NOTIFY_CHANGE_CREATION},
{"security", FILE_NOTIFY_CHANGE_SECURITY}
};
static uint32_t decode_watch_flags(const Janet *options, int32_t n) {
uint32_t flags = 0;
for (int32_t i = 0; i < n; i++) {
if (!(janet_checktype(options[i], JANET_KEYWORD))) {
janet_panicf("expected keyword, got %v", options[i]);
}
JanetKeyword keyw = janet_unwrap_keyword(options[i]);
const JanetWatchFlagName *result = janet_strbinsearch(watcher_flags_windows,
sizeof(watcher_flags_windows) / sizeof(JanetWatchFlagName),
sizeof(JanetWatchFlagName),
keyw);
if (!result) {
janet_panicf("unknown windows filewatch flag %v", options[i]);
}
flags |= result->flag;
}
return flags;
}
static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uint32_t default_flags) {
janet_table_init_raw(&watcher->watch_descriptors, 0);
watcher->channel = channel;
watcher->default_flags = default_flags;
}
typedef struct {
JanetStream stream;
OVERLAPPED overlapped;
uint32_t flags;
FILE_NOTIFY_INFORMATION fni;
} OverlappedWatch;
static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) {
HANDLE handle = CreateFileA(path, GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED | FILE_ATTRIBUTE_NORMAL,
0);
if (handle == INVALID_HANDLE_VALUE) {
janet_panicv(janet_ev_lasterr());
}
JanetStream *stream = janet_stream_ext(handle, 0, NULL, sizeof(OverlappedWatch));
Janet pathv = janet_cstringv(path);
stream->flags = flags | watcher->default_flags;
Janet streamv = janet_wrap_abstract(stream);
janet_table_put(&watcher->watch_descriptors, pathv, streamv);
janet_table_put(&watcher->watch_descriptors, streamv, pathv);
/* TODO - if listening, also listen for this new path */
}
static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
Janet pathv = janet_cstringv(path);
Janet streamv = janet_table_get(&watcher->watch_descriptors, pathv);
if (janet_checktype(streamv, JANET_NIL)) {
janet_panicf("path %v is not being watched", pathv);
}
janet_table_remove(&watcher->watch_descriptors, pathv);
janet_table_remove(&watcher->watch_descriptors, streamv);
OverlappedWatch *ow = janet_unwrap_abstract(streamv);
janet_stream_close((JanetStream *) ow);
}
static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
JanetWatcher *watcher = (JanetWatcher *) fiber->ev_state;
char buf[1024];
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_abstract(watcher));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(fiber, janet_wrap_nil());
fiber->ev_state = NULL;
janet_async_end(fiber);
break;
case JANET_ASYNC_EVENT_ERR:
{
janet_schedule(fiber, janet_wrap_nil());
fiber->ev_state = NULL;
janet_async_end(fiber);
break;
}
break;
}
}
static void janet_watcher_listen(JanetWatcher *watcher) {
for (int32_t i = 0; i < watcher.watch_descriptors.capacity; i++) {
const JanetKV *kv = watcher.watch_descriptors.items + i;
if (!janet_checktype(kv->key, JANET_POINTER)) continue;
OverlappedWatch *ow = janet_unwrap_pointer(kv->key);
Janet pathv = kv->value;
BOOL result = ReadDirecoryChangesW(ow->handle,
&ow->fni,
sizeof(FILE_NOTIFY_INFORMATION),
TRUE,
ow->flags,
NULL,
&ow->overlapped,
NULL);
if (!result) {
janet_panicv(janet_ev_lasterr());
}
}
janet_panic("nyi");
}
#else
/* Default implementation */
static uint32_t decode_watch_flags(const Janet *options, int32_t n) {
(void) options;
(void) n;
return 0;
}
static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uint32_t default_flags) {
(void) watcher;
(void) channel;
(void) default_flags;
janet_panic("filewatch not supported on this platform");
}
static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) {
(void) watcher;
(void) flags;
(void) path;
janet_panic("nyi");
}
static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
(void) watcher;
(void) path;
janet_panic("nyi");
}
static void janet_watcher_listen(JanetWatcher *watcher) {
(void) watcher;
janet_panic("nyi");
}
#endif
/* C Functions */
static int janet_filewatch_mark(void *p, size_t s) {
JanetWatcher *watcher = (JanetWatcher *) p;
(void) s;
#ifndef JANET_WINDOWS
janet_mark(janet_wrap_abstract(watcher->stream));
#endif
janet_mark(janet_wrap_abstract(watcher->channel));
janet_mark(janet_wrap_table(watcher->watch_descriptors));
janet_mark(janet_wrap_table(&watcher->watch_descriptors));
return 0;
}
static int janet_filewatch_gc(void *p, size_t s) {
JanetWatcher *watcher = (JanetWatcher *) p;
(void) s;
janet_table_deinit(&watcher->watch_descriptors);
return 0;
}
static const JanetAbstractType janet_filewatch_at = {
"filewatch/watcher",
NULL,
janet_filewatch_gc,
janet_filewatch_mark,
JANET_ATEND_GCMARK
};
JANET_CORE_FN(cfun_filewatch_make,
"(filewatch/make channel &opt default-flags)",
"Create a new filewatcher.") {
"Create a new filewatcher that will give events to a channel channel.") {
janet_arity(argc, 1, -1);
JanetChannel *channel = janet_getchannel(argv, 0);
JanetWatcher *watcher = janet_abstract(&janet_filewatch_at, sizeof(JanetWatcher));
uint32_t default_flags = decode_inotify_flags(argv + 1, argc - 1);
uint32_t default_flags = decode_watch_flags(argv + 1, argc - 1);
janet_watcher_init(watcher, channel, default_flags);
return janet_wrap_abstract(watcher);
}
@ -253,7 +455,7 @@ JANET_CORE_FN(cfun_filewatch_add,
janet_arity(argc, 2, -1);
JanetWatcher *watcher = janet_getabstract(argv, 0, &janet_filewatch_at);
const char *path = janet_getcstring(argv, 1);
uint32_t flags = watcher->default_flags | decode_inotify_flags(argv + 2, argc - 2);
uint32_t flags = watcher->default_flags | decode_watch_flags(argv + 2, argc - 2);
janet_watcher_add(watcher, path, flags);
return argv[0];
}
@ -290,3 +492,4 @@ void janet_lib_filewatch(JanetTable *env) {
}
#endif
#endif

View File

@ -190,9 +190,6 @@ void janet_lib_debug(JanetTable *env);
#ifdef JANET_PEG
void janet_lib_peg(JanetTable *env);
#endif
#ifdef JANET_TYPED_ARRAY
void janet_lib_typed_array(JanetTable *env);
#endif
#ifdef JANET_INT_TYPES
void janet_lib_inttypes(JanetTable *env);
#endif
@ -204,10 +201,12 @@ extern const JanetAbstractType janet_address_type;
void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2], int mode);
#ifdef JANET_FILEWATCH
void janet_lib_filewatch(JanetTable *env);
#endif
#ifdef JANET_FFI
void janet_lib_ffi(JanetTable *env);
#endif
#endif
#endif

View File

@ -210,6 +210,11 @@ extern "C" {
#define JANET_EV
#endif
/* Enable or disable the filewatch/ module */
#if !defined(JANET_NO_FILEWATCH)
#define JANET_FILEWATCH
#endif
/* Enable or disable networking */
#if defined(JANET_EV) && !defined(JANET_NO_NET) && !defined(__EMSCRIPTEN__)
#define JANET_NET
@ -1414,6 +1419,7 @@ JANET_API void janet_loop1_interrupt(JanetVM *vm);
/* Wrapper around streams */
JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
JANET_API JanetStream *janet_stream_ext(JanetHandle handle, uint32_t flags, const JanetMethod *methods, size_t size); /* Allow for type punning streams */
JANET_API void janet_stream_close(JanetStream *stream);
JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv);
@ -1445,6 +1451,8 @@ JANET_API int32_t janet_abstract_incref(void *abst);
JANET_API int32_t janet_abstract_decref(void *abst);
/* Expose channel utilities */
JanetChannel *janet_channel_make(uint32_t limit);
JanetChannel *janet_channel_make_threaded(uint32_t limit);
JanetChannel *janet_getchannel(const Janet *argv, int32_t n);
JanetChannel *janet_optchannel(const Janet *argv, int32_t argc, int32_t n, JanetChannel *dflt);
JANET_API int janet_channel_give(JanetChannel *channel, Janet x);