1
0
mirror of https://github.com/janet-lang/janet synced 2024-12-11 01:10:25 +00:00

More work on ICOP

This commit is contained in:
Calvin Rose 2024-08-16 19:44:07 -05:00
parent 2aedc6beff
commit 06eec06ff0
3 changed files with 73 additions and 14 deletions

View File

@ -316,8 +316,9 @@ static const JanetMethod ev_default_stream_methods[] = {
}; };
/* Create a stream*/ /* Create a stream*/
JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) { JanetStream *janet_stream_ext(JanetHandle handle, uint32_t flags, const JanetMethod *methods, size_t size) {
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream)); janet_assert(size >= sizeof(JanetStream), "bad size");
JanetStream *stream = janet_abstract(&janet_stream_type, size);
stream->handle = handle; stream->handle = handle;
stream->flags = flags; stream->flags = flags;
stream->read_fiber = NULL; stream->read_fiber = NULL;
@ -329,6 +330,10 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
return stream; return stream;
} }
JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods) {
return janet_stream_ext(handle, flags, methods, sizeof(JanetStream));
}
static void janet_stream_close_impl(JanetStream *stream) { static void janet_stream_close_impl(JanetStream *stream) {
stream->flags |= JANET_STREAM_CLOSED; stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS

View File

@ -39,9 +39,7 @@ typedef struct {
} JanetWatchFlagName; } JanetWatchFlagName;
typedef struct { typedef struct {
#ifdef JANET_WINDOWS #ifndef JANET_WINDOWS
OVERLAPPED overlapped;
#else
JanetStream *stream; JanetStream *stream;
#endif #endif
JanetTable watch_descriptors; JanetTable watch_descriptors;
@ -286,6 +284,13 @@ static void janet_watcher_init(JanetWatcher *watcher, JanetChannel *channel, uin
watcher->default_flags = default_flags; watcher->default_flags = default_flags;
} }
typedef struct {
JanetStream stream;
OVERLAPPED overlapped;
uint32_t flags;
FILE_NOTIFY_INFORMATION fni;
} OverlappedWatch;
static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) { static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t flags) {
HANDLE handle = CreateFileA(path, GENERIC_READ, HANDLE handle = CreateFileA(path, GENERIC_READ,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
@ -296,25 +301,71 @@ static void janet_watcher_add(JanetWatcher *watcher, const char *path, uint32_t
if (handle == INVALID_HANDLE_VALUE) { if (handle == INVALID_HANDLE_VALUE) {
janet_panicv(janet_ev_lasterr()); janet_panicv(janet_ev_lasterr());
} }
Janet pathv = janet_wrap_cstringv(path); JanetStream *stream = janet_stream_ext(handle, 0, NULL, sizeof(OverlappedWatch));
Janet pointer = janet_wrap_pointer((void *) handle); Janet pathv = janet_cstringv(path);
janet_table_put(&watcher->watch_descriptors, pathv, pointer); stream->flags = flags | watcher->default_flags;
janet_table_put(&watcher->watch_descriptors, pointer, pathv); Janet streamv = janet_wrap_abstract(stream);
janet_table_put(&watcher->watch_descriptors, pathv, streamv);
janet_table_put(&watcher->watch_descriptors, streamv, pathv);
/* TODO - if listening, also listen for this new path */ /* TODO - if listening, also listen for this new path */
} }
static void janet_watcher_remove(JanetWatcher *watcher, const char *path) { static void janet_watcher_remove(JanetWatcher *watcher, const char *path) {
Janet pathv = janet_wrap_cstringv(path); Janet pathv = janet_cstringv(path);
Janet pointer = janet_table_get(&watcher->watch_descriptors, pathv); Janet streamv = janet_table_get(&watcher->watch_descriptors, pathv);
if (janet_checktype(pointer, JANET_NIL)) { if (janet_checktype(streamv, JANET_NIL)) {
janet_panicf("path %v is not being watched", pathv); janet_panicf("path %v is not being watched", pathv);
} }
janet_table_remove(&watcher->watch_descriptors, pathv); janet_table_remove(&watcher->watch_descriptors, pathv);
janet_table_remove(&watcher->watch_descriptors, pointer); janet_table_remove(&watcher->watch_descriptors, streamv);
OverlappedWatch *ow = janet_unwrap_abstract(streamv);
janet_stream_close((JanetStream *) ow);
}
static void watcher_callback_read(JanetFiber *fiber, JanetAsyncEvent event) {
JanetWatcher *watcher = (JanetWatcher *) fiber->ev_state;
char buf[1024];
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_abstract(watcher));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(fiber, janet_wrap_nil());
fiber->ev_state = NULL;
janet_async_end(fiber);
break;
case JANET_ASYNC_EVENT_ERR:
{
janet_schedule(fiber, janet_wrap_nil());
fiber->ev_state = NULL;
janet_async_end(fiber);
break;
}
break;
}
} }
static void janet_watcher_listen(JanetWatcher *watcher) { static void janet_watcher_listen(JanetWatcher *watcher) {
(void) watcher; for (int32_t i = 0; i < watcher.watch_descriptors.capacity; i++) {
const JanetKV *kv = watcher.watch_descriptors.items + i;
if (!janet_checktype(kv->key, JANET_POINTER)) continue;
OverlappedWatch *ow = janet_unwrap_pointer(kv->key);
Janet pathv = kv->value;
BOOL result = ReadDirecoryChangesW(ow->handle,
&ow->fni,
sizeof(FILE_NOTIFY_INFORMATION),
TRUE,
ow->flags,
NULL,
&ow->overlapped,
NULL);
if (!result) {
janet_panicv(janet_ev_lasterr());
}
}
janet_panic("nyi"); janet_panic("nyi");
} }
@ -360,7 +411,9 @@ static void janet_watcher_listen(JanetWatcher *watcher) {
static int janet_filewatch_mark(void *p, size_t s) { static int janet_filewatch_mark(void *p, size_t s) {
JanetWatcher *watcher = (JanetWatcher *) p; JanetWatcher *watcher = (JanetWatcher *) p;
(void) s; (void) s;
#ifndef JANET_WINDOWS
janet_mark(janet_wrap_abstract(watcher->stream)); janet_mark(janet_wrap_abstract(watcher->stream));
#endif
janet_mark(janet_wrap_abstract(watcher->channel)); janet_mark(janet_wrap_abstract(watcher->channel));
janet_mark(janet_wrap_table(&watcher->watch_descriptors)); janet_mark(janet_wrap_table(&watcher->watch_descriptors));
return 0; return 0;

View File

@ -1419,6 +1419,7 @@ JANET_API void janet_loop1_interrupt(JanetVM *vm);
/* Wrapper around streams */ /* Wrapper around streams */
JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods); JANET_API JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod *methods);
JANET_API JanetStream *janet_stream_ext(JanetHandle handle, uint32_t flags, const JanetMethod *methods, size_t size); /* Allow for type punning streams */
JANET_API void janet_stream_close(JanetStream *stream); JANET_API void janet_stream_close(JanetStream *stream);
JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv); JANET_API Janet janet_cfun_stream_close(int32_t argc, Janet *argv);
JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv); JANET_API Janet janet_cfun_stream_read(int32_t argc, Janet *argv);