mirror of
https://github.com/janet-lang/janet
synced 2025-01-14 17:35:41 +00:00
Use PostQueuedCompletionStatus for threaded calls on windows.
Ore efficient than using a self pipe.
This commit is contained in:
parent
61c65f3df1
commit
1c7ed8ca48
268
src/core/ev.c
268
src/core/ev.c
@ -31,13 +31,12 @@
|
|||||||
|
|
||||||
#ifdef JANET_EV
|
#ifdef JANET_EV
|
||||||
|
|
||||||
/* Includes */
|
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include <pthread.h>
|
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
#include <winsock2.h>
|
#include <winsock2.h>
|
||||||
#include <windows.h>
|
#include <windows.h>
|
||||||
#else
|
#else
|
||||||
|
#include <pthread.h>
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@ -885,14 +884,6 @@ void janet_loop(void) {
|
|||||||
* Self-pipe handling code.
|
* Self-pipe handling code.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* Structure used to initialize threads in the thread pool. */
|
|
||||||
typedef struct {
|
|
||||||
int write_pipe;
|
|
||||||
JanetEVGenericMessage msg;
|
|
||||||
JanetThreadedSubroutine subr;
|
|
||||||
JanetThreadedCallback cb;
|
|
||||||
} JanetEVThreadInit;
|
|
||||||
|
|
||||||
/* Wrap return value by pairing it with the callback used to handle it
|
/* Wrap return value by pairing it with the callback used to handle it
|
||||||
* in the main thread */
|
* in the main thread */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
@ -900,11 +891,29 @@ typedef struct {
|
|||||||
JanetThreadedCallback cb;
|
JanetThreadedCallback cb;
|
||||||
} JanetSelfPipeEvent;
|
} JanetSelfPipeEvent;
|
||||||
|
|
||||||
|
/* Structure used to initialize threads in the thread pool
|
||||||
|
* (same head structure as self pipe event)*/
|
||||||
|
typedef struct {
|
||||||
|
JanetEVGenericMessage msg;
|
||||||
|
JanetThreadedCallback cb;
|
||||||
|
JanetThreadedSubroutine subr;
|
||||||
|
JanetHandle write_pipe;
|
||||||
|
} JanetEVThreadInit;
|
||||||
|
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
|
|
||||||
|
/* On windows, use PostQueuedCompletionStatus instead for
|
||||||
|
* custom events */
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
JANET_THREAD_LOCAL int janet_vm_selfpipe[2];
|
static JANET_THREAD_LOCAL JanetHandle janet_vm_selfpipe[2];
|
||||||
|
|
||||||
|
static void janet_ev_setup_selfpipe(void) {
|
||||||
|
if (janet_make_pipe(janet_vm_selfpipe)) {
|
||||||
|
JANET_EXIT("failed to initialize self pipe in event loop");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Handle events from the self pipe inside the event loop */
|
/* Handle events from the self pipe inside the event loop */
|
||||||
static void janet_ev_handle_selfpipe(void) {
|
static void janet_ev_handle_selfpipe(void) {
|
||||||
@ -915,15 +924,6 @@ static void janet_ev_handle_selfpipe(void) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void janet_ev_setup_selfpipe(void) {
|
|
||||||
if (-1 == pipe(janet_vm_selfpipe)) goto error;
|
|
||||||
if (fcntl(janet_vm_selfpipe[0], F_SETFL, O_NONBLOCK)) goto error;
|
|
||||||
if (fcntl(janet_vm_selfpipe[1], F_SETFL, O_NONBLOCK)) goto error;
|
|
||||||
return;
|
|
||||||
error:
|
|
||||||
JANET_EXIT("failed to initialize self pipe in event loop");
|
|
||||||
}
|
|
||||||
|
|
||||||
static void janet_ev_cleanup_selfpipe(void) {
|
static void janet_ev_cleanup_selfpipe(void) {
|
||||||
close(janet_vm_selfpipe[0]);
|
close(janet_vm_selfpipe[0]);
|
||||||
close(janet_vm_selfpipe[1]);
|
close(janet_vm_selfpipe[1]);
|
||||||
@ -931,93 +931,6 @@ static void janet_ev_cleanup_selfpipe(void) {
|
|||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static void *janet_thread_body(void *ptr) {
|
|
||||||
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
|
|
||||||
int fd = init->write_pipe;
|
|
||||||
JanetEVGenericMessage msg = init->msg;
|
|
||||||
JanetThreadedSubroutine subr = init->subr;
|
|
||||||
JanetThreadedCallback cb = init->cb;
|
|
||||||
free(init);
|
|
||||||
|
|
||||||
JanetSelfPipeEvent response;
|
|
||||||
response.msg = subr(msg);
|
|
||||||
response.cb = cb;
|
|
||||||
|
|
||||||
/* TODO - implement for windows */
|
|
||||||
if (write(fd, &response, sizeof(response)) < 0) {
|
|
||||||
/* TODO failed to handle signal. */
|
|
||||||
fprintf(stderr, "failed to write response\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) {
|
|
||||||
JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit));
|
|
||||||
if (NULL == init) {
|
|
||||||
JANET_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
init->write_pipe = janet_vm_selfpipe[1];
|
|
||||||
init->msg = arguments;
|
|
||||||
init->subr = fp;
|
|
||||||
init->cb = cb;
|
|
||||||
|
|
||||||
/* Create thread - TODO thread pool? */
|
|
||||||
pthread_t waiter_thread;
|
|
||||||
int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init);
|
|
||||||
if (err) {
|
|
||||||
free(init);
|
|
||||||
janet_panicf("%s", strerror(err));
|
|
||||||
}
|
|
||||||
pthread_detach(waiter_thread);
|
|
||||||
|
|
||||||
/* Increment ev refcount so we don't quit while waiting for a subprocess */
|
|
||||||
janet_ev_inc_refcount();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Default callback for janet_ev_threaded_await. */
|
|
||||||
void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
|
|
||||||
switch (return_value.tag) {
|
|
||||||
default:
|
|
||||||
case JANET_EV_TCTAG_NIL:
|
|
||||||
janet_schedule(return_value.fiber, janet_wrap_nil());
|
|
||||||
break;
|
|
||||||
case JANET_EV_TCTAG_INTEGER:
|
|
||||||
janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi));
|
|
||||||
break;
|
|
||||||
case JANET_EV_TCTAG_STRING:
|
|
||||||
case JANET_EV_TCTAG_STRINGF:
|
|
||||||
janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
|
||||||
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
|
||||||
break;
|
|
||||||
case JANET_EV_TCTAG_KEYWORD:
|
|
||||||
janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
|
||||||
break;
|
|
||||||
case JANET_EV_TCTAG_ERR_STRING:
|
|
||||||
case JANET_EV_TCTAG_ERR_STRINGF:
|
|
||||||
janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
|
||||||
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
|
||||||
break;
|
|
||||||
case JANET_EV_TCTAG_ERR_KEYWORD:
|
|
||||||
janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
janet_gcunroot(janet_wrap_fiber(return_value.fiber));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* Convenience method for common case */
|
|
||||||
void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
|
|
||||||
JanetEVGenericMessage arguments;
|
|
||||||
arguments.tag = tag;
|
|
||||||
arguments.argi = argi;
|
|
||||||
arguments.argp = argp;
|
|
||||||
arguments.fiber = janet_root_fiber();
|
|
||||||
janet_gcroot(janet_wrap_fiber(arguments.fiber));
|
|
||||||
janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback);
|
|
||||||
janet_await();
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
|
|
||||||
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
|
JANET_THREAD_LOCAL HANDLE janet_vm_iocp = NULL;
|
||||||
@ -1077,6 +990,12 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
|
|||||||
if (!has_timeout) {
|
if (!has_timeout) {
|
||||||
/* queue emptied */
|
/* queue emptied */
|
||||||
}
|
}
|
||||||
|
} else if (NULL == completionKey) {
|
||||||
|
/* Custom event */
|
||||||
|
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
|
||||||
|
response->cb(response->msg);
|
||||||
|
free(response);
|
||||||
|
janet_ev_dec_refcount();
|
||||||
} else {
|
} else {
|
||||||
/* Normal event */
|
/* Normal event */
|
||||||
JanetStream *stream = (JanetStream *) completionKey;
|
JanetStream *stream = (JanetStream *) completionKey;
|
||||||
@ -1371,9 +1290,134 @@ void janet_ev_deinit(void) {
|
|||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* C API helpers for reading and writing from streams.
|
/*
|
||||||
|
* End poll implementation
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Threaded calls
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifdef JANET_WINDOWS
|
||||||
|
static DWORD WINAPI janet_thread_body(LPVOID ptr) {
|
||||||
|
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
|
||||||
|
JanetEVGenericMessage msg = init->msg;
|
||||||
|
JanetThreadedSubroutine subr = init->subr;
|
||||||
|
JanetThreadedCallback cb = init->cb;
|
||||||
|
JanetHandle iocp = init->write_pipe;
|
||||||
|
/* Reuse memory from thread init for returning data */
|
||||||
|
init->msg = subr(msg);
|
||||||
|
init->cb = cb;
|
||||||
|
janet_assert(PostQueuedCompletionStatus(iocp, sizeof(JanetSelfPipeEvent), NULL, init),
|
||||||
|
"failed to post completion event");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
static void *janet_thread_body(void *ptr) {
|
||||||
|
JanetEVThreadInit *init = (JanetEVThreadInit *)ptr;
|
||||||
|
JanetEVGenericMessage msg = init->msg;
|
||||||
|
JanetThreadedSubroutine subr = init->subr;
|
||||||
|
JanetThreadedCallback cb = init->cb;
|
||||||
|
int fd = init->write_pipe;
|
||||||
|
free(init);
|
||||||
|
JanetSelfPipeEvent response;
|
||||||
|
response.msg = subr(msg);
|
||||||
|
response.cb = cb;
|
||||||
|
/* handle a bit of back pressure before giving up. */
|
||||||
|
int tries = 4;
|
||||||
|
while (tries > 0) {
|
||||||
|
int status;
|
||||||
|
do {
|
||||||
|
status = write(fd, &response, sizeof(response));
|
||||||
|
} while (status == -1 && errno == EINTR);
|
||||||
|
if (status > 0) break;
|
||||||
|
sleep(1);
|
||||||
|
tries--;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void janet_ev_threaded_call(JanetThreadedSubroutine fp, JanetEVGenericMessage arguments, JanetThreadedCallback cb) {
|
||||||
|
JanetEVThreadInit *init = malloc(sizeof(JanetEVThreadInit));
|
||||||
|
if (NULL == init) {
|
||||||
|
JANET_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
init->msg = arguments;
|
||||||
|
init->subr = fp;
|
||||||
|
init->cb = cb;
|
||||||
|
|
||||||
|
#ifdef JANET_WINDOWS
|
||||||
|
init->write_pipe = janet_vm_iocp;
|
||||||
|
HANDLE thread_handle = CreateThread(NULL, 0, janet_thread_body, init, 0, NULL);
|
||||||
|
if (NULL == thread_handle) {
|
||||||
|
free(init);
|
||||||
|
janet_panic("failed to create thread");
|
||||||
|
}
|
||||||
|
CloseHandle(thread_handle); /* detach from thread */
|
||||||
|
#else
|
||||||
|
init->write_pipe = janet_vm_selfpipe[1];
|
||||||
|
pthread_t waiter_thread;
|
||||||
|
int err = pthread_create(&waiter_thread, NULL, janet_thread_body, init);
|
||||||
|
if (err) {
|
||||||
|
free(init);
|
||||||
|
janet_panicf("%s", strerror(err));
|
||||||
|
}
|
||||||
|
pthread_detach(waiter_thread);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Increment ev refcount so we don't quit while waiting for a subprocess */
|
||||||
|
janet_ev_inc_refcount();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Default callback for janet_ev_threaded_await. */
|
||||||
|
void janet_ev_default_threaded_callback(JanetEVGenericMessage return_value) {
|
||||||
|
switch (return_value.tag) {
|
||||||
|
default:
|
||||||
|
case JANET_EV_TCTAG_NIL:
|
||||||
|
janet_schedule(return_value.fiber, janet_wrap_nil());
|
||||||
|
break;
|
||||||
|
case JANET_EV_TCTAG_INTEGER:
|
||||||
|
janet_schedule(return_value.fiber, janet_wrap_integer(return_value.argi));
|
||||||
|
break;
|
||||||
|
case JANET_EV_TCTAG_STRING:
|
||||||
|
case JANET_EV_TCTAG_STRINGF:
|
||||||
|
janet_schedule(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
||||||
|
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
||||||
|
break;
|
||||||
|
case JANET_EV_TCTAG_KEYWORD:
|
||||||
|
janet_schedule(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
||||||
|
break;
|
||||||
|
case JANET_EV_TCTAG_ERR_STRING:
|
||||||
|
case JANET_EV_TCTAG_ERR_STRINGF:
|
||||||
|
janet_cancel(return_value.fiber, janet_cstringv((const char *) return_value.argp));
|
||||||
|
if (return_value.tag == JANET_EV_TCTAG_STRINGF) free(return_value.argp);
|
||||||
|
break;
|
||||||
|
case JANET_EV_TCTAG_ERR_KEYWORD:
|
||||||
|
janet_cancel(return_value.fiber, janet_ckeywordv((const char *) return_value.argp));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
janet_gcunroot(janet_wrap_fiber(return_value.fiber));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Convenience method for common case */
|
||||||
|
void janet_ev_threaded_await(JanetThreadedSubroutine fp, int tag, int argi, void *argp) {
|
||||||
|
JanetEVGenericMessage arguments;
|
||||||
|
arguments.tag = tag;
|
||||||
|
arguments.argi = argi;
|
||||||
|
arguments.argp = argp;
|
||||||
|
arguments.fiber = janet_root_fiber();
|
||||||
|
janet_gcroot(janet_wrap_fiber(arguments.fiber));
|
||||||
|
janet_ev_threaded_call(fp, arguments, janet_ev_default_threaded_callback);
|
||||||
|
janet_await();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* C API helpers for reading and writing from streams.
|
||||||
* There is some networking code in here as well as generic
|
* There is some networking code in here as well as generic
|
||||||
* reading and writing primitives. */
|
* reading and writing primitives.
|
||||||
|
*/
|
||||||
|
|
||||||
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
|
void janet_stream_flags(JanetStream *stream, uint32_t flags) {
|
||||||
if (stream->flags & JANET_STREAM_CLOSED) {
|
if (stream->flags & JANET_STREAM_CLOSED) {
|
||||||
@ -1907,7 +1951,13 @@ int janet_make_pipe(JanetHandle handles[2]) {
|
|||||||
return 0;
|
return 0;
|
||||||
#else
|
#else
|
||||||
if (pipe(handles)) return -1;
|
if (pipe(handles)) return -1;
|
||||||
|
if (fcntl(handles[0], F_SETFL, O_NONBLOCK)) goto error;
|
||||||
|
if (fcntl(handles[1], F_SETFL, O_NONBLOCK)) goto error;
|
||||||
return 0;
|
return 0;
|
||||||
|
error:
|
||||||
|
close(handles[0]);
|
||||||
|
close(handles[1]);
|
||||||
|
return -1;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
* Copyright (c) 2020 Calvin Rose
|
* Copyright (c) 2021 Calvin Rose
|
||||||
*
|
*
|
||||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
* of this software and associated documentation files (the "Software"), to
|
* of this software and associated documentation files (the "Software"), to
|
||||||
|
Loading…
Reference in New Issue
Block a user