mirror of
https://github.com/janet-lang/janet
synced 2025-01-10 15:40:30 +00:00
Remove unneeded book keeping for sub processes.
Since we are not using signals we no longer need some bookkeeping.
This commit is contained in:
parent
c831ecf5d2
commit
788f91a36f
22
examples/async-execute.janet
Normal file
22
examples/async-execute.janet
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
(defn dowork [name n]
|
||||||
|
(print name " starting work...")
|
||||||
|
(os/execute ["sleep" (string n)] :p)
|
||||||
|
(print name " finished work!"))
|
||||||
|
|
||||||
|
# Will be done in parallel
|
||||||
|
(print "starting group A")
|
||||||
|
(ev/call dowork "A 2" 2)
|
||||||
|
(ev/call dowork "A 1" 1)
|
||||||
|
(ev/call dowork "A 3" 3)
|
||||||
|
|
||||||
|
(ev/sleep 4)
|
||||||
|
|
||||||
|
# Will also be done in parallel
|
||||||
|
(print "starting group B")
|
||||||
|
(ev/call dowork "B 2" 2)
|
||||||
|
(ev/call dowork "B 1" 1)
|
||||||
|
(ev/call dowork "B 3" 3)
|
||||||
|
|
||||||
|
(ev/sleep 4)
|
||||||
|
|
||||||
|
(print "all work done")
|
@ -897,7 +897,7 @@ static void janet_ev_handle_selfpipe(void) {
|
|||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
case JANET_SELFPIPE_PROC:
|
case JANET_SELFPIPE_PROC:
|
||||||
janet_schedule_pid(ev.as.proc.pid, ev.as.proc.status);
|
janet_schedule_proc(ev.as.proc.proc, ev.as.proc.status);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -345,21 +345,18 @@ typedef struct {
|
|||||||
|
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
|
|
||||||
JANET_THREAD_LOCAL JanetProc **janet_vm_waiting_procs = NULL;
|
|
||||||
JANET_THREAD_LOCAL size_t janet_vm_proc_count = 0;
|
|
||||||
JANET_THREAD_LOCAL size_t janet_vm_proc_cap = 0;
|
|
||||||
|
|
||||||
/* Structure used to initialize the thread used to call wait
|
/* Structure used to initialize the thread used to call wait
|
||||||
* on child processes */
|
* on child processes */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pid_t pid;
|
|
||||||
int write_pipe;
|
int write_pipe;
|
||||||
|
JanetProc *proc;
|
||||||
} JanetReaperInit;
|
} JanetReaperInit;
|
||||||
|
|
||||||
static void *janet_thread_waiter(void *ptr) {
|
static void *janet_thread_waiter(void *ptr) {
|
||||||
JanetReaperInit *init = (JanetReaperInit *)ptr;
|
JanetReaperInit *init = (JanetReaperInit *)ptr;
|
||||||
pid_t pid = init->pid;
|
JanetProc *proc = (JanetProc *) init->proc;
|
||||||
int fd = init->write_pipe;
|
int fd = init->write_pipe;
|
||||||
|
pid_t pid = proc->pid;
|
||||||
free(init);
|
free(init);
|
||||||
for (;;) {
|
for (;;) {
|
||||||
int status = 0;
|
int status = 0;
|
||||||
@ -374,7 +371,7 @@ static void *janet_thread_waiter(void *ptr) {
|
|||||||
JanetSelfPipeEvent ev;
|
JanetSelfPipeEvent ev;
|
||||||
ev.tag = JANET_SELFPIPE_PROC;
|
ev.tag = JANET_SELFPIPE_PROC;
|
||||||
ev.as.proc.status = status;
|
ev.as.proc.status = status;
|
||||||
ev.as.proc.pid = which;
|
ev.as.proc.proc = proc;
|
||||||
if (write(fd, &ev, sizeof(ev)) < 0) {
|
if (write(fd, &ev, sizeof(ev)) < 0) {
|
||||||
/* TODO failed to handle signal. */
|
/* TODO failed to handle signal. */
|
||||||
fprintf(stderr, "failed to write event\n");
|
fprintf(stderr, "failed to write event\n");
|
||||||
@ -387,62 +384,30 @@ static void *janet_thread_waiter(void *ptr) {
|
|||||||
/* Map pids to JanetProc to allow for lookup after a call to
|
/* Map pids to JanetProc to allow for lookup after a call to
|
||||||
* waitpid. */
|
* waitpid. */
|
||||||
static void janet_add_waiting_proc(JanetProc *proc) {
|
static void janet_add_waiting_proc(JanetProc *proc) {
|
||||||
if (janet_vm_proc_count == janet_vm_proc_cap) {
|
|
||||||
size_t newcap = (janet_vm_proc_count + 1) * 2;
|
|
||||||
if (newcap < 16) newcap = 16;
|
|
||||||
JanetProc **newprocs = realloc(janet_vm_waiting_procs, sizeof(JanetProc *) * newcap);
|
|
||||||
if (NULL == newprocs) {
|
|
||||||
JANET_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
janet_vm_waiting_procs = newprocs;
|
|
||||||
janet_vm_proc_cap = newcap;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Set proccess group for tracking purposes */
|
|
||||||
pid_t pid = proc->pid;
|
|
||||||
JanetReaperInit *init = malloc(sizeof(JanetReaperInit));
|
JanetReaperInit *init = malloc(sizeof(JanetReaperInit));
|
||||||
if (NULL == init) {
|
if (NULL == init) {
|
||||||
JANET_OUT_OF_MEMORY;
|
JANET_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
init->pid = pid;
|
|
||||||
init->write_pipe = janet_vm_selfpipe[1];
|
init->write_pipe = janet_vm_selfpipe[1];
|
||||||
|
init->proc = proc;
|
||||||
pthread_attr_t attr;
|
pthread_attr_t attr;
|
||||||
pthread_t waiter_thread;
|
pthread_t waiter_thread;
|
||||||
pthread_attr_init(&attr);
|
pthread_attr_init(&attr);
|
||||||
pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
|
pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
|
||||||
int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init);
|
int err = pthread_create(&waiter_thread, NULL, janet_thread_waiter, init);
|
||||||
pthread_attr_destroy(&attr);
|
pthread_attr_destroy(&attr);
|
||||||
if (err) {
|
if (err) janet_panicf("%s", strerror(err));
|
||||||
janet_panicf("%s", strerror(err));
|
|
||||||
}
|
|
||||||
pthread_detach(waiter_thread);
|
pthread_detach(waiter_thread);
|
||||||
|
|
||||||
janet_vm_waiting_procs[janet_vm_proc_count++] = proc;
|
|
||||||
janet_gcroot(janet_wrap_abstract(proc));
|
janet_gcroot(janet_wrap_abstract(proc));
|
||||||
janet_ev_inc_refcount();
|
janet_ev_inc_refcount();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_remove_waiting_proc(JanetProc *proc) {
|
static void janet_remove_waiting_proc(JanetProc *proc) {
|
||||||
for (size_t i = 0; i < janet_vm_proc_count; i++) {
|
janet_gcunroot(janet_wrap_abstract(proc));
|
||||||
if (janet_vm_waiting_procs[i] == proc) {
|
janet_ev_dec_refcount();
|
||||||
janet_vm_waiting_procs[i] = janet_vm_waiting_procs[--janet_vm_proc_count];
|
|
||||||
janet_gcunroot(janet_wrap_abstract(proc));
|
|
||||||
janet_ev_dec_refcount();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static JanetProc *janet_lookup_proc(pid_t pid) {
|
void janet_schedule_proc(void *ptr, int status) {
|
||||||
for (size_t i = 0; i < janet_vm_proc_count; i++) {
|
|
||||||
if (janet_vm_waiting_procs[i]->pid == pid) {
|
|
||||||
return janet_vm_waiting_procs[i];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void janet_schedule_pid(pid_t pid, int status) {
|
|
||||||
/* Use POSIX shell semantics for interpreting signals */
|
/* Use POSIX shell semantics for interpreting signals */
|
||||||
if (WIFEXITED(status)) {
|
if (WIFEXITED(status)) {
|
||||||
status = WEXITSTATUS(status);
|
status = WEXITSTATUS(status);
|
||||||
@ -451,29 +416,24 @@ void janet_schedule_pid(pid_t pid, int status) {
|
|||||||
} else {
|
} else {
|
||||||
status = WTERMSIG(status) + 128;
|
status = WTERMSIG(status) + 128;
|
||||||
}
|
}
|
||||||
JanetProc *proc = janet_lookup_proc(pid);
|
JanetProc *proc = (JanetProc *)ptr;
|
||||||
if (NULL == proc) return;
|
if (NULL == proc) return;
|
||||||
proc->return_code = (int32_t) status;
|
proc->return_code = (int32_t) status;
|
||||||
proc->flags |= JANET_PROC_WAITED;
|
proc->flags |= JANET_PROC_WAITED;
|
||||||
proc->flags &= ~JANET_PROC_WAITING;
|
proc->flags &= ~JANET_PROC_WAITING;
|
||||||
|
janet_remove_waiting_proc(proc);
|
||||||
if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
|
if ((status != 0) && (proc->flags & JANET_PROC_ERROR_NONZERO)) {
|
||||||
JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
|
JanetString s = janet_formatc("command failed with non-zero exit code %d", status);
|
||||||
janet_cancel(proc->fiber, janet_wrap_string(s));
|
janet_cancel(proc->fiber, janet_wrap_string(s));
|
||||||
} else {
|
} else {
|
||||||
janet_schedule(proc->fiber, janet_wrap_integer(status));
|
janet_schedule(proc->fiber, janet_wrap_integer(status));
|
||||||
}
|
}
|
||||||
janet_remove_waiting_proc(proc);
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int janet_proc_gc(void *p, size_t s) {
|
static int janet_proc_gc(void *p, size_t s) {
|
||||||
(void) s;
|
(void) s;
|
||||||
JanetProc *proc = (JanetProc *) p;
|
JanetProc *proc = (JanetProc *) p;
|
||||||
#ifdef JANET_EV
|
|
||||||
if (proc->flags & JANET_PROC_WAITING) {
|
|
||||||
janet_remove_waiting_proc(proc);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
if (!(proc->flags & JANET_PROC_CLOSED)) {
|
if (!(proc->flags & JANET_PROC_CLOSED)) {
|
||||||
CloseHandle(proc->pHandle);
|
CloseHandle(proc->pHandle);
|
||||||
@ -2213,16 +2173,7 @@ static const JanetReg os_cfuns[] = {
|
|||||||
{NULL, NULL, NULL}
|
{NULL, NULL, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
void janet_os_deinit(void) {
|
void janet_os_deinit(void) {}
|
||||||
#ifndef JANET_NO_PROCESSES
|
|
||||||
#ifdef JANET_EV
|
|
||||||
free(janet_vm_waiting_procs);
|
|
||||||
janet_vm_waiting_procs = NULL;
|
|
||||||
janet_vm_proc_count = 0;
|
|
||||||
janet_vm_proc_cap = 0;
|
|
||||||
#endif
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Module entry point */
|
/* Module entry point */
|
||||||
void janet_lib_os(JanetTable *env) {
|
void janet_lib_os(JanetTable *env) {
|
||||||
@ -2235,11 +2186,6 @@ void janet_lib_os(JanetTable *env) {
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#ifndef JANET_NO_PROCESSES
|
#ifndef JANET_NO_PROCESSES
|
||||||
#ifdef JANET_EV
|
|
||||||
janet_vm_waiting_procs = NULL;
|
|
||||||
janet_vm_proc_count = 0;
|
|
||||||
janet_vm_proc_cap = 0;
|
|
||||||
#endif
|
|
||||||
#endif
|
#endif
|
||||||
janet_core_cfuns(env, NULL, os_cfuns);
|
janet_core_cfuns(env, NULL, os_cfuns);
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,7 @@ extern const JanetAbstractType janet_address_type;
|
|||||||
void janet_lib_ev(JanetTable *env);
|
void janet_lib_ev(JanetTable *env);
|
||||||
void janet_ev_mark(void);
|
void janet_ev_mark(void);
|
||||||
int janet_make_pipe(JanetHandle handles[2]);
|
int janet_make_pipe(JanetHandle handles[2]);
|
||||||
void janet_schedule_pid(pid_t pid, int status);
|
void janet_schedule_proc(void *proc, int status);
|
||||||
|
|
||||||
/* Single message that is written to self pipe. This is used
|
/* Single message that is written to self pipe. This is used
|
||||||
* to communicate messages inside Janet to work with the event loop.
|
* to communicate messages inside Janet to work with the event loop.
|
||||||
@ -158,7 +158,7 @@ typedef struct {
|
|||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
int status;
|
int status;
|
||||||
pid_t pid;
|
void *proc;
|
||||||
} proc;
|
} proc;
|
||||||
} as;
|
} as;
|
||||||
} JanetSelfPipeEvent;
|
} JanetSelfPipeEvent;
|
||||||
|
Loading…
Reference in New Issue
Block a user