1
0
mirror of https://github.com/janet-lang/janet synced 2025-10-29 06:37:41 +00:00

Compare commits

..

29 Commits

Author SHA1 Message Date
Calvin Rose
d19db30f3d Fix meson install test. 2023-09-27 00:19:35 -05:00
Calvin Rose
d12464fc0e Make poll work by going back to array of listeners for gc keeping. 2023-09-26 23:02:06 -05:00
Calvin Rose
a96971c8a7 More work on epoll implementation. 2023-09-26 12:05:06 -05:00
Calvin Rose
f6f769503a Fix up ev.c to pass tests. 2023-09-26 11:11:29 -05:00
Calvin Rose
82917ac6e3 Kqueue fix. 2023-09-25 19:17:51 -07:00
Calvin Rose
a6ffafb1a2 Patches to kqueue implementation. 2023-09-25 19:07:18 -07:00
Calvin Rose
fb8c529f2e Partial work updating epoll reimplentation. 2023-09-25 18:52:15 -07:00
Calvin Rose
1ee98e1e66 Get IOCP reworked event loop passing tests. 2023-09-25 15:19:39 -07:00
Calvin Rose
81f35f5dd1 Redo state management for Janet listeners.
Make more use of the built in GC code for abstracts to
be sure things are more correct. Issue before was streams could
be freed before IOCP events arrived.
2023-09-25 00:43:36 -07:00
Calvin Rose
1b402347cd Work on debugging issue with server spawning. 2023-09-24 18:15:58 -07:00
Calvin Rose
dccb60ba35 Ignore IOCP where the event failed to deque. 2023-09-24 12:53:06 -07:00
Calvin Rose
ae642ceca0 Don't hide windows segfaults in build_win.bat. 2023-09-24 12:36:15 -07:00
Calvin Rose
471b6f9966 Add TOCLOSE back. 2023-09-24 12:28:35 -07:00
Calvin Rose
5dd18bac2c More fixups to gc.c 2023-09-24 11:51:22 -07:00
Calvin Rose
018f4e0891 Remove some old code. 2023-09-24 10:30:58 -07:00
Calvin Rose
e85809a98a Remove remains of gc instrumentation code. 2023-09-24 10:11:24 -07:00
Calvin Rose
e6e9bd8147 Redo async connect code to be moved out of ev.c.
Async connect is different than write.
2023-09-24 10:08:40 -07:00
Calvin Rose
221645d2ce More refinement of meson build. 2023-09-23 14:16:13 -07:00
Calvin Rose
2f4a6214a2 Make meson build work on windows.
By default, use more traditional linking pattern with meson.
The janet.exe will now link to janet-x.x.dll on windows (and
similar for linux/posix) when built with meson. This is slightly
less efficient and means that janet.exe built this way is no longer
standalone (you would need to move the dll along with the exe), but
plays better with most build systems.
2023-09-23 08:53:37 -07:00
Calvin Rose
e00a461c26 Add optional buffer-size to file/open.
This calls setvbuf to change FILE buffering. A goal is
to be able to use the existing file/* functions for blocking
IO analogous to `read` and `write` system calls.
2023-09-23 09:40:17 -05:00
Calvin Rose
c31314be38 Merge pull request #1296 from Andriamanitra/doc-loop-unless
add :unless modifier to (doc loop)
2023-09-22 05:41:06 -07:00
Andriamanitra
ee142c4be0 truthy/falsey is more accurate than true/false 2023-09-22 03:04:41 +03:00
Andriamanitra
aeacc0b31b add :unless modifier to (doc loop) 2023-09-21 19:23:40 +03:00
Calvin Rose
7b4c3bdbcc Address issues from #1294 on non-nanboxed platforms.
Underlying bug was obscured by nanbox implementation.
2023-09-21 07:36:53 -07:00
Calvin Rose
910b9cf1fd Distinguish between JANET_API and JANET_EXPORT
One is a way to export symbols, the other a way to reference
API functions. Also include prebuilt dlljanet.dll and dlljanet.lib
for windows to save people the trouble of compiling janet.c themselves.
2023-09-20 20:07:03 -07:00
Calvin Rose
b10aaceab0 Work on dllimport option for janet. 2023-09-20 17:34:42 -07:00
Calvin Rose
169bd812c9 Update state.h to #include <windows.h>
Should fix usage with msvc in some pipelines.
2023-09-18 23:51:15 -05:00
Calvin Rose
34767f1e13 Merge pull request #1292 from sogaiu/tweak-janetconf-h 2023-09-18 18:50:46 -07:00
sogaiu
4f642c0843 Tweak janetconf.h 2023-09-19 10:34:50 +09:00
17 changed files with 366 additions and 333 deletions

View File

@@ -41,32 +41,32 @@ if not exist build\boot mkdir build\boot
@rem Build the bootstrap interpreter
for %%f in (src\core\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
)
for %%f in (src\boot\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
)
%JANET_LINK% /out:build\janet_boot.exe build\boot\*.obj
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
build\janet_boot . > build\c\janet.c
@rem Build the sources
%JANET_COMPILE% /Fobuild\janet.obj build\c\janet.c
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
%JANET_COMPILE% /Fobuild\shell.obj src\mainclient\shell.c
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
@rem Build the resources
rc /nologo /fobuild\janet_win.res janet_win.rc
@rem Link everything to main client
%JANET_LINK% /out:janet.exe build\janet.obj build\shell.obj build\janet_win.res
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
@rem Build static library (libjanet.a)
@rem Build static library (libjanet.lib)
%JANET_LINK_STATIC% /out:build\libjanet.lib build\janet.obj
@if errorlevel 1 goto :BUILDFAIL
@if not errorlevel 0 goto :BUILDFAIL
echo === Successfully built janet.exe for Windows ===
echo === Run 'build_win test' to run tests. ==
@@ -98,7 +98,7 @@ exit /b 0
:TEST
for %%f in (test/suite*.janet) do (
janet.exe test\%%f
@if errorlevel 1 goto TESTFAIL
@if not errorlevel 0 goto TESTFAIL
)
exit /b 0
@@ -117,6 +117,7 @@ copy README.md dist\README.md
copy janet.lib dist\janet.lib
copy janet.exp dist\janet.exp
copy janet.def dist\janet.def
janet.exe tools\patch-header.janet src\include\janet.h src\conf\janetconf.h build\janet.h
copy build\janet.h dist\janet.h

View File

@@ -169,7 +169,7 @@ janet_boot = executable('janet-boot', core_src, boot_src,
# Build janet.c
janetc = custom_target('janetc',
input : [janet_boot],
input : [janet_boot, 'src/boot/boot.janet'],
output : 'janet.c',
capture : true,
command : [
@@ -182,23 +182,30 @@ if not get_option('single_threaded')
janet_dependencies += thread_dep
endif
if cc.has_argument('-fvisibility=hidden')
lib_cflags = ['-fvisibility=hidden']
else
lib_cflags = []
endif
libjanet = library('janet', janetc,
include_directories : incdir,
dependencies : janet_dependencies,
version: meson.project_version(),
soversion: version_parts[0] + '.' + version_parts[1],
c_args : lib_cflags,
install : true)
# Extra c flags - adding -fvisibility=hidden matches the Makefile and
# shaves off about 10k on linux x64, likely similar on other platforms.
if cc.has_argument('-fvisibility=hidden')
extra_cflags = ['-fvisibility=hidden']
extra_cflags = ['-fvisibility=hidden', '-DJANET_DLL_IMPORT']
else
extra_cflags = []
extra_cflags = ['-DJANET_DLL_IMPORT']
endif
janet_mainclient = executable('janet', janetc, mainclient_src,
janet_mainclient = executable('janet', mainclient_src,
include_directories : incdir,
dependencies : janet_dependencies,
link_with: [libjanet],
c_args : extra_cflags,
install : true)
@@ -281,11 +288,12 @@ patched_janet = custom_target('patched-janeth',
install : true,
install_dir : join_paths(get_option('includedir'), 'janet'),
build_by_default : true,
output : ['janet.h'],
output : ['janet_' + meson.project_version() + '.h'],
command : [janet_nativeclient, '@INPUT@', '@OUTPUT@'])
# Create a version of the janet.h header that matches what jpm often expects
if meson.version().version_compare('>=0.61')
install_symlink('janet.h', pointing_to: 'janet/janet.h', install_dir: get_option('includedir'))
install_symlink('janet.h', pointing_to: 'janet_' + meson.project_version() + '.h', install_dir: join_paths(get_option('includedir'), 'janet'))
endif

View File

@@ -590,7 +590,10 @@
* `:repeat n` -- repeats the next inner loop `n` times.
* `:when condition` -- only evaluates the current loop body when `condition`
is true.
is truthy.
* `:unless condition` -- only evaluates the current loop body when `condition`
is falsey.
The `loop` macro always evaluates to nil.
```
@@ -3763,7 +3766,7 @@
[host port &opt handler type]
(def s (net/listen host port type))
(if handler
(ev/call (fn [] (net/accept-loop s handler))))
(ev/go (fn [] (net/accept-loop s handler))))
s))
###

View File

@@ -4,8 +4,8 @@
#define JANETCONF_H
#define JANET_VERSION_MAJOR 1
#define JANET_VERSION_MINOR 30
#define JANET_VERSION_PATCH 1
#define JANET_VERSION_MINOR 31
#define JANET_VERSION_PATCH 0
#define JANET_VERSION_EXTRA ""
#define JANET_VERSION "1.31.0"

View File

@@ -180,7 +180,7 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
}
/* Forward declaration */
static void janet_unlisten(JanetListenerState *state, int is_gc);
static void janet_unlisten(JanetListenerState *state);
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void);
@@ -254,76 +254,100 @@ static void add_timeout(JanetTimeout to) {
}
}
static int janet_listener_gc(void *p, size_t s);
static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = {
"core/ev-listener",
janet_listener_gc,
janet_listener_mark,
JANET_ATEND_GCMARK
};
/* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream");
}
if (stream->_mask & mask) {
janet_panic("cannot listen for duplicate event on stream");
}
if (janet_vm.root_fiber->waiting != NULL) {
janet_panic("current fiber is already waiting for event");
}
if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState);
JanetListenerState *state = janet_malloc(size);
if (NULL == state) {
JANET_OUT_OF_MEMORY;
}
if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read;
if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write;
janet_assert(size >= sizeof(JanetListenerState), "bad size");
JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
state->machine = behavior;
state->fiber = janet_vm.root_fiber;
state->flags = 0;
janet_vm.root_fiber->waiting = state;
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
state->stream = stream;
state->_mask = mask;
stream->_mask |= mask;
state->_next = stream->state;
stream->state = state;
/* Keep track of a listener for GC purposes */
int resize = janet_vm.listener_cap == janet_vm.listener_count;
if (resize) {
size_t newcap = janet_vm.listener_count ? janet_vm.listener_cap * 2 : 16;
janet_vm.listeners = janet_realloc(janet_vm.listeners, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm.listeners) {
JANET_OUT_OF_MEMORY;
}
janet_vm.listener_cap = newcap;
}
size_t index = janet_vm.listener_count++;
janet_vm.listeners[index] = state;
state->_index = index;
/* Emit INIT event for convenience */
state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT);
janet_ev_inc_refcount();
state->index = janet_vm.listeners->count;
janet_array_push(janet_vm.listeners, janet_wrap_abstract(state));
return state;
bad_listen_write:
janet_panic("cannot listen for duplicate write event on stream");
bad_listen_read:
janet_panic("cannot listen for duplicate read event on stream");
}
/* Indicate we are no longer listening for an event. This
* frees the memory of the state machine as well. */
static void janet_unlisten_impl(JanetListenerState *state, int is_gc) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
/* Remove state machine from poll list */
JanetListenerState **iter = &(state->stream->state);
while (*iter && *iter != state)
iter = &((*iter)->_next);
janet_assert(*iter, "failed to remove listener");
*iter = state->_next;
/* Remove mask */
state->stream->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */
if (!is_gc) {
JanetFiber *fiber = state->fiber;
if (NULL != fiber && fiber->waiting == state) {
fiber->waiting = NULL;
}
void janet_fiber_did_resume(JanetFiber *fiber) {
if (fiber->waiting) {
janet_unlisten(fiber->waiting);
fiber->waiting = NULL;
}
}
static void janet_unlisten_impl(JanetListenerState *state) {
/* Move last listener to position of this listener - O(1) removal and keep things densely packed. */
if (state->stream) {
Janet popped = janet_array_pop(janet_vm.listeners);
janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check");
JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped);
janet_vm.listeners->data[state->index] = popped;
popped_state->index = state->index;
state->index = UINT32_MAX; /* just in case */
janet_ev_dec_refcount();
if (state->stream->read_state == state) {
state->stream->read_state = NULL;
}
if (state->stream->write_state == state) {
state->stream->write_state = NULL;
}
state->stream = NULL;
}
}
static int janet_listener_gc(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_ev_dec_refcount();
}
if (state->machine) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
}
return 0;
}
static int janet_listener_mark(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_mark(janet_wrap_abstract(state->stream));
}
if (state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
state->machine(state, JANET_ASYNC_EVENT_MARK);
return 0;
}
static void janet_stream_checktoclose(JanetStream *stream) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) {
janet_stream_close(stream);
}
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
janet_vm.listeners[index]->_index = index;
janet_free(state);
}
static const JanetMethod ev_default_stream_methods[] = {
@@ -339,52 +363,52 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle;
stream->flags = flags;
stream->state = NULL;
stream->_mask = 0;
stream->read_state = NULL;
stream->write_state = NULL;
if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods;
return stream;
}
/* Close a stream */
static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
if (stream->flags & JANET_STREAM_CLOSED) return;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (!is_gc) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
}
JanetListenerState *next_state = state->_next;
janet_unlisten(state, is_gc);
state = next_state;
}
stream->state = NULL;
static void janet_stream_close_impl(JanetStream *stream) {
stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS
if (stream->handle != INVALID_HANDLE_VALUE) {
#ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle);
} else
if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle);
} else
#endif
{
CloseHandle(stream->handle);
{
CloseHandle(stream->handle);
}
stream->handle = INVALID_HANDLE_VALUE;
}
stream->handle = INVALID_HANDLE_VALUE;
#else
close(stream->handle);
stream->handle = -1;
if (stream->handle != -1) {
close(stream->handle);
stream->handle = -1;
}
#endif
}
void janet_stream_close(JanetStream *stream) {
janet_stream_close_impl(stream, 0);
if (stream->read_state) {
stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->read_state);
}
if (stream->write_state) {
stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->write_state);
}
janet_stream_close_impl(stream);
}
/* Called to clean up a stream */
static int janet_stream_gc(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *)p;
janet_stream_close_impl(stream, 1);
janet_stream_close_impl(stream);
return 0;
}
@@ -392,13 +416,11 @@ static int janet_stream_gc(void *p, size_t s) {
static int janet_stream_mark(void *p, size_t s) {
(void) s;
JanetStream *stream = (JanetStream *) p;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
state = state->_next;
if (NULL != stream->read_state) {
janet_mark(janet_wrap_abstract(stream->read_state));
}
if (NULL != stream->write_state) {
janet_mark(janet_wrap_abstract(stream->write_state));
}
return 0;
}
@@ -451,8 +473,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
}
JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
/* Can't share listening state and such across threads */
p->_mask = 0;
p->state = NULL;
p->read_state = NULL;
p->write_state = NULL;
p->flags = (uint32_t) janet_unmarshal_int(ctx);
p->methods = janet_unmarshal_ptr(ctx);
#ifdef JANET_WINDOWS
@@ -516,14 +538,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
janet_schedule_signal(fiber, value, JANET_SIGNAL_OK);
}
void janet_fiber_did_resume(JanetFiber *fiber) {
/* Cancel any pending fibers */
if (fiber->waiting) {
fiber->waiting->machine(fiber->waiting, JANET_ASYNC_EVENT_CANCEL);
janet_unlisten(fiber->waiting, 0);
}
}
/* Mark all pending tasks */
void janet_ev_mark(void) {
@@ -552,16 +566,6 @@ void janet_ev_mark(void) {
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
}
}
/* Pending listeners */
for (size_t i = 0; i < janet_vm.listener_count; i++) {
JanetListenerState *state = janet_vm.listeners[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
}
static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
@@ -582,9 +586,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int thre
/* Common init code */
void janet_ev_init_common(void) {
janet_q_init(&janet_vm.spawn);
janet_vm.listener_count = 0;
janet_vm.listener_cap = 0;
janet_vm.listeners = NULL;
janet_vm.tq = NULL;
janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0;
@@ -592,6 +593,8 @@ void janet_ev_init_common(void) {
janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_table_init_raw(&janet_vm.signal_handlers, 0);
janet_rng_seed(&janet_vm.ev_rng, 0);
janet_vm.listeners = janet_array(0);
janet_gcroot(janet_wrap_array(janet_vm.listeners));
#ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr);
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
@@ -602,11 +605,10 @@ void janet_ev_init_common(void) {
void janet_ev_deinit_common(void) {
janet_q_deinit(&janet_vm.spawn);
janet_free(janet_vm.tq);
janet_free(janet_vm.listeners);
janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers);
janet_vm.listeners = NULL;
#ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif
@@ -633,9 +635,9 @@ void janet_addtimeout(double sec) {
void janet_ev_inc_refcount(void) {
#ifdef JANET_WINDOWS
#ifdef JANET_64
InterlockedIncrement64(&janet_vm.extra_listeners);
InterlockedIncrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else
InterlockedIncrement(&janet_vm.extra_listeners);
InterlockedIncrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif
#else
__atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED);
@@ -645,9 +647,9 @@ void janet_ev_inc_refcount(void) {
void janet_ev_dec_refcount(void) {
#ifdef JANET_WINDOWS
#ifdef JANET_64
InterlockedDecrement64(&janet_vm.extra_listeners);
InterlockedDecrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else
InterlockedDecrement(&janet_vm.extra_listeners);
InterlockedDecrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif
#else
__atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED);
@@ -1330,8 +1332,7 @@ const JanetAbstractType janet_channel_type = {
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
int janet_loop_done(void) {
return !(janet_vm.listener_count ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
return !((janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count ||
janet_vm.extra_listeners);
}
@@ -1395,7 +1396,7 @@ JanetFiber *janet_loop1(void) {
}
/* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) {
if (janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
@@ -1414,7 +1415,7 @@ JanetFiber *janet_loop1(void) {
break;
}
/* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) {
if (janet_vm.tq_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when);
}
}
@@ -1506,8 +1507,8 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
janet_unlisten_impl(state, is_gc);
static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@@ -1529,7 +1530,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
}
BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
if (result || overlapped) {
if (!result) {
return;
}
if (overlapped) {
if (0 == completionKey) {
/* Custom event */
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
@@ -1540,24 +1545,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else {
/* Normal event */
JanetStream *stream = (JanetStream *) completionKey;
JanetListenerState *state = stream->state;
while (state != NULL) {
if (state->tag == overlapped) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
}
break;
} else {
state = state->_next;
janet_assert(stream->handle != INVALID_HANDLE_VALUE, "got closed stream event");
JanetListenerState *state = NULL;
if (stream->read_state && stream->read_state->tag == overlapped) {
state = stream->read_state;
} else if (stream->write_state && stream->write_state->tag == overlapped) {
state = stream->write_state;
}
if (state != NULL) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
janet_stream_checktoclose(stream);
}
}
}
@@ -1572,26 +1575,17 @@ static JanetTimestamp ts_now(void) {
return res;
}
static int make_epoll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= EPOLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= EPOLLOUT;
return events;
}
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0);
janet_unlisten(state);
} else {
/* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
@@ -1600,11 +1594,13 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(stream->state);
int is_first = !stream->read_state && !stream->write_state;
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev;
ev.events = make_epoll_events(state->stream->_mask);
ev.events = 0;
if (stream->read_state) ev.events |= EPOLLIN;
if (stream->write_state) ev.events |= EPOLLOUT;
ev.data.ptr = stream;
int status;
do {
@@ -1618,13 +1614,13 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
* event to a file. So we just post a custom event to do the read/write
* asap. */
/* Use flag to indicate state is not registered in epoll */
state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE);
state->flags = 1;
JanetEVGenericMessage msg = {0};
msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else {
/* Unexpected error */
janet_unlisten_impl(state, 0);
janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr());
}
}
@@ -1632,15 +1628,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
}
/* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state, int is_gc) {
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
if (!state->flags) {
int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask);
ev.events = 0;
if (is_read) ev.events |= EPOLLIN;
if (is_write) ev.events |= EPOLLOUT;
ev.data.ptr = stream;
int status;
do {
@@ -1652,7 +1652,7 @@ static void janet_unlisten(JanetListenerState *state, int is_gc) {
}
}
/* Destroy state machine and free memory */
janet_unlisten_impl(state, is_gc);
janet_unlisten_impl(state);
}
#define JANET_EPOLL_MAX_EVENTS 64
@@ -1689,10 +1689,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else {
JanetStream *stream = p;
int mask = events[i].events;
JanetListenerState *state = stream->state;
while (NULL != state) {
JanetListenerState *states[2] = {stream->read_state, stream->write_state};
for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i;
JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
@@ -1708,14 +1709,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
state = next_state;
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
}
janet_stream_checktoclose(stream);
}
}
}
@@ -1809,46 +1807,44 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
struct kevent kev[2];
int length = 0;
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) {
if (mask & JANET_ASYNC_LISTEN_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
}
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) {
if (mask & JANET_ASYNC_LISTEN_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
length++;
}
if (length > 0) {
add_kqueue_events(kev, length);
}
janet_assert(length, "expected to add kqueue events");
add_kqueue_events(kev, length);
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) {
/* Use flag to indicate state is not registered in kqueue */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) {
int is_last = (state->_next == NULL && stream->state == state);
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
if (stream && (stream->handle != -1)) {
int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
int length = 0;
if (stream->_mask & JANET_ASYNC_EVENT_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++;
}
if (stream->_mask & JANET_ASYNC_EVENT_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}
add_kqueue_events(kev, length);
int length = 0;
if (stream->read_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++;
}
if (stream->write_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}
add_kqueue_events(kev, length);
}
janet_unlisten_impl(state, is_gc);
janet_unlisten_impl(state);
}
#define JANET_KQUEUE_MAX_EVENTS 64
@@ -1888,14 +1884,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_ev_handle_selfpipe();
} else {
JanetStream *stream = p;
JanetListenerState *state = stream->state;
while (NULL != state) {
JanetListenerState *next_state = state->_next;
JanetListenerState *states[2] = {stream->read_state, stream->write_state};
for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i;
JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (!(events[i].flags & EV_ERROR)) {
if (events[i].filter == EVFILT_WRITE)
statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
@@ -1909,15 +1905,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
statuses[1] == JANET_ASYNC_STATUS_DONE ||
statuses[2] == JANET_ASYNC_STATUS_DONE ||
statuses[3] == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
state = next_state;
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
statuses[3] == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
}
janet_stream_checktoclose(stream);
}
}
}
@@ -1955,20 +1947,11 @@ static JanetTimestamp ts_now(void) {
return res;
}
static int make_poll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= POLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= POLLOUT;
return events;
}
/* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
size_t oldsize = janet_vm.listener_cap;
size_t oldsize = janet_vm.listeners->capacity;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm.listener_cap;
size_t newsize = janet_vm.listeners->capacity;
if (newsize > oldsize) {
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
if (NULL == janet_vm.fds) {
@@ -1977,15 +1960,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
}
struct pollfd ev;
ev.fd = stream->handle;
ev.events = make_poll_events(state->stream->_mask);
ev.events = 0;
if (stream->read_state) ev.events |= POLLIN;
if (stream->write_state) ev.events |= POLLOUT;
ev.revents = 0;
janet_vm.fds[state->_index + 1] = ev;
janet_vm.fds[state->index + 1] = ev;
return state;
}
static void janet_unlisten(JanetListenerState *state, int is_gc) {
janet_vm.fds[state->_index + 1] = janet_vm.fds[janet_vm.listener_count];
janet_unlisten_impl(state, is_gc);
static void janet_unlisten(JanetListenerState *state) {
if (state->stream) {
janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count];
}
janet_unlisten_impl(state);
}
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
@@ -1997,7 +1984,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetTimestamp now = ts_now();
to = now > timeout ? 0 : (int)(timeout - now);
}
ready = poll(janet_vm.fds, janet_vm.listener_count + 1, to);
ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to);
} while (ready == -1 && errno == EINTR);
if (ready == -1) {
JANET_EXIT("failed to poll events");
@@ -2010,10 +1997,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
}
/* Step state machines */
for (size_t i = 0; i < janet_vm.listener_count; i++) {
for (int32_t i = 0; i < janet_vm.listeners->count; i++) {
struct pollfd *pfd = janet_vm.fds + i + 1;
/* Skip fds where nothing interesting happened */
JanetListenerState *state = janet_vm.listeners[i];
JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]);
/* Normal event */
int mask = pfd->revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
@@ -2033,12 +2020,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
janet_stream_checktoclose(stream);
}
}
@@ -2525,8 +2510,7 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
typedef enum {
JANET_ASYNC_WRITEMODE_WRITE,
JANET_ASYNC_WRITEMODE_SEND,
JANET_ASYNC_WRITEMODE_SENDTO,
JANET_ASYNC_WRITEMODE_CONNECT
JANET_ASYNC_WRITEMODE_SENDTO
} JanetWriteMode;
typedef struct {
@@ -2550,41 +2534,15 @@ typedef struct {
#endif
} StateWrite;
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK: {
if (state->mode != JANET_ASYNC_WRITEMODE_CONNECT) {
janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str));
}
janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str));
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
janet_mark(janet_wrap_abstract(state->dest_abst));
}
@@ -2606,11 +2564,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
}
break;
case JANET_ASYNC_EVENT_USER: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
/* Begin write */
int32_t len;
const uint8_t *bytes;
@@ -2674,11 +2627,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
int32_t start, len;
const uint8_t *bytes;
start = state->start;
@@ -2780,10 +2728,6 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
}
void janet_ev_connect(JanetStream *stream, int flags) {
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
}
#endif
/* For a pipe ID */

