1
0
mirror of https://github.com/janet-lang/janet synced 2025-01-10 15:40:30 +00:00

Add ev api for making threaded calls.

Easy way to make arbitrary functions in C async.
This commit is contained in:
Calvin Rose 2020-12-31 16:12:42 -06:00
parent 788f91a36f
commit 0a1c93b869
7 changed files with 204 additions and 121 deletions

View File

@ -1,6 +1,6 @@
(defn dowork [name n]
(print name " starting work...")
(os/execute ["sleep" (string n)] :p)
(os/execute [(dyn :executable) "-e" (string "(os/sleep " n ")")])
(print name " finished work!"))
# Will be done in parallel

View File

@ -33,6 +33,7 @@
/* Includes */
#include <math.h>
#include <pthread.h>
#ifdef JANET_WINDOWS
#include <winsock2.h>
#include <windows.h>
@ -883,6 +884,21 @@ void janet_loop(void) {
* Self-pipe handling code.
*/
/* Structure used to initialize threads in the thread pool. */
typedef struct {
int write_pipe;
JanetEVGenericMessage msg;
JanetThreadedSubroutine subr;
JanetThreadedCallback cb;
} JanetEVThreadInit;
/* Wrap return value by pairing it with the callback used to handle it
* in the main thread */
typedef struct {
JanetEVGenericMessage msg;
JanetThreadedCallback cb;
} JanetSelfPipeEvent;
#ifdef JANET_WINDOWS
#else
@ -891,15 +907,10 @@ JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
/* Handle events from the self pipe inside the event loop */
static void janet_ev_handle_selfpipe(void) {
JanetSelfPipeEvent ev;
while (read(janet_vm_selfpipe[0], &ev, sizeof(ev)) > 0) {
switch (ev.tag) {
default:
break;
case JANET_SELFPIPE_PROC:
janet_schedule_proc(ev.as.proc.proc, ev.as.proc.status);
break;
}
JanetSelfPipeEvent response;
while (read(janet_vm_selfpipe[0], &response, sizeof(response)) > 0) {
response.cb(response.msg);
janet_ev_dec_refcount();
}
}
@ -919,6 +930,97 @@ static void janet_ev_cleanup_selfpipe(void) {
#endif
static void *janet_thread_body(void *ptr) {
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
int fd = init->write_pipe;
JanetEVGenericMessage msg = init->msg;
JanetThreadedSubroutine subr = init->subr;
JanetThreadedCallback cb = init->cb;
free(init);
JanetSelfPipeEvent response;
response.msg = subr(msg);
response.cb = cb;
/* TODO - implement for windows */
if (write(fd, &response, sizeof(response)) < 0) {
/* TODO failed to handle signal. */
fprintf(stderr, "failed to write response\n");
}
return NULL;
}
void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) {
JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit));
if (NULL == init) {
JANET_OUT_OF_MEMORY;
}
init->write_pipe = janet_vm_selfpipe[1];
init->msg = arguments;
init->subr = fp;
init->cb = cb;
/* Create thread - TODO thread pool? */
pthread_attr_t attr;
pthread_t waiter_thread;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init);
pthread_attr_destroy(&attr);
if (err) {
free(init);
janet_panicf("%s", strerror(err));
}
pthread_detach(waiter_thread);
/* Increment ev refcount so we don't quit while waiting for a subprocess */
janet_ev_inc_refcount();
}
/* Default callback for janet_ev_threaded_await. */
void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
switch (return_value.tag) {
default:
case JANET_EV_TCTAG_NIL:
janet_schedule(return_value.fiber, janet_wrap_nil());
break;
case JANET_EV_TCTAG_INTEGER:
janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi));
break;
case JANET_EV_TCTAG_STRING:
case JANET_EV_TCTAG_STRINGF:
janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp));
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
break;
case JANET_EV_TCTAG_KEYWORD:
janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
break;
case JANET_EV_TCTAG_ERR_STRING:
case JANET_EV_TCTAG_ERR_STRINGF:
janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp));
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
break;
case JANET_EV_TCTAG_ERR_KEYWORD:
janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
break;
}
janet_gcunroot(janet_wrap_fiber(return_value.fiber));
}
/* Convenience method for common case */
void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
JanetEVGenericMessage arguments;
arguments.tag = tag;
arguments.argi = argi;
arguments.argp = argp;
arguments.fiber = janet_root_fiber();
janet_gcroot(janet_wrap_fiber(arguments.fiber));
janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback);
janet_await();
}
#ifdef JANET_WINDOWS
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;

View File

