Threading improvements.

- Add thread/exit to kill the current thread.
- Add global lock aroung custom getline and add atexit handler
- to prevent any possible issues when exiting program.
- Allow sending stderr, stdout, and stdin over thread.
This commit is contained in:
Calvin Rose 2020-07-03 16:19:05 -05:00
parent dc259b9f8e
commit 617ec7f565
5 changed files with 119 additions and 34 deletions

View File

@ -27,12 +27,26 @@
#include "fiber.h"
#endif
#ifndef JANET_SINGLE_THREADED
#ifndef JANET_WINDOWS
#include <pthread.h>
#else
#include <windows.h>
#endif
#endif
JANET_NO_RETURN static void janet_top_level_signal(const char *msg) {
#ifdef JANET_TOP_LEVEL_SIGNAL
JANET_TOP_LEVEL_SIGNAL(msg);
#else
fputs(msg, stdout);
exit(1);
# ifdef JANET_SINGLE_THREADED
exit(-1);
# elif defined(JANET_WINDOWS)
ExitThread(-1);
# else
pthread_exit(NULL);
# endif
#endif
}

View File

@ -37,18 +37,23 @@
static int cfun_io_gc(void *p, size_t len);
static int io_file_get(void *p, Janet key, Janet *out);
static void io_file_marshal(void *p, JanetMarshalContext *ctx);
static void *io_file_unmarshal(JanetMarshalContext *ctx);
const JanetAbstractType janet_file_type = {
"core/file",
cfun_io_gc,
NULL,
io_file_get,
JANET_ATEND_GET
NULL,
io_file_marshal,
io_file_unmarshal,
JANET_ATEND_UNMARSHAL
};
/* Check arguments to fopen */
static int checkflags(const uint8_t *str) {
int flags = 0;
static int32_t checkflags(const uint8_t *str) {
int32_t flags = 0;
int32_t i;
int32_t len = janet_string_length(str);
if (!len || len > 3)
@ -85,7 +90,7 @@ static int checkflags(const uint8_t *str) {
return flags;
}
static Janet makef(FILE *f, int flags) {
static void *makef(FILE *f, int32_t flags) {
JanetFile *iof = (JanetFile *) janet_abstract(&janet_file_type, sizeof(JanetFile));
iof->file = f;
iof->flags = flags;
@ -95,7 +100,7 @@ static Janet makef(FILE *f, int flags) {
if (!(flags & JANET_FILE_NOT_CLOSEABLE))
fcntl(fileno(f), F_SETFD, FD_CLOEXEC);
#endif
return janet_wrap_abstract(iof);
return iof;
}
/* Open a process */
@ -104,7 +109,7 @@ static Janet cfun_io_popen(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2);
const uint8_t *fname = janet_getstring(argv, 0);
const uint8_t *fmode = NULL;
int flags;
int32_t flags;
if (argc == 2) {
fmode = janet_getkeyword(argv, 1);
if (janet_string_length(fmode) != 1 ||
@ -123,7 +128,7 @@ static Janet cfun_io_popen(int32_t argc, Janet *argv) {
if (!f) {
return janet_wrap_nil();
}
return makef(f, flags);
return janet_makefile(f, flags);
}
#endif
@ -141,7 +146,7 @@ static Janet cfun_io_fopen(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2);
const uint8_t *fname = janet_getstring(argv, 0);
const uint8_t *fmode;
int flags;
int32_t flags;
if (argc == 2) {
fmode = janet_getkeyword(argv, 1);
flags = checkflags(fmode);
@ -150,7 +155,7 @@ static Janet cfun_io_fopen(int32_t argc, Janet *argv) {
flags = JANET_FILE_READ;
}
FILE *f = fopen((const char *)fname, (const char *)fmode);
return f ? makef(f, flags) : janet_wrap_nil();
return f ? janet_makefile(f, flags) : janet_wrap_nil();
}
/* Read up to n bytes into buffer. */
@ -331,6 +336,41 @@ static int io_file_get(void *p, Janet key, Janet *out) {
return janet_getmethod(janet_unwrap_keyword(key), io_file_methods, out);
}
static void io_file_marshal(void *p, JanetMarshalContext *ctx) {
JanetFile *iof = (JanetFile *)p;
if (ctx->flags & JANET_MARSHAL_UNSAFE) {
janet_marshal_int(ctx, fileno(iof->file));
janet_marshal_int(ctx, iof->flags);
} else {
janet_panic("cannot marshal file in safe mode");
}
}
static void *io_file_unmarshal(JanetMarshalContext *ctx) {
if (ctx->flags & JANET_MARSHAL_UNSAFE) {
JanetFile *iof = janet_unmarshal_abstract(ctx, sizeof(JanetFile));
int32_t fd = janet_unmarshal_int(ctx);
int32_t flags = janet_unmarshal_int(ctx);
char fmt[4] = {0};
int index = 0;
if (flags & JANET_FILE_READ) fmt[index++] = 'r';
if (flags & JANET_FILE_APPEND) {
fmt[index++] = 'a';
} else if (flags & JANET_FILE_WRITE) {
fmt[index++] = 'w';
}
iof->file = fdopen(fd, fmt);
if (iof->file == NULL) {
iof->flags = JANET_FILE_CLOSED;
} else {
iof->flags = flags;
}
return iof;
} else {
janet_panic("cannot unmarshal file in safe mode");
}
}
FILE *janet_dynfile(const char *name, FILE *def) {
Janet x = janet_dyn(name);
if (!janet_checktype(x, JANET_ABSTRACT)) return def;
@ -677,7 +717,7 @@ FILE *janet_getfile(const Janet *argv, int32_t n, int *flags) {
}
Janet janet_makefile(FILE *f, int flags) {
return makef(f, flags);
return janet_wrap_abstract(makef(f, flags));
}
JanetAbstract janet_checkfile(Janet j) {
@ -693,18 +733,18 @@ FILE *janet_unwrapfile(Janet j, int *flags) {
/* Module entry point */
void janet_lib_io(JanetTable *env) {
janet_core_cfuns(env, NULL, io_cfuns);
janet_register_abstract_type(&janet_file_type);
/* stdout */
janet_core_def(env, "stdout",
makef(stdout, JANET_FILE_APPEND | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
janet_makefile(stdout, JANET_FILE_APPEND | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
JDOC("The standard output file."));
/* stderr */
janet_core_def(env, "stderr",
makef(stderr, JANET_FILE_APPEND | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
janet_makefile(stderr, JANET_FILE_APPEND | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
JDOC("The standard error file."));
/* stdin */
janet_core_def(env, "stdin",
makef(stdin, JANET_FILE_READ | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
janet_makefile(stdin, JANET_FILE_READ | JANET_FILE_NOT_CLOSEABLE | JANET_FILE_SERIALIZABLE),
JDOC("The standard input file."));
}

View File

@ -675,6 +675,18 @@ static Janet cfun_thread_close(int32_t argc, Janet *argv) {
return janet_wrap_nil();
}
static Janet cfun_thread_exit(int32_t argc, Janet *argv) {
(void) argv;
janet_arity(argc, 0, 1);
#if defined(JANET_WINDOWS)
int32_t flag = janet_optinteger(argv, argc, 0, 0);
ExitThread(flag);
#else
pthread_exit(NULL);
#endif
return janet_wrap_nil();
}
static const JanetMethod janet_thread_methods[] = {
{"send", cfun_thread_send},
{"close", cfun_thread_close},
@ -723,6 +735,12 @@ static const JanetReg threadlib_cfuns[] = {
"Close a thread, unblocking it and ending communication with it. Note that closing "
"a thread is idempotent and does not cancel the thread's operation. Returns nil.")
},
{
"thread/exit", cfun_thread_exit,
JDOC("(thread/exit &opt code)\n\n"
"Exit from the current thread. If no more threads are running, ends the process, but otherwise does "
"not end the current process.")
},
{NULL, NULL, NULL}
};

View File

@ -1002,7 +1002,7 @@ struct JanetRNG {
typedef struct JanetFile JanetFile;
struct JanetFile {
FILE *file;
int flags;
int32_t flags;
};
/* Thread types */
@ -1534,11 +1534,11 @@ extern JANET_API const JanetAbstractType janet_file_type;
#define JANET_FILE_SERIALIZABLE 128
#define JANET_FILE_PIPED 256
JANET_API Janet janet_makefile(FILE *f, int flags);
JANET_API FILE *janet_getfile(const Janet *argv, int32_t n, int *flags);
JANET_API Janet janet_makefile(FILE *f, int32_t flags);
JANET_API FILE *janet_getfile(const Janet *argv, int32_t n, int32_t *flags);
JANET_API FILE *janet_dynfile(const char *name, FILE *def);
JANET_API JanetAbstract janet_checkfile(Janet j);
JANET_API FILE *janet_unwrapfile(Janet j, int *flags);
JANET_API FILE *janet_unwrapfile(Janet j, int32_t *flags);
/* Marshal API */
JANET_API void janet_marshal_size(JanetMarshalContext *ctx, size_t value);

View File

@ -126,21 +126,28 @@ https://github.com/antirez/linenoise/blob/master/linenoise.c
#define JANET_LINE_MAX 1024
#define JANET_MATCH_MAX 256
#define JANET_HISTORY_MAX 100
static JANET_THREAD_LOCAL int gbl_israwmode = 0;
static JANET_THREAD_LOCAL const char *gbl_prompt = "> ";
static JANET_THREAD_LOCAL int gbl_plen = 2;
static JANET_THREAD_LOCAL char gbl_buf[JANET_LINE_MAX];
static JANET_THREAD_LOCAL int gbl_len = 0;
static JANET_THREAD_LOCAL int gbl_pos = 0;
static JANET_THREAD_LOCAL int gbl_cols = 80;
static JANET_THREAD_LOCAL char *gbl_history[JANET_HISTORY_MAX];
static JANET_THREAD_LOCAL int gbl_history_count = 0;
static JANET_THREAD_LOCAL int gbl_historyi = 0;
static JANET_THREAD_LOCAL int gbl_sigint_flag = 0;
static JANET_THREAD_LOCAL struct termios gbl_termios_start;
static JANET_THREAD_LOCAL JanetByteView gbl_matches[JANET_MATCH_MAX];
static JANET_THREAD_LOCAL int gbl_match_count = 0;
static JANET_THREAD_LOCAL int gbl_lines_below = 0;
static int gbl_israwmode = 0;
static const char *gbl_prompt = "> ";
static int gbl_plen = 2;
static char gbl_buf[JANET_LINE_MAX];
static int gbl_len = 0;
static int gbl_pos = 0;
static int gbl_cols = 80;
static char *gbl_history[JANET_HISTORY_MAX];
static int gbl_history_count = 0;
static int gbl_historyi = 0;
static int gbl_sigint_flag = 0;
static struct termios gbl_termios_start;
static JanetByteView gbl_matches[JANET_MATCH_MAX];
static int gbl_match_count = 0;
static int gbl_lines_below = 0;
/* Put a lock around this global state so we don't screw up
* the terminal in a multithreaded situation */
#ifndef JANET_SINGLE_THREADED
#include <pthread.h>
static pthread_mutex_t gbl_lock = PTHREAD_MUTEX_INITIALIZER;
#endif
/* Unsupported terminal list from linenoise */
static const char *badterms[] = {
@ -162,6 +169,7 @@ static char *sdup(const char *s) {
/* Ansi terminal raw mode */
static int rawmode(void) {
struct termios t;
pthread_mutex_lock(&gbl_lock);
if (!isatty(STDIN_FILENO)) goto fatal;
if (tcgetattr(STDIN_FILENO, &gbl_termios_start) == -1) goto fatal;
t = gbl_termios_start;
@ -175,6 +183,7 @@ static int rawmode(void) {
return 0;
fatal:
errno = ENOTTY;
pthread_mutex_unlock(&gbl_lock);
return -1;
}
@ -182,6 +191,7 @@ fatal:
static void norawmode(void) {
if (gbl_israwmode && tcsetattr(STDIN_FILENO, TCSAFLUSH, &gbl_termios_start) != -1)
gbl_israwmode = 0;
pthread_mutex_unlock(&gbl_lock);
}
static int curpos(void) {
@ -996,6 +1006,9 @@ int main(int argc, char **argv) {
SetConsoleOutputCP(65001);
#endif
/* Try and not leave the terminal in a bad state */
atexit(norawmode);
/* Set up VM */
janet_init();