View File

@@ -39,8 +39,8 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->env = NULL;
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
@@ -85,7 +85,6 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL;
#endif
return fiber;

View File

@@ -268,6 +268,9 @@ recur:
if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel);
}
if (fiber->waiting) {
janet_mark_abstract(fiber->waiting);
}
#endif
/* Explicit tail recursion */
@@ -438,6 +441,7 @@ void janet_collect(void) {
uint32_t i;
if (janet_vm.gc_suspend) return;
depth = JANET_RECURSION_GUARD;
janet_vm.gc_mark_phase = 1;
/* Try and prevent many major collections back to back.
* A full collection will take O(janet_vm.block_count) time.
* If we have a large heap, make sure our interval is not too
@@ -457,6 +461,7 @@ void janet_collect(void) {
Janet x = janet_vm.roots[--janet_vm.root_count];
janet_mark(x);
}
janet_vm.gc_mark_phase = 0;
janet_sweep();
janet_vm.next_collection = 0;
janet_free_all_scratch();
@@ -560,7 +565,9 @@ void janet_gcunlock(int handle) {
janet_vm.gc_suspend = handle;
}
/* Scratch memory API */
/* Scratch memory API
* Scratch memory allocations do not need to be free (but optionally can be), and will be automatically cleaned
* up in the next call to janet_collect. */
void *janet_smalloc(size_t size) {
JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size);

View File

@@ -131,7 +131,7 @@ JANET_CORE_FN(cfun_io_temp,
}
JANET_CORE_FN(cfun_io_fopen,
"(file/open path &opt mode)",
"(file/open path &opt mode buffer-size)",
"Open a file. `path` is an absolute or relative path, and "
"`mode` is a set of flags indicating the mode to open the file in. "
"`mode` is a keyword where each character represents a flag. If the file "
@@ -145,7 +145,7 @@ JANET_CORE_FN(cfun_io_fopen,
"* + - append to the file instead of overwriting it\n\n"
"* n - error if the file cannot be opened instead of returning nil\n\n"
"See fopen (<stdio.h>, C99) for further details.") {
janet_arity(argc, 1, 2);
janet_arity(argc, 1, 3);
const uint8_t *fname = janet_getstring(argv, 0);
const uint8_t *fmode;
int32_t flags;
@@ -158,6 +158,15 @@ JANET_CORE_FN(cfun_io_fopen,
flags = JANET_FILE_READ;
}
FILE *f = fopen((const char *)fname, (const char *)fmode);
if (f != NULL) {
size_t bufsize = janet_optsize(argv, argc, 2, BUFSIZ);
if (bufsize != BUFSIZ) {
int result = setvbuf(f, NULL, bufsize ? _IOFBF : _IONBF, bufsize);
if (result) {
janet_panic("failed to set buffer size for file");
}
}
}
return f ? janet_makefile(f, flags)
: (flags & JANET_FILE_NONIL) ? (janet_panicf("failed to open file %s: %s", fname, strerror(errno)), janet_wrap_nil())
: janet_wrap_nil();

View File

@@ -1048,7 +1048,6 @@ static const uint8_t *unmarshal_one_fiber(
fiber->env = NULL;
fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0;
fiber->supervisor_channel = NULL;
#endif

View File

@@ -111,6 +111,62 @@ static void janet_net_socknoblock(JSock s) {
#endif
}
/* State machine for async connect */
typedef struct {
JanetListenerState head;
int did_connect;
} NetStateConnect;
JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) {
NetStateConnect *state = (NetStateConnect *)s;
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:
case JANET_ASYNC_EVENT_WRITE:
case JANET_ASYNC_EVENT_USER:
break;
}
JanetStream *stream = s->stream;
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
state->did_connect = 1;
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
stream->flags |= JANET_STREAM_TOCLOSE;
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
stream->flags |= JANET_STREAM_TOCLOSE;
}
return JANET_ASYNC_STATUS_DONE;
}
static void net_sched_connect(JanetStream *stream) {
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}
/* State machine for accepting connections. */
#ifdef JANET_WINDOWS
@@ -496,7 +552,7 @@ JANET_CORE_FN(cfun_net_connect,
}
#endif
if (status != 0) {
if (status) {
#ifdef JANET_WINDOWS
if (err != WSAEWOULDBLOCK) {
#else
@@ -508,9 +564,7 @@ JANET_CORE_FN(cfun_net_connect,
}
}
/* Handle the connect() result in the event loop*/
janet_ev_connect(stream, MSG_NOSIGNAL);
net_sched_connect(stream);
janet_await();
}

View File

@@ -1437,8 +1437,8 @@ JANET_CORE_FN(os_getenv,
janet_sandbox_assert(JANET_SANDBOX_ENV);
janet_arity(argc, 1, 2);
const char *cstr = janet_getcstring(argv, 0);
const char *res = getenv(cstr);
janet_lock_environ();
const char *res = getenv(cstr);
Janet ret = res
? janet_cstringv(res)
: argc == 2

View File

@@ -27,6 +27,10 @@
#include "util.h"
#endif
#ifdef JANET_WINDOWS
#include <windows.h>
#endif
JANET_THREAD_LOCAL JanetVM janet_vm;
JanetVM *janet_local_vm(void) {

View File

@@ -125,6 +125,7 @@ struct JanetVM {
size_t next_collection;
size_t block_count;
int gc_suspend;
int gc_mark_phase;
/* GC roots */
Janet *roots;
@@ -154,12 +155,10 @@ struct JanetVM {
JanetQueue spawn;
JanetTimeout *tq;
JanetRNG ev_rng;
JanetListenerState **listeners;
size_t listener_count;
size_t listener_cap;
volatile size_t extra_listeners; /* used in signal handler, must be volatile */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
JanetArray *listeners; /* For GC */
JanetTable signal_handlers;
#ifdef JANET_WINDOWS
void **iocp;

View File

@@ -111,12 +111,11 @@ static void janet_table_rehash(JanetTable *t, int32_t size) {
JANET_OUT_OF_MEMORY;
}
}
int32_t i, oldcapacity;
oldcapacity = t->capacity;
int32_t oldcapacity = t->capacity;
t->data = newdata;
t->capacity = size;
t->deleted = 0;
for (i = 0; i < oldcapacity; i++) {
for (int32_t i = 0; i < oldcapacity; i++) {
JanetKV *kv = olddata + i;
if (!janet_checktype(kv->key, JANET_NIL)) {
JanetKV *newkv = janet_table_find(t, kv->key);

View File

@@ -49,7 +49,7 @@
#ifndef JANET_EXIT
#include <stdio.h>
#define JANET_EXIT(m) do { \
fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\
fprintf(stderr, "janet interpreter runtime error at line %d in file %s: %s\n",\
__LINE__,\
__FILE__,\
(m));\

View File

@@ -861,7 +861,7 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
vm_pcnext();
VM_OP(JOP_EQUALS_IMMEDIATE)
stack[A] = janet_wrap_boolean(janet_unwrap_number(stack[B]) == (double) CS);
stack[A] = janet_wrap_boolean(janet_checktype(stack[B], JANET_NUMBER) && (janet_unwrap_number(stack[B]) == (double) CS));
vm_pcnext();
VM_OP(JOP_NOT_EQUALS)
@@ -869,7 +869,7 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
vm_pcnext();
VM_OP(JOP_NOT_EQUALS_IMMEDIATE)
stack[A] = janet_wrap_boolean(janet_unwrap_number(stack[B]) != (double) CS);
stack[A] = janet_wrap_boolean(!janet_checktype(stack[B], JANET_NUMBER) || (janet_unwrap_number(stack[B]) != (double) CS));
vm_pcnext();
VM_OP(JOP_COMPARE)
@@ -1588,6 +1588,7 @@ int janet_init(void) {
janet_vm.next_collection = 0;
janet_vm.gc_interval = 0x400000;
janet_vm.block_count = 0;
janet_vm.gc_mark_phase = 0;
janet_symcache_init();

View File

@@ -235,9 +235,22 @@ extern "C" {
#endif
/* How to export symbols */
#ifndef JANET_EXPORT
#ifdef JANET_WINDOWS
#define JANET_EXPORT __declspec(dllexport)
#else
#define JANET_EXPORT __attribute__((visibility ("default")))
#endif
#endif
/* How declare API functions */
#ifndef JANET_API
#ifdef JANET_WINDOWS
#ifdef JANET_DLL_IMPORT
#define JANET_API __declspec(dllimport)
#else
#define JANET_API __declspec(dllexport)
#endif
#else
#define JANET_API __attribute__((visibility ("default")))
#endif
@@ -578,7 +591,6 @@ typedef enum {
JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_CANCEL,
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER
} JanetAsyncEvent;
@@ -600,13 +612,9 @@ typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEv
struct JanetStream {
JanetHandle handle;
uint32_t flags;
/* Linked list of all in-flight IO routines for this stream */
JanetListenerState *state;
JanetListenerState *read_state;
JanetListenerState *write_state;
const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same stream, though. */
int _mask;
};
/* Interface for state machine based event loop */
@@ -616,14 +624,12 @@ struct JanetListenerState {
JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
uint32_t index; /* Used for GC and poll implentation */
uint32_t flags;
#ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */
#endif
/* internal */
size_t _index;
int _mask;
JanetListenerState *_next;
};
#endif
@@ -730,6 +736,7 @@ JANET_API Janet janet_wrap_integer(int32_t x);
? janet_nanbox_isnumber(x) \
: janet_nanbox_checkauxtype((x), (t)))
/* Use JANET_API so that modules will use a local version of these functions if possible */
JANET_API void *janet_nanbox_to_pointer(Janet x);
JANET_API Janet janet_nanbox_from_pointer(void *p, uint64_t tagmask);
JANET_API Janet janet_nanbox_from_cpointer(const void *p, uint64_t tagmask);
@@ -912,8 +919,8 @@ struct JanetFiber {
* that is, fibers that are scheduled on the event loop and behave much like threads
* in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetListenerState *waiting;
void *supervisor_channel; /* Channel to push self to when complete */
#endif
};
@@ -1485,7 +1492,6 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
#endif
/* Write async to a stream */
@@ -1948,10 +1954,10 @@ JANET_API void janet_register(const char *name, JanetCFunction cfun);
#endif
#ifndef JANET_ENTRY_NAME
#define JANET_MODULE_ENTRY \
JANET_MODULE_PREFIX JANET_API JanetBuildConfig _janet_mod_config(void) { \
JANET_MODULE_PREFIX JANET_EXPORT JanetBuildConfig _janet_mod_config(void) { \
return janet_config_current(); \
} \
JANET_MODULE_PREFIX JANET_API void _janet_init
JANET_MODULE_PREFIX JANET_EXPORT void _janet_init
#else
#define JANET_MODULE_ENTRY JANET_MODULE_PREFIX JANET_API void JANET_ENTRY_NAME
#endif