@ -335,7 +335,6 @@ typedef struct {
JanetStream *in;
JanetStream *out;
JanetStream *err;
JanetFiber *fiber;
#else
JanetFile *in;
JanetFile *out;
@ -345,69 +344,22 @@ typedef struct {
#ifdef JANET_EV
/* Structure used to initialize the thread used to call wait
* on child processes */
typedef struct {
int write_pipe;
JanetProc *proc;
} JanetReaperInit;
static void *janet_thread_waiter(void *ptr) {
JanetReaperInit *init = (JanetReaperInit *)ptr;
JanetProc *proc = (JanetProc *) init->proc;
int fd = init->write_pipe;
pid_t pid = proc->pid;
free(init);
for (;;) {
int status = 0;
pid_t which = 0;
do {
which = waitpid(pid, &status, 0);
} while (which == -1 && errno == EINTR);
if (which < 0) {
/* Error, could not wait or no children to wait on. */
break;
} else {
JanetSelfPipeEvent ev;
ev.tag = JANET_SELFPIPE_PROC;
ev.as.proc.status = status;
ev.as.proc.proc = proc;
if (write(fd, &ev, sizeof(ev)) < 0) {
/* TODO failed to handle signal. */
fprintf(stderr, "failed to write event\n");
}
}
}
return NULL;
/* Function that is called in separate thread to wait on a pid */
static JanetEVGenericMessage janet_proc_wait_subr(JanetEVGenericMessage args) {
JanetProc *proc = (JanetProc *) args.argp;
pid_t result;
int status = 0;
do {
result = waitpid(proc->pid, &status, 0);
} while (result == -1 && errno == EINTR);
args.argi = status;
return args;
}
/* Map pids to JanetProc to allow for lookup after a call to
* waitpid. */
static void janet_add_waiting_proc(JanetProc *proc) {
JanetReaperInit *init = malloc(sizeof(JanetReaperInit));
if (NULL == init) {
JANET_OUT_OF_MEMORY;
}
init->write_pipe = janet_vm_selfpipe[1];
init->proc = proc;
pthread_attr_t attr;
pthread_t waiter_thread;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init);
pthread_attr_destroy(&attr);
if (err) janet_panicf("%s", strerror(err));
pthread_detach(waiter_thread);
janet_gcroot(janet_wrap_abstract(proc));
janet_ev_inc_refcount();
}
static void janet_remove_waiting_proc(JanetProc *proc) {
janet_gcunroot(janet_wrap_abstract(proc));
janet_ev_dec_refcount();
}
void janet_schedule_proc(void *ptr, int status) {
/* Callback that is called in main thread when subroutine completes. */
static void janet_proc_wait_cb(JanetEVGenericMessage args) {
int status = args.argi;
JanetProc *proc = (JanetProc *) args.argp;
/* Use POSIX shell semantics for interpreting signals */
if (WIFEXITED(status)) {
status = WEXITSTATUS(status);
@ -416,19 +368,21 @@ void janet_schedule_proc(void *ptr, int status) {
} else {
status = WTERMSIG(status) + 128;
}
JanetProc *proc = (JanetProc *)ptr;
if (NULL == proc) return;
proc->return_code = (int32_t) status;
proc->flags |= JANET_PROC_WAITED;
proc->flags &= ~JANET_PROC_WAITING;
janet_remove_waiting_proc(proc);
if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
janet_cancel(proc->fiber, janet_wrap_string(s));
} else {
janet_schedule(proc->fiber, janet_wrap_integer(status));
if (NULL != proc) {
proc->return_code = (int32_t) status;
proc->flags |= JANET_PROC_WAITED;
proc->flags &= ~JANET_PROC_WAITING;
janet_gcunroot(janet_wrap_abstract(proc));
janet_gcunroot(janet_wrap_fiber(args.fiber));
if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
janet_cancel(args.fiber, janet_wrap_string(s));
} else {
janet_schedule(args.fiber, janet_wrap_integer(status));
}
}
}
#endif
static int janet_proc_gc(void *p, size_t s) {
@ -456,9 +410,6 @@ static int janet_proc_mark(void *p, size_t s) {
if (NULL != proc->in) janet_mark(janet_wrap_abstract(proc->in));
if (NULL != proc->out) janet_mark(janet_wrap_abstract(proc->out));
if (NULL != proc->err) janet_mark(janet_wrap_abstract(proc->err));
#ifdef JANET_EV
if (NULL != proc->fiber) janet_mark(janet_wrap_fiber(proc->fiber));
#endif
return 0;
}
@ -470,10 +421,15 @@ static Janet os_proc_wait_impl(JanetProc *proc) {
janet_panicf("cannot wait twice on a process");
}
#ifdef JANET_EV
/* Event loop implementation */
proc->fiber = janet_root_fiber();
/* Event loop implementation - threaded call */
proc->flags |= JANET_PROC_WAITING;
janet_add_waiting_proc(proc);
JanetEVGenericMessage targs;
memset(&targs, 0, sizeof(targs));
targs.argp = proc;
targs.fiber = janet_root_fiber();
janet_gcroot(janet_wrap_abstract(proc));
janet_gcroot(janet_wrap_fiber(targs.fiber));
janet_ev_threaded_call(janet_proc_wait_subr, targs, janet_proc_wait_cb);
janet_await();
#else
/* Non evented implementation */
@ -905,13 +861,22 @@ static Janet os_execute_impl(int32_t argc, Janet *argv, int is_spawn) {
#else
proc->pid = pid;
#endif
proc->in = get_stdio_for_handle(new_in, orig_in, 0);
proc->out = get_stdio_for_handle(new_out, orig_out, 1);
proc->err = get_stdio_for_handle(new_err, orig_err, 1);
proc->flags = 0;
if (proc->in == NULL || proc->out == NULL || proc->err == NULL) {
janet_panic("failed to construct proc");
proc->in = NULL;
proc->out = NULL;
proc->err = NULL;
if (new_in != JANET_HANDLE_NONE) {
proc->in = get_stdio_for_handle(new_in, orig_in, 0);
if (NULL == proc->in) janet_panic("failed to construct proc");
}
if (new_out != JANET_HANDLE_NONE) {
proc->out = get_stdio_for_handle(new_out, orig_out, 1);
if (NULL == proc->out) janet_panic("failed to construct proc");
}
if (new_err != JANET_HANDLE_NONE) {
proc->err = get_stdio_for_handle(new_err, orig_err, 1);
if (NULL == proc->err) janet_panic("failed to construct proc");
}
proc->flags = 0;
if (janet_flag_at(flags, 2)) {
proc->flags |= JANET_PROC_ERROR_NONZERO;
}
@ -2173,8 +2138,6 @@ static const JanetReg os_cfuns[] = {
{NULL, NULL, NULL}
};
void janet_os_deinit(void) {}
/* Module entry point */
void janet_lib_os(JanetTable *env) {
#if !defined(JANET_REDUCED_OS) && defined(JANET_WINDOWS) && defined(JANET_THREADS)

View File

@ -111,6 +111,4 @@ void janet_ev_init(void);
void janet_ev_deinit(void);
#endif
void janet_os_deinit(void);
#endif /* JANET_STATE_H_defined */

View File

@ -146,27 +146,6 @@ extern const JanetAbstractType janet_address_type;
void janet_lib_ev(JanetTable *env);
void janet_ev_mark(void);
int janet_make_pipe(JanetHandle handles[2]);
void janet_schedule_proc(void *proc, int status);
/* Single message that is written to self pipe. This is used
* to communicate messages inside Janet to work with the event loop.
* Signals and threads will be the main users of this. */
typedef struct {
enum {
JANET_SELFPIPE_PROC
} tag;
union {
struct {
int status;
void *proc;
} proc;
} as;
} JanetSelfPipeEvent;
#ifndef JANET_WINDOWS
extern JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
#endif
#endif
#endif

View File

@ -1505,7 +1505,6 @@ void janet_deinit(void) {
free(janet_vm_traversal_base);
janet_vm_fiber = NULL;
janet_vm_root_fiber = NULL;
janet_os_deinit();
#ifdef JANET_THREADS
janet_threads_deinit();
#endif

View File

@ -1290,6 +1290,48 @@ JANET_API void janet_ev_dec_refcount(void);
/* Get last error from a an IO operation */
JANET_API Janet janet_ev_lasterr(void);
/* Async service for calling a function or syscall in a background thread. This is not
* as efficient in the slightest as using Streams but can be used for arbitrary blocking
* functions and syscalls. */
/* Used to pass data between the main thread and worker threads for simple tasks.
* We could just use a pointer but this prevents malloc/free in the common case
* of only a handful of arguments. */
typedef struct {
int tag;
int argi;
void *argp;
JanetFiber *fiber;
} JanetEVGenericMessage;
/* How to resume or cancel after a threaded call. Not exhaustive of the possible
* ways one might want to resume after returning from a threaded call, but should
* cover most of the common cases. For something more complicated, such as resuming
* with an abstract type or a struct, one should use janet_ev_threaded_call instead
* of janet_ev_threaded_await with a custom callback. */
#define JANET_EV_TCTAG_NIL 0 /* resume with nil */
#define JANET_EV_TCTAG_INTEGER 1 /* resume with janet_wrap_integer(argi) */
#define JANET_EV_TCTAG_STRING 2 /* resume with janet_cstringv((const char *) argp) */
#define JANET_EV_TCTAG_STRINGF 3 /* resume with janet_cstringv((const char *) argp), then call free on argp. */
#define JANET_EV_TCTAG_KEYWORD 4 /* resume with janet_ckeywordv((const char *) argp) */
#define JANET_EV_TCTAG_ERR_STRING 5 /* cancel with janet_cstringv((const char *) argp) */
#define JANET_EV_TCTAG_ERR_STRINGF 6 /* cancel with janet_cstringv((const char *) argp), then call free on argp. */
#define JANET_EV_TCTAG_ERR_KEYWORD 7 /* cancel with janet_ckeywordv((const char *) argp) */
/* Function pointer that is run in the thread pool */
typedef JanetEVGenericMessage(*JanetThreadedSubroutine)(JanetEVGenericMessage arguments);
/* Handler that is run in the main thread with the result of the JanetAsyncSubroutine */
typedef void (*JanetThreadedCallback)(JanetEVGenericMessage return_value);
/* API calls for quickly offloading some work in C to a new thread or thread pool. */
JANET_API void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb);
JANET_API void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp);
/* Callback used by janet_ev_threaded_await */
JANET_API void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value);
/* Read async from a stream */
JANET_API void janet_ev_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);
JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes);