1
0
mirror of https://github.com/janet-lang/janet synced 2025-11-22 10:14:49 +00:00

Compare commits

...

29 Commits

Author SHA1 Message Date
Calvin Rose
d19db30f3d Fix meson install test. 2023-09-27 00:19:35 -05:00
Calvin Rose
d12464fc0e Make poll work by going back to array of listeners for gc keeping. 2023-09-26 23:02:06 -05:00
Calvin Rose
a96971c8a7 More work on epoll implementation. 2023-09-26 12:05:06 -05:00
Calvin Rose
f6f769503a Fix up ev.c to pass tests. 2023-09-26 11:11:29 -05:00
Calvin Rose
82917ac6e3 Kqueue fix. 2023-09-25 19:17:51 -07:00
Calvin Rose
a6ffafb1a2 Patches to kqueue implementation. 2023-09-25 19:07:18 -07:00
Calvin Rose
fb8c529f2e Partial work updating epoll reimplentation. 2023-09-25 18:52:15 -07:00
Calvin Rose
1ee98e1e66 Get IOCP reworked event loop passing tests. 2023-09-25 15:19:39 -07:00
Calvin Rose
81f35f5dd1 Redo state management for Janet listeners.
Make more use of the built in GC code for abstracts to
be sure things are more correct. Issue before was streams could
be freed before IOCP events arrived.
2023-09-25 00:43:36 -07:00
Calvin Rose
1b402347cd Work on debugging issue with server spawning. 2023-09-24 18:15:58 -07:00
Calvin Rose
dccb60ba35 Ignore IOCP where the event failed to deque. 2023-09-24 12:53:06 -07:00
Calvin Rose
ae642ceca0 Don't hide windows segfaults in build_win.bat. 2023-09-24 12:36:15 -07:00
Calvin Rose
471b6f9966 Add TOCLOSE back. 2023-09-24 12:28:35 -07:00
Calvin Rose
5dd18bac2c More fixups to gc.c 2023-09-24 11:51:22 -07:00
Calvin Rose
018f4e0891 Remove some old code. 2023-09-24 10:30:58 -07:00
Calvin Rose
e85809a98a Remove remains of gc instrumentation code. 2023-09-24 10:11:24 -07:00
Calvin Rose
e6e9bd8147 Redo async connect code to be moved out of ev.c.
Async connect is different than write.
2023-09-24 10:08:40 -07:00
Calvin Rose
221645d2ce More refinement of meson build. 2023-09-23 14:16:13 -07:00
Calvin Rose
2f4a6214a2 Make meson build work on windows.
By default, use more traditional linking pattern with meson.
The janet.exe will now link to janet-x.x.dll on windows (and
similar for linux/posix) when built with meson. This is slightly
less efficient and means that janet.exe built this way is no longer
standalone (you would need to move the dll along with the exe), but
plays better with most build systems.
2023-09-23 08:53:37 -07:00
Calvin Rose
e00a461c26 Add optional buffer-size to file/open.
This calls setvbuf to change FILE buffering. A goal is
to be able to use the existing file/* functions for blocking
IO analogous to `read` and `write` system calls.
2023-09-23 09:40:17 -05:00
Calvin Rose
c31314be38 Merge pull request #1296 from Andriamanitra/doc-loop-unless
add :unless modifier to (doc loop)
2023-09-22 05:41:06 -07:00
Andriamanitra
ee142c4be0 truthy/falsey is more accurate than true/false 2023-09-22 03:04:41 +03:00
Andriamanitra
aeacc0b31b add :unless modifier to (doc loop) 2023-09-21 19:23:40 +03:00
Calvin Rose
7b4c3bdbcc Address issues from #1294 on non-nanboxed platforms.
Underlying bug was obscured by nanbox implementation.
2023-09-21 07:36:53 -07:00
Calvin Rose
910b9cf1fd Distinguish between JANET_API and JANET_EXPORT
One is a way to export symbols, the other a way to reference
API functions. Also include prebuilt dlljanet.dll and dlljanet.lib
for windows to save people the trouble of compiling janet.c themselves.
2023-09-20 20:07:03 -07:00
Calvin Rose
b10aaceab0 Work on dllimport option for janet. 2023-09-20 17:34:42 -07:00
Calvin Rose
169bd812c9 Update state.h to #include <windows.h>
Should fix usage with msvc in some pipelines.
2023-09-18 23:51:15 -05:00
Calvin Rose
34767f1e13 Merge pull request #1292 from sogaiu/tweak-janetconf-h 2023-09-18 18:50:46 -07:00
sogaiu
4f642c0843 Tweak janetconf.h 2023-09-19 10:34:50 +09:00
17 changed files with 366 additions and 333 deletions

View File

@@ -41,32 +41,32 @@ if not exist build\boot mkdir build\boot
@rem Build the bootstrap interpreter @rem Build the bootstrap interpreter
for %%f in (src\core\*.c) do ( for %%f in (src\core\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f %JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
) )
for %%f in (src\boot\*.c) do ( for %%f in (src\boot\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f %JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
) )
%JANET_LINK% /out:build\janet_boot.exe build\boot\*.obj %JANET_LINK% /out:build\janet_boot.exe build\boot\*.obj
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
build\janet_boot . > build\c\janet.c build\janet_boot . > build\c\janet.c
@rem Build the sources @rem Build the sources
%JANET_COMPILE% /Fobuild\janet.obj build\c\janet.c %JANET_COMPILE% /Fobuild\janet.obj build\c\janet.c
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
%JANET_COMPILE% /Fobuild\shell.obj src\mainclient\shell.c %JANET_COMPILE% /Fobuild\shell.obj src\mainclient\shell.c
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
@rem Build the resources @rem Build the resources
rc /nologo /fobuild\janet_win.res janet_win.rc rc /nologo /fobuild\janet_win.res janet_win.rc
@rem Link everything to main client @rem Link everything to main client
%JANET_LINK% /out:janet.exe build\janet.obj build\shell.obj build\janet_win.res %JANET_LINK% /out:janet.exe build\janet.obj build\shell.obj build\janet_win.res
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
@rem Build static library (libjanet.a) @rem Build static library (libjanet.lib)
%JANET_LINK_STATIC% /out:build\libjanet.lib build\janet.obj %JANET_LINK_STATIC% /out:build\libjanet.lib build\janet.obj
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
echo === Successfully built janet.exe for Windows === echo === Successfully built janet.exe for Windows ===
echo === Run 'build_win test' to run tests. == echo === Run 'build_win test' to run tests. ==
@@ -98,7 +98,7 @@ exit /b 0
:TEST :TEST
for %%f in (test/suite*.janet) do ( for %%f in (test/suite*.janet) do (
janet.exe test\%%f janet.exe test\%%f
@if errorlevel 1 goto TESTFAIL @if not errorlevel 0 goto TESTFAIL
) )
exit /b 0 exit /b 0
@@ -117,6 +117,7 @@ copy README.md dist\README.md
copy janet.lib dist\janet.lib copy janet.lib dist\janet.lib
copy janet.exp dist\janet.exp copy janet.exp dist\janet.exp
copy janet.def dist\janet.def
janet.exe tools\patch-header.janet src\include\janet.h src\conf\janetconf.h build\janet.h janet.exe tools\patch-header.janet src\include\janet.h src\conf\janetconf.h build\janet.h
copy build\janet.h dist\janet.h copy build\janet.h dist\janet.h

View File

@@ -169,7 +169,7 @@ janet_boot = executable('janet-boot', core_src, boot_src,
# Build janet.c # Build janet.c
janetc = custom_target('janetc', janetc = custom_target('janetc',
input : [janet_boot], input : [janet_boot, 'src/boot/boot.janet'],
output : 'janet.c', output : 'janet.c',
capture : true, capture : true,
command : [ command : [
@@ -182,23 +182,30 @@ if not get_option('single_threaded')
janet_dependencies += thread_dep janet_dependencies += thread_dep
endif endif
if cc.has_argument('-fvisibility=hidden')
lib_cflags = ['-fvisibility=hidden']
else
lib_cflags = []
endif
libjanet = library('janet', janetc, libjanet = library('janet', janetc,
include_directories : incdir, include_directories : incdir,
dependencies : janet_dependencies, dependencies : janet_dependencies,
version: meson.project_version(), version: meson.project_version(),
soversion: version_parts[0] + '.' + version_parts[1], soversion: version_parts[0] + '.' + version_parts[1],
c_args : lib_cflags,
install : true) install : true)
# Extra c flags - adding -fvisibility=hidden matches the Makefile and # Extra c flags - adding -fvisibility=hidden matches the Makefile and
# shaves off about 10k on linux x64, likely similar on other platforms. # shaves off about 10k on linux x64, likely similar on other platforms.
if cc.has_argument('-fvisibility=hidden') if cc.has_argument('-fvisibility=hidden')
extra_cflags = ['-fvisibility=hidden'] extra_cflags = ['-fvisibility=hidden', '-DJANET_DLL_IMPORT']
else else
extra_cflags = [] extra_cflags = ['-DJANET_DLL_IMPORT']
endif endif
janet_mainclient = executable('janet', janetc, mainclient_src, janet_mainclient = executable('janet', mainclient_src,
include_directories : incdir, include_directories : incdir,
dependencies : janet_dependencies, dependencies : janet_dependencies,
link_with: [libjanet],
c_args : extra_cflags, c_args : extra_cflags,
install : true) install : true)
@@ -281,11 +288,12 @@ patched_janet = custom_target('patched-janeth',
install : true, install : true,
install_dir : join_paths(get_option('includedir'), 'janet'), install_dir : join_paths(get_option('includedir'), 'janet'),
build_by_default : true, build_by_default : true,
output : ['janet.h'], output : ['janet_' + meson.project_version() + '.h'],
command : [janet_nativeclient, '@INPUT@', '@OUTPUT@']) command : [janet_nativeclient, '@INPUT@', '@OUTPUT@'])
# Create a version of the janet.h header that matches what jpm often expects # Create a version of the janet.h header that matches what jpm often expects
if meson.version().version_compare('>=0.61') if meson.version().version_compare('>=0.61')
install_symlink('janet.h', pointing_to: 'janet/janet.h', install_dir: get_option('includedir')) install_symlink('janet.h', pointing_to: 'janet/janet.h', install_dir: get_option('includedir'))
install_symlink('janet.h', pointing_to: 'janet_' + meson.project_version() + '.h', install_dir: join_paths(get_option('includedir'), 'janet'))
endif endif

View File

@@ -590,7 +590,10 @@
* `:repeat n` -- repeats the next inner loop `n` times. * `:repeat n` -- repeats the next inner loop `n` times.
* `:when condition` -- only evaluates the current loop body when `condition` * `:when condition` -- only evaluates the current loop body when `condition`
is true. is truthy.
* `:unless condition` -- only evaluates the current loop body when `condition`
is falsey.
The `loop` macro always evaluates to nil. The `loop` macro always evaluates to nil.
``` ```
@@ -3763,7 +3766,7 @@
[host port &opt handler type] [host port &opt handler type]
(def s (net/listen host port type)) (def s (net/listen host port type))
(if handler (if handler
(ev/call (fn [] (net/accept-loop s handler)))) (ev/go (fn [] (net/accept-loop s handler))))
s)) s))
### ###

View File

@@ -4,8 +4,8 @@
#define JANETCONF_H #define JANETCONF_H
#define JANET_VERSION_MAJOR 1 #define JANET_VERSION_MAJOR 1
#define JANET_VERSION_MINOR 30 #define JANET_VERSION_MINOR 31
#define JANET_VERSION_PATCH 1 #define JANET_VERSION_PATCH 0
#define JANET_VERSION_EXTRA "" #define JANET_VERSION_EXTRA ""
#define JANET_VERSION "1.31.0" #define JANET_VERSION "1.31.0"

View File

@@ -180,7 +180,7 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
} }
/* Forward declaration */ /* Forward declaration */
static void janet_unlisten(JanetListenerState *state, int is_gc); static void janet_unlisten(JanetListenerState *state);
/* Get current timestamp (millisecond precision) */ /* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void); static JanetTimestamp ts_now(void);
@@ -254,76 +254,100 @@ static void add_timeout(JanetTimeout to) {
} }
} }
static int janet_listener_gc(void *p, size_t s);
static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = {
"core/ev-listener",
janet_listener_gc,
janet_listener_mark,
JANET_ATEND_GCMARK
};
/* Create a new event listener */ /* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) { if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream"); janet_panic("cannot listen on closed stream");
} }
if (stream->_mask & mask) { if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read;
janet_panic("cannot listen for duplicate event on stream"); if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write;
} janet_assert(size >= sizeof(JanetListenerState), "bad size");
if (janet_vm.root_fiber->waiting != NULL) { JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
janet_panic("current fiber is already waiting for event");
}
if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState);
JanetListenerState *state = janet_malloc(size);
if (NULL == state) {
JANET_OUT_OF_MEMORY;
}
state->machine = behavior; state->machine = behavior;
state->fiber = janet_vm.root_fiber; state->fiber = janet_vm.root_fiber;
state->flags = 0;
janet_vm.root_fiber->waiting = state; janet_vm.root_fiber->waiting = state;
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
state->stream = stream; state->stream = stream;
state->_mask = mask;
stream->_mask |= mask;
state->_next = stream->state;
stream->state = state;
/* Keep track of a listener for GC purposes */
int resize = janet_vm.listener_cap == janet_vm.listener_count;
if (resize) {
size_t newcap = janet_vm.listener_count ? janet_vm.listener_cap * 2 : 16;
janet_vm.listeners = janet_realloc(janet_vm.listeners, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm.listeners) {
JANET_OUT_OF_MEMORY;
}
janet_vm.listener_cap = newcap;
}
size_t index = janet_vm.listener_count++;
janet_vm.listeners[index] = state;
state->_index = index;
/* Emit INIT event for convenience */
state->event = user; state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT); state->machine(state, JANET_ASYNC_EVENT_INIT);
janet_ev_inc_refcount();
state->index = janet_vm.listeners->count;
janet_array_push(janet_vm.listeners, janet_wrap_abstract(state));
return state; return state;
bad_listen_write:
janet_panic("cannot listen for duplicate write event on stream");
bad_listen_read:
janet_panic("cannot listen for duplicate read event on stream");
} }
/* Indicate we are no longer listening for an event. This void janet_fiber_did_resume(JanetFiber *fiber) {
* frees the memory of the state machine as well. */ if (fiber->waiting) {
static void janet_unlisten_impl(JanetListenerState *state, int is_gc) { janet_unlisten(fiber->waiting);
state->machine(state, JANET_ASYNC_EVENT_DEINIT); fiber->waiting = NULL;
/* Remove state machine from poll list */ }
JanetListenerState **iter = &(state->stream->state); }
while (*iter && *iter != state)
iter = &((*iter)->_next); static void janet_unlisten_impl(JanetListenerState *state) {
janet_assert(*iter, "failed to remove listener"); /* Move last listener to position of this listener - O(1) removal and keep things densely packed. */
*iter = state->_next; if (state->stream) {
/* Remove mask */ Janet popped = janet_array_pop(janet_vm.listeners);
state->stream->_mask &= ~(state->_mask); janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check");
/* Ensure fiber does not reference this state */ JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped);
if (!is_gc) { janet_vm.listeners->data[state->index] = popped;
JanetFiber *fiber = state->fiber; popped_state->index = state->index;
if (NULL != fiber && fiber->waiting == state) { state->index = UINT32_MAX; /* just in case */
fiber->waiting = NULL; janet_ev_dec_refcount();
} if (state->stream->read_state == state) {
state->stream->read_state = NULL;
}
if (state->stream->write_state == state) {
state->stream->write_state = NULL;
}
state->stream = NULL;
}
}
static int janet_listener_gc(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_ev_dec_refcount();
}
if (state->machine) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
}
return 0;
}
static int janet_listener_mark(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_mark(janet_wrap_abstract(state->stream));
}
if (state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
state->machine(state, JANET_ASYNC_EVENT_MARK);
return 0;
}
static void janet_stream_checktoclose(JanetStream *stream) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) {
janet_stream_close(stream);
} }
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
janet_vm.listeners[index]->_index = index;
janet_free(state);
} }
static const JanetMethod ev_default_stream_methods[] = { static const JanetMethod ev_default_stream_methods[] = {
@@ -339,52 +363,52 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle; stream->handle = handle;
stream->flags = flags; stream->flags = flags;
stream->state = NULL; stream->read_state = NULL;
stream->_mask = 0; stream->write_state = NULL;
if (methods == NULL) methods = ev_default_stream_methods; if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods; stream->methods = methods;
return stream; return stream;
} }
/* Close a stream */ static void janet_stream_close_impl(JanetStream *stream) {
static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
if (stream->flags & JANET_STREAM_CLOSED) return;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (!is_gc) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
}
JanetListenerState *next_state = state->_next;
janet_unlisten(state, is_gc);
state = next_state;
}
stream->state = NULL;
stream->flags |= JANET_STREAM_CLOSED; stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
if (stream->handle != INVALID_HANDLE_VALUE) {
#ifdef JANET_NET #ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) { if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle); closesocket((SOCKET) stream->handle);
} else } else
#endif #endif
{ {
CloseHandle(stream->handle); CloseHandle(stream->handle);
}
stream->handle = INVALID_HANDLE_VALUE;
} }
stream->handle = INVALID_HANDLE_VALUE;
#else #else
close(stream->handle); if (stream->handle != -1) {
stream->handle = -1; close(stream->handle);
stream->handle = -1;
}
#endif #endif
} }
void janet_stream_close(JanetStream *stream) { void janet_stream_close(JanetStream *stream) {
janet_stream_close_impl(stream, 0); if (stream->read_state) {
stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->read_state);
}
if (stream->write_state) {
stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->write_state);
}
janet_stream_close_impl(stream);
} }
/* Called to clean up a stream */ /* Called to clean up a stream */
static int janet_stream_gc(void *p, size_t s) { static int janet_stream_gc(void *p, size_t s) {
(void) s; (void) s;
JanetStream *stream = (JanetStream *)p; JanetStream *stream = (JanetStream *)p;
janet_stream_close_impl(stream, 1); janet_stream_close_impl(stream);
return 0; return 0;
} }
@@ -392,13 +416,11 @@ static int janet_stream_gc(void *p, size_t s) {
static int janet_stream_mark(void *p, size_t s) { static int janet_stream_mark(void *p, size_t s) {
(void) s; (void) s;
JanetStream *stream = (JanetStream *) p; JanetStream *stream = (JanetStream *) p;
JanetListenerState *state = stream->state; if (NULL != stream->read_state) {
while (NULL != state) { janet_mark(janet_wrap_abstract(stream->read_state));
if (NULL != state->fiber) { }
janet_mark(janet_wrap_fiber(state->fiber)); if (NULL != stream->write_state) {
} janet_mark(janet_wrap_abstract(stream->write_state));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
state = state->_next;
} }
return 0; return 0;
} }
@@ -451,8 +473,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
} }
JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream)); JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
/* Can't share listening state and such across threads */ /* Can't share listening state and such across threads */
p->_mask = 0; p->read_state = NULL;
p->state = NULL; p->write_state = NULL;
p->flags = (uint32_t) janet_unmarshal_int(ctx); p->flags = (uint32_t) janet_unmarshal_int(ctx);
p->methods = janet_unmarshal_ptr(ctx); p->methods = janet_unmarshal_ptr(ctx);
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@@ -516,14 +538,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
janet_schedule_signal(fiber, value, JANET_SIGNAL_OK); janet_schedule_signal(fiber, value, JANET_SIGNAL_OK);
} }
void janet_fiber_did_resume(JanetFiber *fiber) {
/* Cancel any pending fibers */
if (fiber->waiting) {
fiber->waiting->machine(fiber->waiting, JANET_ASYNC_EVENT_CANCEL);
janet_unlisten(fiber->waiting, 0);
}
}
/* Mark all pending tasks */ /* Mark all pending tasks */
void janet_ev_mark(void) { void janet_ev_mark(void) {
@@ -552,16 +566,6 @@ void janet_ev_mark(void) {
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber)); janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
} }
} }
/* Pending listeners */
for (size_t i = 0; i < janet_vm.listener_count; i++) {
JanetListenerState *state = janet_vm.listeners[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
} }
static int janet_channel_push(JanetChannel *channel, Janet x, int mode); static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
@@ -582,9 +586,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int thre
/* Common init code */ /* Common init code */
void janet_ev_init_common(void) { void janet_ev_init_common(void) {
janet_q_init(&janet_vm.spawn); janet_q_init(&janet_vm.spawn);
janet_vm.listener_count = 0;
janet_vm.listener_cap = 0;
janet_vm.listeners = NULL;
janet_vm.tq = NULL; janet_vm.tq = NULL;
janet_vm.tq_count = 0; janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0; janet_vm.tq_capacity = 0;
@@ -592,6 +593,8 @@ void janet_ev_init_common(void) {
janet_table_init_raw(&janet_vm.active_tasks, 0); janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_table_init_raw(&janet_vm.signal_handlers, 0); janet_table_init_raw(&janet_vm.signal_handlers, 0);
janet_rng_seed(&janet_vm.ev_rng, 0); janet_rng_seed(&janet_vm.ev_rng, 0);
janet_vm.listeners = janet_array(0);
janet_gcroot(janet_wrap_array(janet_vm.listeners));
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr); pthread_attr_init(&janet_vm.new_thread_attr);
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
@@ -602,11 +605,10 @@ void janet_ev_init_common(void) {
void janet_ev_deinit_common(void) { void janet_ev_deinit_common(void) {
janet_q_deinit(&janet_vm.spawn); janet_q_deinit(&janet_vm.spawn);
janet_free(janet_vm.tq); janet_free(janet_vm.tq);
janet_free(janet_vm.listeners);
janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks); janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers); janet_table_deinit(&janet_vm.signal_handlers);
janet_vm.listeners = NULL;
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr); pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif #endif
@@ -633,9 +635,9 @@ void janet_addtimeout(double sec) {
void janet_ev_inc_refcount(void) { void janet_ev_inc_refcount(void) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#ifdef JANET_64 #ifdef JANET_64
InterlockedIncrement64(&janet_vm.extra_listeners); InterlockedIncrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else #else
InterlockedIncrement(&janet_vm.extra_listeners); InterlockedIncrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif #endif
#else #else
__atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED);
@@ -645,9 +647,9 @@ void janet_ev_inc_refcount(void) {
void janet_ev_dec_refcount(void) { void janet_ev_dec_refcount(void) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#ifdef JANET_64 #ifdef JANET_64
InterlockedDecrement64(&janet_vm.extra_listeners); InterlockedDecrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else #else
InterlockedDecrement(&janet_vm.extra_listeners); InterlockedDecrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif #endif
#else #else
__atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED); __atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED);
@@ -1330,8 +1332,7 @@ const JanetAbstractType janet_channel_type = {
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
int janet_loop_done(void) { int janet_loop_done(void) {
return !(janet_vm.listener_count || return !((janet_vm.spawn.head != janet_vm.spawn.tail) ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count || janet_vm.tq_count ||
janet_vm.extra_listeners); janet_vm.extra_listeners);
} }
@@ -1395,7 +1396,7 @@ JanetFiber *janet_loop1(void) {
} }
/* Poll for events */ /* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) { if (janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to; JanetTimeout to;
memset(&to, 0, sizeof(to)); memset(&to, 0, sizeof(to));
int has_timeout; int has_timeout;
@@ -1414,7 +1415,7 @@ JanetFiber *janet_loop1(void) {
break; break;
} }
/* Run polling implementation only if pending timeouts or pending events */ /* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) { if (janet_vm.tq_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when); janet_loop1_impl(has_timeout, to.when);
} }
} }
@@ -1506,8 +1507,8 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state, is_gc); janet_unlisten_impl(state);
} }
void janet_loop1_impl(int has_timeout, JanetTimestamp to) { void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@@ -1529,7 +1530,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} }
BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime); BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
if (result || overlapped) { if (!result) {
return;
}
if (overlapped) {
if (0 == completionKey) { if (0 == completionKey) {
/* Custom event */ /* Custom event */
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
@@ -1540,24 +1545,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else { } else {
/* Normal event */ /* Normal event */
JanetStream *stream = (JanetStream *) completionKey; JanetStream *stream = (JanetStream *) completionKey;
JanetListenerState *state = stream->state; janet_assert(stream->handle != INVALID_HANDLE_VALUE, "got closed stream event");
while (state != NULL) { JanetListenerState *state = NULL;
if (state->tag == overlapped) { if (stream->read_state && stream->read_state->tag == overlapped) {
state->event = overlapped; state = stream->read_state;
state->bytes = num_bytes_transfered; } else if (stream->write_state && stream->write_state->tag == overlapped) {
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); state = stream->write_state;
if (status == JANET_ASYNC_STATUS_DONE) { }
janet_unlisten(state, 0); if (state != NULL) {
} state->event = overlapped;
break; state->bytes = num_bytes_transfered;
} else { JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
state = state->_next; if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
} }
} }
/* Close the stream if requested and no more listeners are left */ janet_stream_checktoclose(stream);
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
} }
@@ -1572,26 +1575,17 @@ static JanetTimestamp ts_now(void) {
return res; return res;
} }
static int make_epoll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= EPOLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= EPOLLOUT;
return events;
}
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) { static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp; JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ); status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) { status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
} else { } else {
/* Repost event */ /* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
@@ -1600,11 +1594,13 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(stream->state); int is_first = !stream->read_state && !stream->write_state;
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(state->stream->_mask); ev.events = 0;
if (stream->read_state) ev.events |= EPOLLIN;
if (stream->write_state) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@@ -1618,13 +1614,13 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
* event to a file. So we just post a custom event to do the read/write * event to a file. So we just post a custom event to do the read/write
* asap. */ * asap. */
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE); state->flags = 1;
JanetEVGenericMessage msg = {0}; JanetEVGenericMessage msg = {0};
msg.argp = state; msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else { } else {
/* Unexpected error */ /* Unexpected error */
janet_unlisten_impl(state, 0); janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr()); janet_panicv(janet_ev_lasterr());
} }
} }
@@ -1632,15 +1628,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
} }
/* Tell system we are done listening for a certain event */ /* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) { if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { if (!state->flags) {
int is_last = (state->_next == NULL && stream->state == state); int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask); ev.events = 0;
if (is_read) ev.events |= EPOLLIN;
if (is_write) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@@ -1652,7 +1652,7 @@ static void janet_unlisten(JanetListenerState *state, int is_gc) {
} }
} }
/* Destroy state machine and free memory */ /* Destroy state machine and free memory */
janet_unlisten_impl(state, is_gc); janet_unlisten_impl(state);
} }
#define JANET_EPOLL_MAX_EVENTS 64 #define JANET_EPOLL_MAX_EVENTS 64
@@ -1689,10 +1689,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = stream->state; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
while (NULL != state) { for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
@@ -1708,14 +1709,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
state = next_state; }
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
} }
@@ -1809,46 +1807,44 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
struct kevent kev[2]; struct kevent kev[2];
int length = 0; int length = 0;
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) { if (mask & JANET_ASYNC_LISTEN_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
length++; length++;
} }
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) { if (mask & JANET_ASYNC_LISTEN_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
length++; length++;
} }
if (length > 0) { janet_assert(length, "expected to add kqueue events");
add_kqueue_events(kev, length); add_kqueue_events(kev, length);
}
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) { if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in kqueue */ int is_read = (stream->read_state != state) && stream->read_state;
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { int is_write = (stream->write_state != state) && stream->write_state;
int is_last = (state->_next == NULL && stream->state == state); int is_last = !is_read && !is_write;
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD; int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2]; struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
int length = 0; int length = 0;
if (stream->_mask & JANET_ASYNC_EVENT_WRITE) { if (stream->read_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++; length++;
}
if (stream->_mask & JANET_ASYNC_EVENT_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}
add_kqueue_events(kev, length);
} }
if (stream->write_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++;
}
add_kqueue_events(kev, length);
} }
janet_unlisten_impl(state, is_gc); janet_unlisten_impl(state);
} }
#define JANET_KQUEUE_MAX_EVENTS 64 #define JANET_KQUEUE_MAX_EVENTS 64
@@ -1888,14 +1884,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_ev_handle_selfpipe(); janet_ev_handle_selfpipe();
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
JanetListenerState *state = stream->state; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
while (NULL != state) { for (int j = 0; j < 2; j++) {
JanetListenerState *next_state = state->_next; JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetAsyncStatus statuses[4]; JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE; statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (!(events[i].flags & EV_ERROR)) { if (!(events[i].flags & EV_ERROR)) {
if (events[i].filter == EVFILT_WRITE) if (events[i].filter == EVFILT_WRITE)
statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE); statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
@@ -1909,15 +1905,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (statuses[0] == JANET_ASYNC_STATUS_DONE || if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
statuses[1] == JANET_ASYNC_STATUS_DONE || statuses[1] == JANET_ASYNC_STATUS_DONE ||
statuses[2] == JANET_ASYNC_STATUS_DONE || statuses[2] == JANET_ASYNC_STATUS_DONE ||
statuses[3] == JANET_ASYNC_STATUS_DONE) statuses[3] == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
}
state = next_state;
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
} }
@@ -1955,20 +1947,11 @@ static JanetTimestamp ts_now(void) {
return res; return res;
} }
static int make_poll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= POLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= POLLOUT;
return events;
}
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
size_t oldsize = janet_vm.listener_cap; size_t oldsize = janet_vm.listeners->capacity;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm.listener_cap; size_t newsize = janet_vm.listeners->capacity;
if (newsize > oldsize) { if (newsize > oldsize) {
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd)); janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
if (NULL == janet_vm.fds) { if (NULL == janet_vm.fds) {
@@ -1977,15 +1960,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
} }
struct pollfd ev; struct pollfd ev;
ev.fd = stream->handle; ev.fd = stream->handle;
ev.events = make_poll_events(state->stream->_mask); ev.events = 0;
if (stream->read_state) ev.events |= POLLIN;
if (stream->write_state) ev.events |= POLLOUT;
ev.revents = 0; ev.revents = 0;
janet_vm.fds[state->_index + 1] = ev; janet_vm.fds[state->index + 1] = ev;
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
janet_vm.fds[state->_index + 1] = janet_vm.fds[janet_vm.listener_count]; if (state->stream) {
janet_unlisten_impl(state, is_gc); janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count];
}
janet_unlisten_impl(state);
} }
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
@@ -1997,7 +1984,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetTimestamp now = ts_now(); JanetTimestamp now = ts_now();
to = now > timeout ? 0 : (int)(timeout - now); to = now > timeout ? 0 : (int)(timeout - now);
} }
ready = poll(janet_vm.fds, janet_vm.listener_count + 1, to); ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to);
} while (ready == -1 && errno == EINTR); } while (ready == -1 && errno == EINTR);
if (ready == -1) { if (ready == -1) {
JANET_EXIT("failed to poll events"); JANET_EXIT("failed to poll events");
@@ -2010,10 +1997,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} }
/* Step state machines */ /* Step state machines */
for (size_t i = 0; i < janet_vm.listener_count; i++) { for (int32_t i = 0; i < janet_vm.listeners->count; i++) {
struct pollfd *pfd = janet_vm.fds + i + 1; struct pollfd *pfd = janet_vm.fds + i + 1;
/* Skip fds where nothing interesting happened */ /* Skip fds where nothing interesting happened */
JanetListenerState *state = janet_vm.listeners[i]; JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]);
/* Normal event */ /* Normal event */
int mask = pfd->revents; int mask = pfd->revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
@@ -2033,12 +2020,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
@@ -2525,8 +2510,7 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
typedef enum { typedef enum {
JANET_ASYNC_WRITEMODE_WRITE, JANET_ASYNC_WRITEMODE_WRITE,
JANET_ASYNC_WRITEMODE_SEND, JANET_ASYNC_WRITEMODE_SEND,
JANET_ASYNC_WRITEMODE_SENDTO, JANET_ASYNC_WRITEMODE_SENDTO
JANET_ASYNC_WRITEMODE_CONNECT
} JanetWriteMode; } JanetWriteMode;
typedef struct { typedef struct {
@@ -2550,41 +2534,15 @@ typedef struct {
#endif #endif
} StateWrite; } StateWrite;
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s; StateWrite *state = (StateWrite *) s;
switch (event) { switch (event) {
default: default:
break; break;
case JANET_ASYNC_EVENT_MARK: { case JANET_ASYNC_EVENT_MARK: {
if (state->mode != JANET_ASYNC_WRITEMODE_CONNECT) { janet_mark(state->is_buffer
janet_mark(state->is_buffer ? janet_wrap_buffer(state->src.buf)
? janet_wrap_buffer(state->src.buf) : janet_wrap_string(state->src.str));
: janet_wrap_string(state->src.str));
}
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
janet_mark(janet_wrap_abstract(state->dest_abst)); janet_mark(janet_wrap_abstract(state->dest_abst));
} }
@@ -2606,11 +2564,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} }
break; break;
case JANET_ASYNC_EVENT_USER: { case JANET_ASYNC_EVENT_USER: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
/* Begin write */ /* Begin write */
int32_t len; int32_t len;
const uint8_t *bytes; const uint8_t *bytes;
@@ -2674,11 +2627,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream hup")); janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
start = state->start; start = state->start;
@@ -2780,10 +2728,6 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
} }
void janet_ev_connect(JanetStream *stream, int flags) {
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
}
#endif #endif
/* For a pipe ID */ /* For a pipe ID */

View File

@@ -39,8 +39,8 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->env = NULL; fiber->env = NULL;
fiber->last_value = janet_wrap_nil(); fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0; fiber->sched_id = 0;
fiber->waiting = NULL;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW); janet_fiber_set_status(fiber, JANET_STATUS_NEW);
@@ -85,7 +85,6 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
if (janet_fiber_funcframe(fiber, callee)) return NULL; if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif
return fiber; return fiber;

View File

@@ -268,6 +268,9 @@ recur:
if (fiber->supervisor_channel) { if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel); janet_mark_abstract(fiber->supervisor_channel);
} }
if (fiber->waiting) {
janet_mark_abstract(fiber->waiting);
}
#endif #endif
/* Explicit tail recursion */ /* Explicit tail recursion */
@@ -438,6 +441,7 @@ void janet_collect(void) {
uint32_t i; uint32_t i;
if (janet_vm.gc_suspend) return; if (janet_vm.gc_suspend) return;
depth = JANET_RECURSION_GUARD; depth = JANET_RECURSION_GUARD;
janet_vm.gc_mark_phase = 1;
/* Try and prevent many major collections back to back. /* Try and prevent many major collections back to back.
* A full collection will take O(janet_vm.block_count) time. * A full collection will take O(janet_vm.block_count) time.
* If we have a large heap, make sure our interval is not too * If we have a large heap, make sure our interval is not too
@@ -457,6 +461,7 @@ void janet_collect(void) {
Janet x = janet_vm.roots[--janet_vm.root_count]; Janet x = janet_vm.roots[--janet_vm.root_count];
janet_mark(x); janet_mark(x);
} }
janet_vm.gc_mark_phase = 0;
janet_sweep(); janet_sweep();
janet_vm.next_collection = 0; janet_vm.next_collection = 0;
janet_free_all_scratch(); janet_free_all_scratch();
@@ -560,7 +565,9 @@ void janet_gcunlock(int handle) {
janet_vm.gc_suspend = handle; janet_vm.gc_suspend = handle;
} }
/* Scratch memory API */ /* Scratch memory API
* Scratch memory allocations do not need to be free (but optionally can be), and will be automatically cleaned
* up in the next call to janet_collect. */
void *janet_smalloc(size_t size) { void *janet_smalloc(size_t size) {
JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size); JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size);

View File

@@ -131,7 +131,7 @@ JANET_CORE_FN(cfun_io_temp,
} }
JANET_CORE_FN(cfun_io_fopen, JANET_CORE_FN(cfun_io_fopen,
"(file/open path &opt mode)", "(file/open path &opt mode buffer-size)",
"Open a file. `path` is an absolute or relative path, and " "Open a file. `path` is an absolute or relative path, and "
"`mode` is a set of flags indicating the mode to open the file in. " "`mode` is a set of flags indicating the mode to open the file in. "
"`mode` is a keyword where each character represents a flag. If the file " "`mode` is a keyword where each character represents a flag. If the file "
@@ -145,7 +145,7 @@ JANET_CORE_FN(cfun_io_fopen,
"* + - append to the file instead of overwriting it\n\n" "* + - append to the file instead of overwriting it\n\n"
"* n - error if the file cannot be opened instead of returning nil\n\n" "* n - error if the file cannot be opened instead of returning nil\n\n"
"See fopen (<stdio.h>, C99) for further details.") { "See fopen (<stdio.h>, C99) for further details.") {
janet_arity(argc, 1, 2); janet_arity(argc, 1, 3);
const uint8_t *fname = janet_getstring(argv, 0); const uint8_t *fname = janet_getstring(argv, 0);
const uint8_t *fmode; const uint8_t *fmode;
int32_t flags; int32_t flags;
@@ -158,6 +158,15 @@ JANET_CORE_FN(cfun_io_fopen,
flags = JANET_FILE_READ; flags = JANET_FILE_READ;
} }
FILE *f = fopen((const char *)fname, (const char *)fmode); FILE *f = fopen((const char *)fname, (const char *)fmode);
if (f != NULL) {
size_t bufsize = janet_optsize(argv, argc, 2, BUFSIZ);
if (bufsize != BUFSIZ) {
int result = setvbuf(f, NULL, bufsize ? _IOFBF : _IONBF, bufsize);
if (result) {
janet_panic("failed to set buffer size for file");
}
}
}
return f ? janet_makefile(f, flags) return f ? janet_makefile(f, flags)
: (flags & JANET_FILE_NONIL) ? (janet_panicf("failed to open file %s: %s", fname, strerror(errno)), janet_wrap_nil()) : (flags & JANET_FILE_NONIL) ? (janet_panicf("failed to open file %s: %s", fname, strerror(errno)), janet_wrap_nil())
: janet_wrap_nil(); : janet_wrap_nil();

View File

@@ -1048,7 +1048,6 @@ static const uint8_t *unmarshal_one_fiber(
fiber->env = NULL; fiber->env = NULL;
fiber->last_value = janet_wrap_nil(); fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0; fiber->sched_id = 0;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif

View File

@@ -111,6 +111,62 @@ static void janet_net_socknoblock(JSock s) {
#endif #endif
} }
/* State machine for async connect */
typedef struct {
JanetListenerState head;
int did_connect;
} NetStateConnect;
JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) {
NetStateConnect *state = (NetStateConnect *)s;
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:
case JANET_ASYNC_EVENT_WRITE:
case JANET_ASYNC_EVENT_USER:
break;
}
JanetStream *stream = s->stream;
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
state->did_connect = 1;
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
stream->flags |= JANET_STREAM_TOCLOSE;
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
stream->flags |= JANET_STREAM_TOCLOSE;
}
return JANET_ASYNC_STATUS_DONE;
}
static void net_sched_connect(JanetStream *stream) {
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}
/* State machine for accepting connections. */ /* State machine for accepting connections. */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@@ -496,7 +552,7 @@ JANET_CORE_FN(cfun_net_connect,
} }
#endif #endif
if (status != 0) { if (status) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
if (err != WSAEWOULDBLOCK) { if (err != WSAEWOULDBLOCK) {
#else #else
@@ -508,9 +564,7 @@ JANET_CORE_FN(cfun_net_connect,
} }
} }
/* Handle the connect() result in the event loop*/ net_sched_connect(stream);
janet_ev_connect(stream, MSG_NOSIGNAL);
janet_await(); janet_await();
} }

