diff --git a/CHANGELOG.md b/CHANGELOG.md index 36cca4e0..a25094d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. ## Unreleased - ??? +- Add `os/open` if ev is enabled. +- Add `os/pipe` if ev is enabled. - Add `janet_thread_current(void)` to C API - Add integer parsing forms to pegs. This makes parsing many binary protocols easier. - Lots of updates to networking code - now can use epoll (or poll) on linux and IOCP on windows. diff --git a/src/core/ev.c b/src/core/ev.c index 453188c6..a539b8d4 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1254,7 +1254,7 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { status = ReadFile(s->stream->handle, state->chunk_buf, chunk_size, NULL, &state->overlapped); if (!status && (ERROR_IO_PENDING != WSAGetLastError())) { if (WSAGetLastError() == ERROR_BROKEN_PIPE) { - janet_schedule(s->fiber, janet_wrap_nil()); + janet_schedule(s->fiber, janet_wrap_nil()); } else { janet_cancel(s->fiber, janet_ev_lasterr()); } @@ -1678,60 +1678,6 @@ Janet janet_cfun_stream_write(int32_t argc, Janet *argv) { janet_await(); } -/* For a pipe ID */ -#ifdef JANET_WINDOWS -static volatile long PipeSerialNumber; -#endif - -static Janet cfun_ev_pipe(int32_t argc, Janet *argv) { - (void) argv; - janet_fixarity(argc, 0); -#ifdef JANET_WINDOWS - /* - * On windows, the built in CreatePipe function doesn't support overlapped IO - * so we lift from the windows source code and modify for our own version. - */ - JanetHandle rhandle, whandle; - UCHAR PipeNameBuffer[MAX_PATH]; - sprintf(PipeNameBuffer, - "\\\\.\\Pipe\\JanetExeAnon.%08x.%08x", - GetCurrentProcessId(), - InterlockedIncrement(&PipeSerialNumber)); - rhandle = CreateNamedPipeA( - PipeNameBuffer, - PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_WAIT, - 1, /* Number of pipes */ - 4096, /* Out buffer size */ - 4096, /* In buffer size */ - 120 * 1000, /* Timeout in ms */ - NULL); - if (!rhandle) janet_panicv(janet_ev_lasterr()); - whandle = CreateFileA( - PipeNameBuffer, - GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, - NULL); - if (whandle == INVALID_HANDLE_VALUE) { - Janet x = janet_ev_lasterr(); - CloseHandle(rhandle); - janet_panicv(x); - } - JanetStream *reader = janet_stream(rhandle, JANET_STREAM_READABLE, NULL); - JanetStream *writer = janet_stream(whandle, JANET_STREAM_WRITABLE, NULL); -#else - int fds[2]; - if (pipe(fds)) janet_panicv(janet_ev_lasterr()); - JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL); - JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL); -#endif - Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)}; - return janet_wrap_tuple(janet_tuple_n(tup, 2)); -} - static const JanetReg ev_cfuns[] = { { "ev/call", cfun_ev_call, @@ -1826,13 +1772,6 @@ static const JanetReg ev_cfuns[] = { "completes. Takes an optional timeout in seconds, after which will return nil. " "Returns nil, or raises an error if the write failed.") }, - { - "ev/pipe", cfun_ev_pipe, - JDOC("(ev/pipe)\n\n" - "Create a readable stream and a writable stream that are connected. Returns a two element " - "tuple where the first element is a readable stream and the second element is the writable " - "stream.") - }, {NULL, NULL, NULL} }; diff --git a/src/core/os.c b/src/core/os.c index b393fb52..3e1cba31 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -1291,6 +1291,11 @@ static jmode_t os_getmode(const Janet *argv, int32_t n) { return janet_perm_from_unix(os_get_unix_mode(argv, n)); } +static jmode_t os_optmode(int32_t argc, const Janet *argv, int32_t n, int32_t dflt) { + if (argc > n) return os_getmode(argv, n); + return janet_perm_from_unix(dflt); +} + /* Getters */ static Janet os_stat_dev(jstat_t *st) { return janet_wrap_number(st->st_dev); @@ -1529,6 +1534,211 @@ static Janet os_permission_int(int32_t argc, Janet *argv) { return janet_wrap_integer(os_get_unix_mode(argv, 0)); } +#ifdef JANET_EV +static Janet os_open(int32_t argc, Janet *argv) { + janet_arity(argc, 1, 3); + const char *path = janet_getcstring(argv, 0); + const uint8_t *opt_flags = janet_optkeyword(argv, argc, 1, (const uint8_t *) "r"); + jmode_t mode = os_optmode(argc, argv, 2, 0666); + uint32_t stream_flags = 0; + JanetHandle fd; +#ifdef JANET_WINDOWS + DWORD desiredAccess = 0; + DWORD shareMode = 0; + DWORD creationDisp = 0; + DWORD flagsAndAttributes = FILE_FLAG_OVERLAPPED; + /* We map unix-like open flags to the creationDisp parameter */ + int creatUnix = 0; +#define OCREAT 1 +#define OEXCL 2 +#define OTRUNC 4 + for (const uint8_t *c = opt_flags; *c; c++) { + switch (*c) { + default: + break; + case 'r': + desiredAccess |= GENERIC_READ; + stream_flags |= JANET_STREAM_READABLE; + break; + case 'w': + desiredAccess |= GENERIC_WRITE; + stream_flags |= JANET_STREAM_WRITABLE; + break; + case 'c': + creatUnix |= OCREAT; + break; + case 'e': + creatUnix |= OEXCL; + break; + case 't': + creatUnix |= OTRUNC; + break; + /* Windows only flags */ + case 'D': + shareMode |= FILE_SHARE_DELETE; + break; + case 'R': + shareMode |= FILE_SHARE_READ; + break; + case 'W': + shareMode |= FILE_SHARE_WRITE; + break; + case 'H': + flagsAndAttributes |= FILE_ATTRIBUTE_HIDDEN; + break; + case 'O': + flagsAndAttributes |= FILE_ATTRIBUTE_READONLY; + break; + case 'F': + flagsAndAttributes |= FILE_ATTRIBUTE_OFFLINE; + break; + case 'T': + flagsAndAttributes |= FILE_ATTRIBUTE_TEMPORARY; + break; + case 'd': + flagsAndAttributes |= FILE_FLAG_DELETE_ON_CLOSE; + break; + case 'b': + flagsAndAttributes |= FILE_FLAG_NO_BUFFERING; + break; + /* we could potentially add more here - + * https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea + */ + } + } + switch (creatUnix) { + default: + janet_panic("invalid creation flags"); + case 0: + creationDisp = OPEN_EXISTING; + break; + case OCREAT: + creationDisp = OPEN_ALWAYS; + break; + case OCREAT + OEXCL: + creationDisp = CREATE_NEW; + break; + case OCREAT + OTRUNC: + creationDisp = CREATE_ALWAYS; + break; + case OTRUNC: + creationDisp = TRUNCATE_EXISTING; + break; + } + fd = CreateFileA(path, desiredAccess, shareMode, NULL, creationDisp, flagsAndAttributes, NULL); + if (fd == INVALID_HANDLE_VALUE) janet_panicv(janet_ev_lasterr()); +#else + int open_flags = O_NONBLOCK; +#ifdef JANET_LINUX + open_flags |= O_CLOEXEC; +#endif + for (const uint8_t *c = opt_flags; *c; c++) { + switch (*c) { + default: + break; + case 'r': + open_flags = (open_flags & O_WRONLY) + ? ((open_flags & ~O_WRONLY) | O_RDWR) + : (open_flags | O_RDONLY); + stream_flags |= JANET_STREAM_READABLE; + break; + case 'w': + open_flags = (open_flags & O_RDONLY) + ? ((open_flags & ~O_RDONLY) | O_RDWR) + : (open_flags | O_WRONLY); + stream_flags |= JANET_STREAM_WRITABLE; + break; + case 'c': + open_flags |= O_CREAT; + break; + case 'e': + open_flags |= O_EXCL; + break; + case 't': + open_flags |= O_TRUNC; + break; + /* posix only */ + case 'x': + open_flags |= O_SYNC; + break; + case 'y': + open_flags |= O_DSYNC; + break; + case 'z': + open_flags |= O_RSYNC; + break; + case 'C': + open_flags |= O_NOCTTY; + break; + case 'a': + open_flags |= O_APPEND; + break; + } + } + do { + fd = open(path, open_flags, mode); + } while (fd == -1 && errno == EINTR); + if (fd == -1) janet_panicv(janet_ev_lasterr()); +#endif + return janet_wrap_abstract(janet_stream(fd, stream_flags, NULL)); +} + +/* For a pipe ID */ +#ifdef JANET_WINDOWS +static volatile long PipeSerialNumber; +#endif + +static Janet os_pipe(int32_t argc, Janet *argv) { + (void) argv; + janet_fixarity(argc, 0); +#ifdef JANET_WINDOWS + /* + * On windows, the built in CreatePipe function doesn't support overlapped IO + * so we lift from the windows source code and modify for our own version. + */ + JanetHandle rhandle, whandle; + UCHAR PipeNameBuffer[MAX_PATH]; + sprintf(PipeNameBuffer, + "\\\\.\\Pipe\\JanetPipeFile.%08x.%08x", + GetCurrentProcessId(), + InterlockedIncrement(&PipeSerialNumber)); + rhandle = CreateNamedPipeA( + PipeNameBuffer, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, /* Number of pipes */ + 4096, /* Out buffer size */ + 4096, /* In buffer size */ + 120 * 1000, /* Timeout in ms */ + NULL); + if (!rhandle) janet_panicv(janet_ev_lasterr()); + whandle = CreateFileA( + PipeNameBuffer, + GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, + NULL); + if (whandle == INVALID_HANDLE_VALUE) { + Janet x = janet_ev_lasterr(); + CloseHandle(rhandle); + janet_panicv(x); + } + JanetStream *reader = janet_stream(rhandle, JANET_STREAM_READABLE, NULL); + JanetStream *writer = janet_stream(whandle, JANET_STREAM_WRITABLE, NULL); +#else + int fds[2]; + if (pipe(fds)) janet_panicv(janet_ev_lasterr()); + JanetStream *reader = janet_stream(fds[0], JANET_STREAM_READABLE, NULL); + JanetStream *writer = janet_stream(fds[1], JANET_STREAM_WRITABLE, NULL); +#endif + Janet tup[2] = {janet_wrap_abstract(reader), janet_wrap_abstract(writer)}; + return janet_wrap_tuple(janet_tuple_n(tup, 2)); +} + +#endif + #endif /* JANET_REDUCED_OS */ static const JanetReg os_cfuns[] = { @@ -1798,6 +2008,44 @@ static const JanetReg os_cfuns[] = { JDOC("(os/perm-int bytes)\n\n" "Parse a 9 character permission string and return an integer that can be used by chmod.") }, +#ifdef JANET_EV + { + "os/open", os_open, + JDOC("(os/open path &opt flags mode)\n\n" + "Create a stream from a file, like the POSIX open system call. Returns a new stream. " + "mode should be a file mode as passed to os/chmod, but only if the create flag is given. " + "The default mode is 8r666. " + "Allowed flags are as follows:\n\n" + "\t:r - open this file for reading\n" + "\t:w - open this file for writing\n" + "\t:c - create a new file (O_CREATE)\n" + "\t:e - fail if the file exists (O_EXCL)\n" + "\t:t - shorten an existing file to length 0 (O_TRUNC)\n\n" + "Posix only flags:\n" + "\t:a - append to a file (O_APPEND)\n" + "\t:x - O_DSYNC\n" + "\t:y - O_SYNC\n" + "\t:z - O_RSYNC\n" + "\t:C - O_NOCTTY\n\n" + "Windows only flags:\n" + "\t:R - share reads (FILE_SHARE_READ)\n" + "\t:W - share writes (FILE_SHARE_WRITE)\n" + "\t:D - share deletes (FILE_SHARE_DELETE)\n" + "\t:H - FILE_ATTRIBUTE_HIDDEN\n" + "\t:O - FILE_ATTRIBUTE_READONLY\n" + "\t:F - FILE_ATTRIBUTE_OFFLINE\n" + "\t:T - FILE_ATTRIBUTE_TEMPORARY\n" + "\t:d - FILE_FLAG_DELETE_ON_CLOSE\n" + "\t:b - FILE_FLAG_NO_BUFFERING\n") + }, + { + "os/pipe", os_pipe, + JDOC("(os/pipe)\n\n" + "Create a readable stream and a writable stream that are connected. Returns a two element " + "tuple where the first element is a readable stream and the second element is the writable " + "stream.") + }, +#endif #endif {NULL, NULL, NULL} }; diff --git a/test/suite0009.janet b/test/suite0009.janet index 3fc914bb..97b896ec 100644 --- a/test/suite0009.janet +++ b/test/suite0009.janet @@ -51,7 +51,7 @@ # Create pipe (var pipe-counter 0) -(def [reader writer] (ev/pipe)) +(def [reader writer] (os/pipe)) (ev/spawn (while (ev/read reader 3) (++ pipe-counter))