mirror of
https://github.com/janet-lang/janet
synced 2024-12-26 00:10:27 +00:00
Merge branch 'ev_execute'
This commit is contained in:
commit
115556fcf2
@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file.
|
||||
|
||||
## Unreleased - ???
|
||||
- Add `dflt` argument to find-index.
|
||||
- Deprecate `file/popen` in favor of `os/spawn`.
|
||||
- Add `:all` keyword to `ev/read` and `net/read` to make them more like `file/read`. However, we
|
||||
do not provide any `:line` option as that requires buffering.
|
||||
- Change repl behavior to make Ctrl-C raise SIGINT on posix. The old behavior for Ctrl-C,
|
||||
|
22
examples/async-execute.janet
Normal file
22
examples/async-execute.janet
Normal file
@ -0,0 +1,22 @@
|
||||
(defn dowork [name n]
|
||||
(print name " starting work...")
|
||||
(os/execute [(dyn :executable) "-e" (string "(os/sleep " n ")")])
|
||||
(print name " finished work!"))
|
||||
|
||||
# Will be done in parallel
|
||||
(print "starting group A")
|
||||
(ev/call dowork "A 2" 2)
|
||||
(ev/call dowork "A 1" 1)
|
||||
(ev/call dowork "A 3" 3)
|
||||
|
||||
(ev/sleep 4)
|
||||
|
||||
# Will also be done in parallel
|
||||
(print "starting group B")
|
||||
(ev/call dowork "B 2" 2)
|
||||
(ev/call dowork "B 1" 1)
|
||||
(ev/call dowork "B 3" 3)
|
||||
|
||||
(ev/sleep 4)
|
||||
|
||||
(print "all work done")
|
270
src/core/ev.c
270
src/core/ev.c
@ -31,12 +31,12 @@
|
||||
|
||||
#ifdef JANET_EV
|
||||
|
||||
/* Includes */
|
||||
#include <math.h>
|
||||
#ifdef JANET_WINDOWS
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <pthread.h>
|
||||
#include <limits.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
@ -48,6 +48,7 @@
|
||||
#include <netinet/tcp.h>
|
||||
#include <netdb.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/wait.h>
|
||||
#ifdef JANET_EV_EPOLL
|
||||
#include <sys/epoll.h>
|
||||
#include <sys/timerfd.h>
|
||||
@ -148,6 +149,7 @@ JANET_THREAD_LOCAL JanetRNG janet_vm_ev_rng;
|
||||
JANET_THREAD_LOCAL JanetListenerState **janet_vm_listeners = NULL;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_listener_count = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_listener_cap = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_extra_listeners = 0;
|
||||
|
||||
/* Get current timestamp (millisecond precision) */
|
||||
static JanetTimestamp ts_now(void);
|
||||
@ -519,6 +521,7 @@ void janet_ev_init_common(void) {
|
||||
/* Common deinit code */
|
||||
void janet_ev_deinit_common(void) {
|
||||
janet_q_deinit(&janet_vm_spawn);
|
||||
free(janet_vm_tq);
|
||||
free(janet_vm_listeners);
|
||||
janet_vm_listeners = NULL;
|
||||
}
|
||||
@ -540,6 +543,14 @@ void janet_addtimeout(double sec) {
|
||||
add_timeout(to);
|
||||
}
|
||||
|
||||
void janet_ev_inc_refcount(void) {
|
||||
janet_vm_extra_listeners++;
|
||||
}
|
||||
|
||||
void janet_ev_dec_refcount(void) {
|
||||
janet_vm_extra_listeners--;
|
||||
}
|
||||
|
||||
/* Channels */
|
||||
|
||||
typedef struct {
|
||||
@ -839,14 +850,16 @@ void janet_loop1(void) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Run scheduled fibers */
|
||||
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
|
||||
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
|
||||
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
|
||||
run_one(task.fiber, task.value, task.sig);
|
||||
}
|
||||
|
||||
/* Poll for events */
|
||||
if (janet_vm_listener_count || janet_vm_tq_count) {
|
||||
if (janet_vm_listener_count || janet_vm_tq_count || janet_vm_extra_listeners) {
|
||||
JanetTimeout to;
|
||||
memset(&to, 0, sizeof(to));
|
||||
int has_timeout;
|
||||
@ -855,18 +868,69 @@ void janet_loop1(void) {
|
||||
pop_timeout(0);
|
||||
}
|
||||
/* Run polling implementation only if pending timeouts or pending events */
|
||||
if (janet_vm_tq_count || janet_vm_listener_count) {
|
||||
if (janet_vm_tq_count || janet_vm_listener_count || janet_vm_extra_listeners) {
|
||||
janet_loop1_impl(has_timeout, to.when);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void janet_loop(void) {
|
||||
while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
|
||||
while (janet_vm_listener_count || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count || janet_vm_extra_listeners) {
|
||||
janet_loop1();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Self-pipe handling code.
|
||||
*/
|
||||
|
||||
/* Wrap return value by pairing it with the callback used to handle it
|
||||
* in the main thread */
|
||||
typedef struct {
|
||||
JanetEVGenericMessage msg;
|
||||
JanetThreadedCallback cb;
|
||||
} JanetSelfPipeEvent;
|
||||
|
||||
/* Structure used to initialize threads in the thread pool
|
||||
* (same head structure as self pipe event)*/
|
||||
typedef struct {
|
||||
JanetEVGenericMessage msg;
|
||||
JanetThreadedCallback cb;
|
||||
JanetThreadedSubroutine subr;
|
||||
JanetHandle write_pipe;
|
||||
} JanetEVThreadInit;
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
/* On windows, use PostQueuedCompletionStatus instead for
|
||||
* custom events */
|
||||
|
||||
#else
|
||||
|
||||
static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2];
|
||||
|
||||
static void janet_ev_setup_selfpipe(void) {
|
||||
if (janet_make_pipe(janet_vm_selfpipe)) {
|
||||
JANET_EXIT("failed to initialize self pipe in event loop");
|
||||
}
|
||||
}
|
||||
|
||||
/* Handle events from the self pipe inside the event loop */
|
||||
static void janet_ev_handle_selfpipe(void) {
|
||||
JanetSelfPipeEvent response;
|
||||
while (read(janet_vm_selfpipe[0], &response, sizeof(response)) > 0) {
|
||||
response.cb(response.msg);
|
||||
janet_ev_dec_refcount();
|
||||
}
|
||||
}
|
||||
|
||||
static void janet_ev_cleanup_selfpipe(void) {
|
||||
close(janet_vm_selfpipe[0]);
|
||||
close(janet_vm_selfpipe[1]);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
|
||||
@ -926,6 +990,12 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
|
||||
if (!has_timeout) {
|
||||
/* queue emptied */
|
||||
}
|
||||
} else if (0 == completionKey) {
|
||||
/* Custom event */
|
||||
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
|
||||
response->cb(response->msg);
|
||||
free(response);
|
||||
janet_ev_dec_refcount();
|
||||
} else {
|
||||
/* Normal event */
|
||||
JanetStream *stream = (JanetStream *) completionKey;
|
||||
@ -1034,8 +1104,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
|
||||
/* Step state machines */
|
||||
for (int i = 0; i < ready; i++) {
|
||||
JanetStream *stream = events[i].data.ptr;
|
||||
if (NULL != stream) { /* If NULL, is a timeout */
|
||||
void *p = events[i].data.ptr;
|
||||
if (&janet_vm_timerfd == p) {
|
||||
/* Timer expired, ignore */;
|
||||
} else if (janet_vm_selfpipe == p) {
|
||||
/* Self-pipe handling */
|
||||
janet_ev_handle_selfpipe();
|
||||
} else {
|
||||
JanetStream *stream = p;
|
||||
int mask = events[i].events;
|
||||
JanetListenerState *state = stream->state;
|
||||
state->event = events + i;
|
||||
@ -1066,14 +1142,18 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
|
||||
void janet_ev_init(void) {
|
||||
janet_ev_init_common();
|
||||
janet_ev_setup_selfpipe();
|
||||
janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC);
|
||||
janet_vm_timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
|
||||
janet_vm_timer_enabled = 0;
|
||||
if (janet_vm_epoll == -1 || janet_vm_timerfd == -1) goto error;
|
||||
struct epoll_event ev;
|
||||
ev.events = EPOLLIN | EPOLLET;
|
||||
ev.data.ptr = NULL;
|
||||
ev.data.ptr = &janet_vm_timerfd;
|
||||
if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_timerfd, &ev)) goto error;
|
||||
ev.events = EPOLLIN | EPOLLET;
|
||||
ev.data.ptr = janet_vm_selfpipe;
|
||||
if (-1 == epoll_ctl(janet_vm_epoll, EPOLL_CTL_ADD, janet_vm_selfpipe[0], &ev)) goto error;
|
||||
return;
|
||||
error:
|
||||
JANET_EXIT("failed to initialize event loop");
|
||||
@ -1083,6 +1163,7 @@ void janet_ev_deinit(void) {
|
||||
janet_ev_deinit_common();
|
||||
close(janet_vm_epoll);
|
||||
close(janet_vm_timerfd);
|
||||
janet_ev_cleanup_selfpipe();
|
||||
janet_vm_epoll = 0;
|
||||
}
|
||||
|
||||
@ -1119,7 +1200,7 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
||||
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
|
||||
size_t newsize = janet_vm_listener_cap;
|
||||
if (newsize > oldsize) {
|
||||
janet_vm_fds = realloc(janet_vm_fds, newsize * sizeof(struct pollfd));
|
||||
janet_vm_fds = realloc(janet_vm_fds, (newsize + 1) * sizeof(struct pollfd));
|
||||
if (NULL == janet_vm_fds) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
@ -1128,12 +1209,12 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
|
||||
ev.fd = stream->handle;
|
||||
ev.events = make_poll_events(state->stream->_mask);
|
||||
ev.revents = 0;
|
||||
janet_vm_fds[state->_index] = ev;
|
||||
janet_vm_fds[state->_index + 1] = ev;
|
||||
return state;
|
||||
}
|
||||
|
||||
static void janet_unlisten(JanetListenerState *state) {
|
||||
janet_vm_fds[state->_index] = janet_vm_fds[janet_vm_listener_count - 1];
|
||||
janet_vm_fds[state->_index + 1] = janet_vm_fds[janet_vm_listener_count];
|
||||
janet_unlisten_impl(state);
|
||||
}
|
||||
|
||||
@ -1146,19 +1227,25 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
JanetTimestamp now = ts_now();
|
||||
to = now > timeout ? 0 : (int)(timeout - now);
|
||||
}
|
||||
ready = poll(janet_vm_fds, janet_vm_listener_count, to);
|
||||
ready = poll(janet_vm_fds, janet_vm_listener_count + 1, to);
|
||||
} while (ready == -1 && errno == EINTR);
|
||||
if (ready == -1) {
|
||||
JANET_EXIT("failed to poll events");
|
||||
}
|
||||
|
||||
/* Check selfpipe */
|
||||
if (janet_vm_fds[0].revents & POLLIN) {
|
||||
janet_vm_fds[0].revents = 0;
|
||||
janet_ev_handle_selfpipe();
|
||||
}
|
||||
|
||||
/* Step state machines */
|
||||
for (size_t i = 0; i < janet_vm_listener_count; i++) {
|
||||
struct pollfd *pfd = janet_vm_fds + i;
|
||||
struct pollfd *pfd = janet_vm_fds + i + 1;
|
||||
/* Skip fds where nothing interesting happened */
|
||||
JanetListenerState *state = janet_vm_listeners[i];
|
||||
/* Normal event */
|
||||
int mask = janet_vm_fds[i].revents;
|
||||
int mask = pfd->revents;
|
||||
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
|
||||
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
|
||||
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
|
||||
@ -1183,20 +1270,157 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
void janet_ev_init(void) {
|
||||
janet_ev_init_common();
|
||||
janet_vm_fds = NULL;
|
||||
janet_ev_setup_selfpipe();
|
||||
janet_vm_fds = malloc(sizeof(struct pollfd));
|
||||
if (NULL == janet_vm_fds) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
janet_vm_fds[0].fd = janet_vm_selfpipe[0];
|
||||
janet_vm_fds[0].events = POLLIN;
|
||||
janet_vm_fds[0].revents = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
void janet_ev_deinit(void) {
|
||||
janet_ev_deinit_common();
|
||||
janet_ev_cleanup_selfpipe();
|
||||
free(janet_vm_fds);
|
||||
janet_vm_fds = NULL;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* C API helpers for reading and writing from streams.
|
||||
/*
|
||||
* End poll implementation
|
||||
*/
|
||||
|
||||
/*
|
||||
* Threaded calls
|
||||
*/
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
static DWORD WINAPI janet_thread_body(LPVOID ptr) {
|
||||
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
|
||||
JanetEVGenericMessage msg = init->msg;
|
||||
JanetThreadedSubroutine subr = init->subr;
|
||||
JanetThreadedCallback cb = init->cb;
|
||||
JanetHandle iocp = init->write_pipe;
|
||||
/* Reuse memory from thread init for returning data */
|
||||
init->msg = subr(msg);
|
||||
init->cb = cb;
|
||||
janet_assert(PostQueuedCompletionStatus(iocp,
|
||||
sizeof(JanetSelfPipeEvent),
|
||||
0,
|
||||
(LPOVERLAPPED) init),
|
||||
"failed to post completion event");
|
||||
return 0;
|
||||
}
|
||||
#else
|
||||
static void *janet_thread_body(void *ptr) {
|
||||
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
|
||||
JanetEVGenericMessage msg = init->msg;
|
||||
JanetThreadedSubroutine subr = init->subr;
|
||||
JanetThreadedCallback cb = init->cb;
|
||||
int fd = init->write_pipe;
|
||||
free(init);
|
||||
JanetSelfPipeEvent response;
|
||||
response.msg = subr(msg);
|
||||
response.cb = cb;
|
||||
/* handle a bit of back pressure before giving up. */
|
||||
int tries = 4;
|
||||
while (tries > 0) {
|
||||
int status;
|
||||
do {
|
||||
status = write(fd, &response, sizeof(response));
|
||||
} while (status == -1 && errno == EINTR);
|
||||
if (status > 0) break;
|
||||
sleep(1);
|
||||
tries--;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) {
|
||||
JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit));
|
||||
if (NULL == init) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
init->msg = arguments;
|
||||
init->subr = fp;
|
||||
init->cb = cb;
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
init->write_pipe = janet_vm_iocp;
|
||||
HANDLE thread_handle = CreateThread(NULL, 0, janet_thread_body, init, 0, NULL);
|
||||
if (NULL == thread_handle) {
|
||||
free(init);
|
||||
janet_panic("failed to create thread");
|
||||
}
|
||||
CloseHandle(thread_handle); /* detach from thread */
|
||||
#else
|
||||
init->write_pipe = janet_vm_selfpipe[1];
|
||||
pthread_t waiter_thread;
|
||||
int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init);
|
||||
if (err) {
|
||||
free(init);
|
||||
janet_panicf("%s", strerror(err));
|
||||
}
|
||||
pthread_detach(waiter_thread);
|
||||
#endif
|
||||
|
||||
/* Increment ev refcount so we don't quit while waiting for a subprocess */
|
||||
janet_ev_inc_refcount();
|
||||
}
|
||||
|
||||
/* Default callback for janet_ev_threaded_await. */
|
||||
void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
|
||||
switch (return_value.tag) {
|
||||
default:
|
||||
case JANET_EV_TCTAG_NIL:
|
||||
janet_schedule(return_value.fiber, janet_wrap_nil());
|
||||
break;
|
||||
case JANET_EV_TCTAG_INTEGER:
|
||||
janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi));
|
||||
break;
|
||||
case JANET_EV_TCTAG_STRING:
|
||||
case JANET_EV_TCTAG_STRINGF:
|
||||
janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
||||
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
||||
break;
|
||||
case JANET_EV_TCTAG_KEYWORD:
|
||||
janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
||||
break;
|
||||
case JANET_EV_TCTAG_ERR_STRING:
|
||||
case JANET_EV_TCTAG_ERR_STRINGF:
|
||||
janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
||||
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
||||
break;
|
||||
case JANET_EV_TCTAG_ERR_KEYWORD:
|
||||
janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
||||
break;
|
||||
}
|
||||
janet_gcunroot(janet_wrap_fiber(return_value.fiber));
|
||||
}
|
||||
|
||||
|
||||
/* Convenience method for common case */
|
||||
void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
|
||||
JanetEVGenericMessage arguments;
|
||||
arguments.tag = tag;
|
||||
arguments.argi = argi;
|
||||
arguments.argp = argp;
|
||||
arguments.fiber = janet_root_fiber();
|
||||
janet_gcroot(janet_wrap_fiber(arguments.fiber));
|
||||
janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* C API helpers for reading and writing from streams.
|
||||
* There is some networking code in here as well as generic
|
||||
* reading and writing primitives. */
|
||||
* reading and writing primitives.
|
||||
*/
|
||||
|
||||
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
|
||||
if (stream->flags & JANET_STREAM_CLOSED) {
|
||||
@ -1730,7 +1954,13 @@ int janet_make_pipe(JanetHandle handles[2]) {
|
||||
return 0;
|
||||
#else
|
||||
if (pipe(handles)) return -1;
|
||||
if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error;
|
||||
if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error;
|
||||
return 0;
|
||||
error:
|
||||
close(handles[0]);
|
||||
close(handles[1]);
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -1755,9 +1985,7 @@ static Janet cfun_ev_call(int32_t argc, Janet *argv) {
|
||||
return janet_wrap_fiber(fiber);
|
||||
}
|
||||
|
||||
static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
double sec = janet_getnumber(argv, 0);
|
||||
JANET_NO_RETURN void janet_sleep_await(double sec) {
|
||||
JanetTimeout to;
|
||||
to.when = ts_delta(ts_now(), sec);
|
||||
to.fiber = janet_vm_root_fiber;
|
||||
@ -1768,6 +1996,12 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
double sec = janet_getnumber(argv, 0);
|
||||
janet_sleep_await(sec);
|
||||
}
|
||||
|
||||
static Janet cfun_ev_deadline(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 1, 3);
|
||||
double sec = janet_getnumber(argv, 0);
|
||||
|
@ -777,7 +777,7 @@ static const JanetReg io_cfuns[] = {
|
||||
#ifndef JANET_NO_PROCESSES
|
||||
{
|
||||
"file/popen", cfun_io_popen,
|
||||
JDOC("(file/popen command &opt mode)\n\n"
|
||||
JDOC("(file/popen command &opt mode) (DEPRECATED for os/spawn)\n\n"
|
||||
"Open a file that is backed by a process. The file must be opened in either "
|
||||
"the :r (read) or the :w (write) mode. In :r mode, the stdout of the "
|
||||
"process can be read from the file. In :w mode, the stdin of the process "
|
||||
|
142
src/core/os.c
142
src/core/os.c
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Calvin Rose
|
||||
* Copyright (c) 2021 Calvin Rose
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to
|
||||
@ -185,6 +185,7 @@ static Janet os_exit(int32_t argc, Janet *argv) {
|
||||
#ifndef JANET_REDUCED_OS
|
||||
|
||||
#ifndef JANET_NO_PROCESSES
|
||||
|
||||
/* Get env for os_execute */
|
||||
static char **os_execute_env(int32_t argc, const Janet *argv) {
|
||||
char **envp = NULL;
|
||||
@ -319,13 +320,15 @@ static JanetBuffer *os_exec_escape(JanetView args) {
|
||||
static const JanetAbstractType ProcAT;
|
||||
#define JANET_PROC_CLOSED 1
|
||||
#define JANET_PROC_WAITED 2
|
||||
#define JANET_PROC_WAITING 4
|
||||
#define JANET_PROC_ERROR_NONZERO 8
|
||||
typedef struct {
|
||||
int flags;
|
||||
#ifdef JANET_WINDOWS
|
||||
HANDLE pHandle;
|
||||
HANDLE tHandle;
|
||||
#else
|
||||
int pid;
|
||||
pid_t pid;
|
||||
#endif
|
||||
int return_code;
|
||||
#ifdef JANET_EV
|
||||
@ -339,6 +342,62 @@ typedef struct {
|
||||
#endif
|
||||
} JanetProc;
|
||||
|
||||
#ifdef JANET_EV
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) {
|
||||
JanetProc *proc = (JanetProc *) args.argp;
|
||||
WaitForSingleObject(proc->pHandle, INFINITE);
|
||||
GetExitCodeProcess(proc->pHandle, &args.argi);
|
||||
return args;
|
||||
}
|
||||
|
||||
#else /* windows check */
|
||||
|
||||
/* Function that is called in separate thread to wait on a pid */
|
||||
static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) {
|
||||
JanetProc *proc = (JanetProc *) args.argp;
|
||||
pid_t result;
|
||||
int status = 0;
|
||||
do {
|
||||
result = waitpid(proc->pid, &status, 0);
|
||||
} while (result == -1 && errno == EINTR);
|
||||
/* Use POSIX shell semantics for interpreting signals */
|
||||
if (WIFEXITED(status)) {
|
||||
status = WEXITSTATUS(status);
|
||||
} else if (WIFSTOPPED(status)) {
|
||||
status = WSTOPSIG(status) + 128;
|
||||
} else {
|
||||
status = WTERMSIG(status) + 128;
|
||||
}
|
||||
args.argi = status;
|
||||
return args;
|
||||
}
|
||||
|
||||
#endif /* End windows check */
|
||||
|
||||
/* Callback that is called in main thread when subroutine completes. */
|
||||
static void janet_proc_wait_cb(JanetEVGenericMessage args) {
|
||||
int status = args.argi;
|
||||
JanetProc *proc = (JanetProc *) args.argp;
|
||||
if (NULL != proc) {
|
||||
proc->return_code = (int32_t) status;
|
||||
proc->flags |= JANET_PROC_WAITED;
|
||||
proc->flags &= ~JANET_PROC_WAITING;
|
||||
janet_gcunroot(janet_wrap_abstract(proc));
|
||||
janet_gcunroot(janet_wrap_fiber(args.fiber));
|
||||
if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
|
||||
JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
|
||||
janet_cancel(args.fiber, janet_wrap_string(s));
|
||||
} else {
|
||||
janet_schedule(args.fiber, janet_wrap_integer(status));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif /* End ev check */
|
||||
|
||||
static int janet_proc_gc(void *p, size_t s) {
|
||||
(void) s;
|
||||
JanetProc *proc = (JanetProc *) p;
|
||||
@ -367,10 +426,28 @@ static int janet_proc_mark(void *p, size_t s) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static Janet os_proc_wait_impl(JanetProc *proc) {
|
||||
if (proc->flags & JANET_PROC_WAITED) {
|
||||
janet_panicf("cannot wait on process that has already finished");
|
||||
#ifdef JANET_EV
|
||||
static JANET_NO_RETURN void
|
||||
#else
|
||||
static Janet
|
||||
#endif
|
||||
os_proc_wait_impl(JanetProc *proc) {
|
||||
if (proc->flags & (JANET_PROC_WAITED | JANET_PROC_WAITING)) {
|
||||
janet_panicf("cannot wait twice on a process");
|
||||
}
|
||||
#ifdef JANET_EV
|
||||
/* Event loop implementation - threaded call */
|
||||
proc->flags |= JANET_PROC_WAITING;
|
||||
JanetEVGenericMessage targs;
|
||||
memset(&targs, 0, sizeof(targs));
|
||||
targs.argp = proc;
|
||||
targs.fiber = janet_root_fiber();
|
||||
janet_gcroot(janet_wrap_abstract(proc));
|
||||
janet_gcroot(janet_wrap_fiber(targs.fiber));
|
||||
janet_ev_threaded_call(janet_proc_wait_subr, targs, janet_proc_wait_cb);
|
||||
janet_await();
|
||||
#else
|
||||
/* Non evented implementation */
|
||||
proc->flags |= JANET_PROC_WAITED;
|
||||
int status = 0;
|
||||
#ifdef JANET_WINDOWS
|
||||
@ -386,6 +463,7 @@ static Janet os_proc_wait_impl(JanetProc *proc) {
|
||||
#endif
|
||||
proc->return_code = (int32_t) status;
|
||||
return janet_wrap_integer(proc->return_code);
|
||||
#endif
|
||||
}
|
||||
|
||||
static Janet os_proc_wait(int32_t argc, Janet *argv) {
|
||||
@ -481,7 +559,7 @@ static int janet_proc_get(void *p, Janet key, Janet *out) {
|
||||
return 1;
|
||||
}
|
||||
if (janet_keyeq(key, "err")) {
|
||||
*out = (NULL == proc->out) ? janet_wrap_nil() : janet_wrap_abstract(proc->err);
|
||||
*out = (NULL == proc->err) ? janet_wrap_nil() : janet_wrap_abstract(proc->err);
|
||||
return 1;
|
||||
}
|
||||
if ((-1 != proc->return_code) && janet_keyeq(key, "return-code")) {
|
||||
@ -575,7 +653,7 @@ static JanetFile *get_stdio_for_handle(JanetHandle handle, void *orig, int iswri
|
||||
}
|
||||
#endif
|
||||
|
||||
static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
|
||||
static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
|
||||
janet_arity(argc, 1, 3);
|
||||
|
||||
/* Get flags */
|
||||
@ -713,7 +791,7 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
|
||||
tHandle = processInfo.hThread;
|
||||
|
||||
/* Wait and cleanup immedaitely */
|
||||
if (!is_async) {
|
||||
if (!is_spawn) {
|
||||
DWORD code;
|
||||
WaitForSingleObject(pHandle, INFINITE);
|
||||
GetExitCodeProcess(pHandle, &code);
|
||||
@ -781,25 +859,15 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
|
||||
if (status) {
|
||||
os_execute_cleanup(envp, child_argv);
|
||||
janet_panicf("%p: %s", argv[0], strerror(errno));
|
||||
} else if (is_async) {
|
||||
} else if (is_spawn) {
|
||||
/* Get process handle */
|
||||
os_execute_cleanup(envp, child_argv);
|
||||
} else {
|
||||
/* Wait to complete */
|
||||
waitpid(pid, &status, 0);
|
||||
os_execute_cleanup(envp, child_argv);
|
||||
/* Use POSIX shell semantics for interpreting signals */
|
||||
if (WIFEXITED(status)) {
|
||||
status = WEXITSTATUS(status);
|
||||
} else if (WIFSTOPPED(status)) {
|
||||
status = WSTOPSIG(status) + 128;
|
||||
} else {
|
||||
status = WTERMSIG(status) + 128;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
if (is_async) {
|
||||
JanetProc *proc = janet_abstract(&ProcAT, sizeof(JanetProc));
|
||||
proc->return_code = -1;
|
||||
#ifdef JANET_WINDOWS
|
||||
@ -808,18 +876,34 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_async) {
|
||||
#else
|
||||
proc->pid = pid;
|
||||
#endif
|
||||
proc->in = get_stdio_for_handle(new_in, orig_in, 1);
|
||||
proc->out = get_stdio_for_handle(new_out, orig_out, 0);
|
||||
proc->err = get_stdio_for_handle(new_err, orig_err, 0);
|
||||
proc->flags = 0;
|
||||
if (proc->in == NULL || proc->out == NULL || proc->err == NULL) {
|
||||
janet_panic("failed to construct proc");
|
||||
proc->in = NULL;
|
||||
proc->out = NULL;
|
||||
proc->err = NULL;
|
||||
if (new_in != JANET_HANDLE_NONE) {
|
||||
proc->in = get_stdio_for_handle(new_in, orig_in, 0);
|
||||
if (NULL == proc->in) janet_panic("failed to construct proc");
|
||||
}
|
||||
if (new_out != JANET_HANDLE_NONE) {
|
||||
proc->out = get_stdio_for_handle(new_out, orig_out, 1);
|
||||
if (NULL == proc->out) janet_panic("failed to construct proc");
|
||||
}
|
||||
if (new_err != JANET_HANDLE_NONE) {
|
||||
proc->err = get_stdio_for_handle(new_err, orig_err, 1);
|
||||
if (NULL == proc->err) janet_panic("failed to construct proc");
|
||||
}
|
||||
proc->flags = 0;
|
||||
if (janet_flag_at(flags, 2)) {
|
||||
proc->flags |= JANET_PROC_ERROR_NONZERO;
|
||||
}
|
||||
|
||||
if (is_spawn) {
|
||||
return janet_wrap_abstract(proc);
|
||||
} else if (janet_flag_at(flags, 2) && status) {
|
||||
janet_panicf("command failed with non-zero exit code %d", status);
|
||||
} else {
|
||||
return janet_wrap_integer(status);
|
||||
#ifdef JANET_EV
|
||||
os_proc_wait_impl(proc);
|
||||
#else
|
||||
return os_proc_wait_impl(proc);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
@ -2078,6 +2162,8 @@ void janet_lib_os(JanetTable *env) {
|
||||
InitializeCriticalSection(&env_lock);
|
||||
env_lock_initialized = 1;
|
||||
}
|
||||
#endif
|
||||
#ifndef JANET_NO_PROCESSES
|
||||
#endif
|
||||
janet_core_cfuns(env, NULL, os_cfuns);
|
||||
}
|
||||
|
@ -1478,7 +1478,6 @@ int janet_init(void) {
|
||||
janet_vm_fiber = NULL;
|
||||
janet_vm_root_fiber = NULL;
|
||||
janet_vm_stackn = 0;
|
||||
/* Threads */
|
||||
#ifdef JANET_THREADS
|
||||
janet_threads_init();
|
||||
#endif
|
||||
|
@ -1279,14 +1279,59 @@ JANET_API JanetListenerState *janet_listen(JanetStream *stream, JanetListener be
|
||||
|
||||
/* Shorthand for yielding to event loop in C */
|
||||
JANET_NO_RETURN JANET_API void janet_await(void);
|
||||
JANET_NO_RETURN JANET_API void janet_sleep_await(double sec);
|
||||
|
||||
/* For use inside listeners - adds a timeout to the current fiber, such that
|
||||
* it will be resumed after sec seconds if no other event schedules the current fiber. */
|
||||
JANET_API void janet_addtimeout(double sec);
|
||||
JANET_API void janet_ev_inc_refcount(void);
|
||||
JANET_API void janet_ev_dec_refcount(void);
|
||||
|
||||
/* Get last error from a an IO operation */
|
||||
JANET_API Janet janet_ev_lasterr(void);
|
||||
|
||||
/* Async service for calling a function or syscall in a background thread. This is not
|
||||
* as efficient in the slightest as using Streams but can be used for arbitrary blocking
|
||||
* functions and syscalls. */
|
||||
|
||||
/* Used to pass data between the main thread and worker threads for simple tasks.
|
||||
* We could just use a pointer but this prevents malloc/free in the common case
|
||||
* of only a handful of arguments. */
|
||||
typedef struct {
|
||||
int tag;
|
||||
int argi;
|
||||
void *argp;
|
||||
JanetFiber *fiber;
|
||||
} JanetEVGenericMessage;
|
||||
|
||||
/* How to resume or cancel after a threaded call. Not exhaustive of the possible
|
||||
* ways one might want to resume after returning from a threaded call, but should
|
||||
* cover most of the common cases. For something more complicated, such as resuming
|
||||
* with an abstract type or a struct, one should use janet_ev_threaded_call instead
|
||||
* of janet_ev_threaded_await with a custom callback. */
|
||||
|
||||
#define JANET_EV_TCTAG_NIL 0 /* resume with nil */
|
||||
#define JANET_EV_TCTAG_INTEGER 1 /* resume with janet_wrap_integer(argi) */
|
||||
#define JANET_EV_TCTAG_STRING 2 /* resume with janet_cstringv((const char *) argp) */
|
||||
#define JANET_EV_TCTAG_STRINGF 3 /* resume with janet_cstringv((const char *) argp), then call free on argp. */
|
||||
#define JANET_EV_TCTAG_KEYWORD 4 /* resume with janet_ckeywordv((const char *) argp) */
|
||||
#define JANET_EV_TCTAG_ERR_STRING 5 /* cancel with janet_cstringv((const char *) argp) */
|
||||
#define JANET_EV_TCTAG_ERR_STRINGF 6 /* cancel with janet_cstringv((const char *) argp), then call free on argp. */
|
||||
#define JANET_EV_TCTAG_ERR_KEYWORD 7 /* cancel with janet_ckeywordv((const char *) argp) */
|
||||
|
||||
/* Function pointer that is run in the thread pool */
|
||||
typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments);
|
||||
|
||||
/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */
|
||||
typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value);
|
||||
|
||||
/* API calls for quickly offloading some work in C to a new thread or thread pool. */
|
||||
JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb);
|
||||
JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp);
|
||||
|
||||
/* Callback used by janet_ev_threaded_await */
|
||||
JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value);
|
||||
|
||||
/* Read async from a stream */
|
||||
JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
|
||||
|
@ -40,7 +40,7 @@
|
||||
# or else the first read can fail. Might be a strange windows
|
||||
# "bug", but needs further investigating. Otherwise, `build_win test`
|
||||
# can sometimes fail on windows, leading to flaky testing.
|
||||
(ev/sleep 0.2)
|
||||
(ev/sleep 0.3)
|
||||
|
||||
(defn test-echo [msg]
|
||||
(with [conn (net/connect "127.0.0.1" "8000")]
|
||||
@ -59,6 +59,7 @@
|
||||
(var pipe-counter 0)
|
||||
(def chan (ev/chan 10))
|
||||
(let [[reader writer] (os/pipe)]
|
||||
(ev/sleep 0.3)
|
||||
(ev/spawn
|
||||
(while (ev/read reader 3)
|
||||
(++ pipe-counter))
|
||||
|
Loading…
Reference in New Issue
Block a user