View File

@@ -1437,8 +1437,8 @@ JANET_CORE_FN(os_getenv,
janet_sandbox_assert(JANET_SANDBOX_ENV); janet_sandbox_assert(JANET_SANDBOX_ENV);
janet_arity(argc, 1, 2); janet_arity(argc, 1, 2);
const char *cstr = janet_getcstring(argv, 0); const char *cstr = janet_getcstring(argv, 0);
const char *res = getenv(cstr);
janet_lock_environ(); janet_lock_environ();
const char *res = getenv(cstr);
Janet ret = res Janet ret = res
? janet_cstringv(res) ? janet_cstringv(res)
: argc == 2 : argc == 2

View File

@@ -27,6 +27,10 @@
#include "util.h" #include "util.h"
#endif #endif
#ifdef JANET_WINDOWS
#include <windows.h>
#endif
JANET_THREAD_LOCAL JanetVM janet_vm; JANET_THREAD_LOCAL JanetVM janet_vm;
JanetVM *janet_local_vm(void) { JanetVM *janet_local_vm(void) {

View File

@@ -125,6 +125,7 @@ struct JanetVM {
size_t next_collection; size_t next_collection;
size_t block_count; size_t block_count;
int gc_suspend; int gc_suspend;
int gc_mark_phase;
/* GC roots */ /* GC roots */
Janet *roots; Janet *roots;
@@ -154,12 +155,10 @@ struct JanetVM {
JanetQueue spawn; JanetQueue spawn;
JanetTimeout *tq; JanetTimeout *tq;
JanetRNG ev_rng; JanetRNG ev_rng;
JanetListenerState **listeners;
size_t listener_count;
size_t listener_cap;
volatile size_t extra_listeners; /* used in signal handler, must be volatile */ volatile size_t extra_listeners; /* used in signal handler, must be volatile */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */ JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
JanetArray *listeners; /* For GC */
JanetTable signal_handlers; JanetTable signal_handlers;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void **iocp; void **iocp;

View File

@@ -111,12 +111,11 @@ static void janet_table_rehash(JanetTable *t, int32_t size) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
} }
int32_t i, oldcapacity; int32_t oldcapacity = t->capacity;
oldcapacity = t->capacity;
t->data = newdata; t->data = newdata;
t->capacity = size; t->capacity = size;
t->deleted = 0; t->deleted = 0;
for (i = 0; i < oldcapacity; i++) { for (int32_t i = 0; i < oldcapacity; i++) {
JanetKV *kv = olddata + i; JanetKV *kv = olddata + i;
if (!janet_checktype(kv->key, JANET_NIL)) { if (!janet_checktype(kv->key, JANET_NIL)) {
JanetKV *newkv = janet_table_find(t, kv->key); JanetKV *newkv = janet_table_find(t, kv->key);

View File

@@ -49,7 +49,7 @@
#ifndef JANET_EXIT #ifndef JANET_EXIT
#include <stdio.h> #include <stdio.h>
#define JANET_EXIT(m) do { \ #define JANET_EXIT(m) do { \
fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\ fprintf(stderr, "janet interpreter runtime error at line %d in file %s: %s\n",\
__LINE__,\ __LINE__,\
__FILE__,\ __FILE__,\
(m));\ (m));\

View File

@@ -861,7 +861,7 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
vm_pcnext(); vm_pcnext();
VM_OP(JOP_EQUALS_IMMEDIATE) VM_OP(JOP_EQUALS_IMMEDIATE)
stack[A] = janet_wrap_boolean(janet_unwrap_number(stack[B]) == (double) CS); stack[A] = janet_wrap_boolean(janet_checktype(stack[B], JANET_NUMBER) && (janet_unwrap_number(stack[B]) == (double) CS));
vm_pcnext(); vm_pcnext();
VM_OP(JOP_NOT_EQUALS) VM_OP(JOP_NOT_EQUALS)
@@ -869,7 +869,7 @@ static JanetSignal run_vm(JanetFiber *fiber, Janet in) {
vm_pcnext(); vm_pcnext();
VM_OP(JOP_NOT_EQUALS_IMMEDIATE) VM_OP(JOP_NOT_EQUALS_IMMEDIATE)
stack[A] = janet_wrap_boolean(janet_unwrap_number(stack[B]) != (double) CS); stack[A] = janet_wrap_boolean(!janet_checktype(stack[B], JANET_NUMBER) || (janet_unwrap_number(stack[B]) != (double) CS));
vm_pcnext(); vm_pcnext();
VM_OP(JOP_COMPARE) VM_OP(JOP_COMPARE)
@@ -1588,6 +1588,7 @@ int janet_init(void) {
janet_vm.next_collection = 0; janet_vm.next_collection = 0;
janet_vm.gc_interval = 0x400000; janet_vm.gc_interval = 0x400000;
janet_vm.block_count = 0; janet_vm.block_count = 0;
janet_vm.gc_mark_phase = 0;
janet_symcache_init(); janet_symcache_init();

View File

@@ -235,9 +235,22 @@ extern "C" {
#endif #endif
/* How to export symbols */ /* How to export symbols */
#ifndef JANET_EXPORT
#ifdef JANET_WINDOWS
#define JANET_EXPORT __declspec(dllexport)
#else
#define JANET_EXPORT __attribute__((visibility ("default")))
#endif
#endif
/* How declare API functions */
#ifndef JANET_API #ifndef JANET_API
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#ifdef JANET_DLL_IMPORT
#define JANET_API __declspec(dllimport)
#else
#define JANET_API __declspec(dllexport) #define JANET_API __declspec(dllexport)
#endif
#else #else
#define JANET_API __attribute__((visibility ("default"))) #define JANET_API __attribute__((visibility ("default")))
#endif #endif
@@ -578,7 +591,6 @@ typedef enum {
JANET_ASYNC_EVENT_HUP, JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE, JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_CANCEL,
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */ JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER JANET_ASYNC_EVENT_USER
} JanetAsyncEvent; } JanetAsyncEvent;
@@ -600,13 +612,9 @@ typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEv
struct JanetStream { struct JanetStream {
JanetHandle handle; JanetHandle handle;
uint32_t flags; uint32_t flags;
/* Linked list of all in-flight IO routines for this stream */ JanetListenerState *read_state;
JanetListenerState *state; JanetListenerState *write_state;
const void *methods; /* Methods for this stream */ const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same stream, though. */
int _mask;
}; };
/* Interface for state machine based event loop */ /* Interface for state machine based event loop */
@@ -616,14 +624,12 @@ struct JanetListenerState {
JanetStream *stream; JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */ implementation of the event loop and the particular event. */
uint32_t index; /* Used for GC and poll implentation */
uint32_t flags;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */ void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */ int bytes; /* Used to track how many bytes were transfered. */
#endif #endif
/* internal */
size_t _index;
int _mask;
JanetListenerState *_next;
}; };
#endif #endif
@@ -730,6 +736,7 @@ JANET_API Janet janet_wrap_integer(int32_t x);
? janet_nanbox_isnumber(x) \ ? janet_nanbox_isnumber(x) \
: janet_nanbox_checkauxtype((x), (t))) : janet_nanbox_checkauxtype((x), (t)))
/* Use JANET_API so that modules will use a local version of these functions if possible */
JANET_API void *janet_nanbox_to_pointer(Janet x); JANET_API void *janet_nanbox_to_pointer(Janet x);
JANET_API Janet janet_nanbox_from_pointer(void *p, uint64_t tagmask); JANET_API Janet janet_nanbox_from_pointer(void *p, uint64_t tagmask);
JANET_API Janet janet_nanbox_from_cpointer(const void *p, uint64_t tagmask); JANET_API Janet janet_nanbox_from_cpointer(const void *p, uint64_t tagmask);
@@ -912,8 +919,8 @@ struct JanetFiber {
* that is, fibers that are scheduled on the event loop and behave much like threads * that is, fibers that are scheduled on the event loop and behave much like threads
* in a multi-tasking system. It would be possible to move these fields to a new * in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */ * type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetListenerState *waiting;
void *supervisor_channel; /* Channel to push self to when complete */ void *supervisor_channel; /* Channel to push self to when complete */
#endif #endif
}; };
@@ -1485,7 +1492,6 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
#endif #endif
/* Write async to a stream */ /* Write async to a stream */
@@ -1948,10 +1954,10 @@ JANET_API void janet_register(const char *name, JanetCFunction cfun);
#endif #endif
#ifndef JANET_ENTRY_NAME #ifndef JANET_ENTRY_NAME
#define JANET_MODULE_ENTRY \ #define JANET_MODULE_ENTRY \
JANET_MODULE_PREFIX JANET_API JanetBuildConfig _janet_mod_config(void) { \ JANET_MODULE_PREFIX JANET_EXPORT JanetBuildConfig _janet_mod_config(void) { \
return janet_config_current(); \ return janet_config_current(); \
} \ } \
JANET_MODULE_PREFIX JANET_API void _janet_init JANET_MODULE_PREFIX JANET_EXPORT void _janet_init
#else #else
#define JANET_MODULE_ENTRY JANET_MODULE_PREFIX JANET_API void JANET_ENTRY_NAME #define JANET_MODULE_ENTRY JANET_MODULE_PREFIX JANET_API void JANET_ENTRY_NAME
#endif #endif