1
0
mirror of https://github.com/janet-lang/janet synced 2024-12-24 23:40:27 +00:00

Merge branch 'net-reworkings'

This commit is contained in:
Calvin Rose 2023-09-27 19:06:14 -05:00
commit 964295b59d
13 changed files with 318 additions and 320 deletions

View File

@ -41,32 +41,32 @@ if not exist build\boot mkdir build\boot
@rem Build the bootstrap interpreter @rem Build the bootstrap interpreter
for %%f in (src\core\*.c) do ( for %%f in (src\core\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f %JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
) )
for %%f in (src\boot\*.c) do ( for %%f in (src\boot\*.c) do (
%JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f %JANET_COMPILE% /DJANET_BOOTSTRAP /Fobuild\boot\%%~nf.obj %%f
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
) )
%JANET_LINK% /out:build\janet_boot.exe build\boot\*.obj %JANET_LINK% /out:build\janet_boot.exe build\boot\*.obj
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
build\janet_boot . > build\c\janet.c build\janet_boot . > build\c\janet.c
@rem Build the sources @rem Build the sources
%JANET_COMPILE% /Fobuild\janet.obj build\c\janet.c %JANET_COMPILE% /Fobuild\janet.obj build\c\janet.c
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
%JANET_COMPILE% /Fobuild\shell.obj src\mainclient\shell.c %JANET_COMPILE% /Fobuild\shell.obj src\mainclient\shell.c
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
@rem Build the resources @rem Build the resources
rc /nologo /fobuild\janet_win.res janet_win.rc rc /nologo /fobuild\janet_win.res janet_win.rc
@rem Link everything to main client @rem Link everything to main client
%JANET_LINK% /out:janet.exe build\janet.obj build\shell.obj build\janet_win.res %JANET_LINK% /out:janet.exe build\janet.obj build\shell.obj build\janet_win.res
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
@rem Build static library (libjanet.lib) @rem Build static library (libjanet.lib)
%JANET_LINK_STATIC% /out:build\libjanet.lib build\janet.obj %JANET_LINK_STATIC% /out:build\libjanet.lib build\janet.obj
@if errorlevel 1 goto :BUILDFAIL @if not errorlevel 0 goto :BUILDFAIL
echo === Successfully built janet.exe for Windows === echo === Successfully built janet.exe for Windows ===
echo === Run 'build_win test' to run tests. == echo === Run 'build_win test' to run tests. ==
@ -98,7 +98,7 @@ exit /b 0
:TEST :TEST
for %%f in (test/suite*.janet) do ( for %%f in (test/suite*.janet) do (
janet.exe test\%%f janet.exe test\%%f
@if errorlevel 1 goto TESTFAIL @if not errorlevel 0 goto TESTFAIL
) )
exit /b 0 exit /b 0

View File

@ -169,7 +169,7 @@ janet_boot = executable('janet-boot', core_src, boot_src,
# Build janet.c # Build janet.c
janetc = custom_target('janetc', janetc = custom_target('janetc',
input : [janet_boot], input : [janet_boot, 'src/boot/boot.janet'],
output : 'janet.c', output : 'janet.c',
capture : true, capture : true,
command : [ command : [
@ -288,12 +288,16 @@ patched_janet = custom_target('patched-janeth',
install : true, install : true,
install_dir : join_paths(get_option('includedir'), 'janet'), install_dir : join_paths(get_option('includedir'), 'janet'),
build_by_default : true, build_by_default : true,
<<<<<<< HEAD
output : ['_janet.h'], output : ['_janet.h'],
=======
output : ['janet_' + meson.project_version() + '.h'],
>>>>>>> @{-1}
command : [janet_nativeclient, '@INPUT@', '@OUTPUT@']) command : [janet_nativeclient, '@INPUT@', '@OUTPUT@'])
# Create a version of the janet.h header that matches what jpm often expects # Create a version of the janet.h header that matches what jpm often expects
if meson.version().version_compare('>=0.61') if meson.version().version_compare('>=0.61')
install_symlink('janet.h', pointing_to: 'janet/_janet.h', install_dir: get_option('includedir')) install_symlink('janet.h', pointing_to: 'janet/janet.h', install_dir: get_option('includedir'))
install_symlink('janet.h', pointing_to: '_janet.h', install_dir: join_paths(get_option('includedir'), 'janet')) install_symlink('janet.h', pointing_to: 'janet_' + meson.project_version() + '.h', install_dir: join_paths(get_option('includedir'), 'janet'))
endif endif

View File

@ -3766,7 +3766,7 @@
[host port &opt handler type] [host port &opt handler type]
(def s (net/listen host port type)) (def s (net/listen host port type))
(if handler (if handler
(ev/call (fn [] (net/accept-loop s handler)))) (ev/go (fn [] (net/accept-loop s handler))))
s)) s))
### ###

View File

@ -180,7 +180,7 @@ static int janet_q_pop(JanetQueue *q, void *out, size_t itemsize) {
} }
/* Forward declaration */ /* Forward declaration */
static void janet_unlisten(JanetListenerState *state, int is_gc); static void janet_unlisten(JanetListenerState *state);
/* Get current timestamp (millisecond precision) */ /* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now(void); static JanetTimestamp ts_now(void);
@ -254,76 +254,100 @@ static void add_timeout(JanetTimeout to) {
} }
} }
static int janet_listener_gc(void *p, size_t s);
static int janet_listener_mark(void *p, size_t s);
static const JanetAbstractType janet_listener_AT = {
"core/ev-listener",
janet_listener_gc,
janet_listener_mark,
JANET_ATEND_GCMARK
};
/* Create a new event listener */ /* Create a new event listener */
static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { static JanetListenerState *janet_listen_impl(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
if (stream->flags & JANET_STREAM_CLOSED) { if (stream->flags & JANET_STREAM_CLOSED) {
janet_panic("cannot listen on closed stream"); janet_panic("cannot listen on closed stream");
} }
if (stream->_mask & mask) { if ((mask & JANET_ASYNC_LISTEN_READ) && stream->read_state) goto bad_listen_read;
janet_panic("cannot listen for duplicate event on stream"); if ((mask & JANET_ASYNC_LISTEN_WRITE) && stream->write_state) goto bad_listen_write;
} janet_assert(size >= sizeof(JanetListenerState), "bad size");
if (janet_vm.root_fiber->waiting != NULL) { JanetListenerState *state = janet_abstract(&janet_listener_AT, size);
janet_panic("current fiber is already waiting for event");
}
if (size < sizeof(JanetListenerState))
size = sizeof(JanetListenerState);
JanetListenerState *state = janet_malloc(size);
if (NULL == state) {
JANET_OUT_OF_MEMORY;
}
state->machine = behavior; state->machine = behavior;
state->fiber = janet_vm.root_fiber; state->fiber = janet_vm.root_fiber;
state->flags = 0;
janet_vm.root_fiber->waiting = state; janet_vm.root_fiber->waiting = state;
if (mask & JANET_ASYNC_LISTEN_READ) stream->read_state = state;
if (mask & JANET_ASYNC_LISTEN_WRITE) stream->write_state = state;
state->stream = stream; state->stream = stream;
state->_mask = mask;
stream->_mask |= mask;
state->_next = stream->state;
stream->state = state;
/* Keep track of a listener for GC purposes */
int resize = janet_vm.listener_cap == janet_vm.listener_count;
if (resize) {
size_t newcap = janet_vm.listener_count ? janet_vm.listener_cap * 2 : 16;
janet_vm.listeners = janet_realloc(janet_vm.listeners, newcap * sizeof(JanetListenerState *));
if (NULL == janet_vm.listeners) {
JANET_OUT_OF_MEMORY;
}
janet_vm.listener_cap = newcap;
}
size_t index = janet_vm.listener_count++;
janet_vm.listeners[index] = state;
state->_index = index;
/* Emit INIT event for convenience */
state->event = user; state->event = user;
state->machine(state, JANET_ASYNC_EVENT_INIT); state->machine(state, JANET_ASYNC_EVENT_INIT);
janet_ev_inc_refcount();
state->index = janet_vm.listeners->count;
janet_array_push(janet_vm.listeners, janet_wrap_abstract(state));
return state; return state;
bad_listen_write:
janet_panic("cannot listen for duplicate write event on stream");
bad_listen_read:
janet_panic("cannot listen for duplicate read event on stream");
} }
/* Indicate we are no longer listening for an event. This void janet_fiber_did_resume(JanetFiber *fiber) {
* frees the memory of the state machine as well. */ if (fiber->waiting) {
static void janet_unlisten_impl(JanetListenerState *state, int is_gc) { janet_unlisten(fiber->waiting);
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
/* Remove state machine from poll list */
JanetListenerState **iter = &(state->stream->state);
while (*iter && *iter != state)
iter = &((*iter)->_next);
janet_assert(*iter, "failed to remove listener");
*iter = state->_next;
/* Remove mask */
state->stream->_mask &= ~(state->_mask);
/* Ensure fiber does not reference this state */
if (!is_gc) {
JanetFiber *fiber = state->fiber;
if (NULL != fiber && fiber->waiting == state) {
fiber->waiting = NULL; fiber->waiting = NULL;
} }
}
static void janet_unlisten_impl(JanetListenerState *state) {
/* Move last listener to position of this listener - O(1) removal and keep things densely packed. */
if (state->stream) {
Janet popped = janet_array_pop(janet_vm.listeners);
janet_assert(janet_checktype(popped, JANET_ABSTRACT), "pop check");
JanetListenerState *popped_state = (JanetListenerState *) janet_unwrap_abstract(popped);
janet_vm.listeners->data[state->index] = popped;
popped_state->index = state->index;
state->index = UINT32_MAX; /* just in case */
janet_ev_dec_refcount();
if (state->stream->read_state == state) {
state->stream->read_state = NULL;
}
if (state->stream->write_state == state) {
state->stream->write_state = NULL;
}
state->stream = NULL;
}
}
static int janet_listener_gc(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_ev_dec_refcount();
}
if (state->machine) {
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
}
return 0;
}
static int janet_listener_mark(void *p, size_t size) {
(void) size;
JanetListenerState *state = (JanetListenerState *)p;
if (state->stream) {
janet_mark(janet_wrap_abstract(state->stream));
}
if (state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
state->machine(state, JANET_ASYNC_EVENT_MARK);
return 0;
}
static void janet_stream_checktoclose(JanetStream *stream) {
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->read_state && !stream->write_state) {
janet_stream_close(stream);
} }
/* Untrack a listener for gc purposes */
size_t index = state->_index;
janet_vm.listeners[index] = janet_vm.listeners[--janet_vm.listener_count];
janet_vm.listeners[index]->_index = index;
janet_free(state);
} }
static const JanetMethod ev_default_stream_methods[] = { static const JanetMethod ev_default_stream_methods[] = {
@ -339,28 +363,17 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream)); JanetStream *stream = janet_abstract(&janet_stream_type, sizeof(JanetStream));
stream->handle = handle; stream->handle = handle;
stream->flags = flags; stream->flags = flags;
stream->state = NULL; stream->read_state = NULL;
stream->_mask = 0; stream->write_state = NULL;
if (methods == NULL) methods = ev_default_stream_methods; if (methods == NULL) methods = ev_default_stream_methods;
stream->methods = methods; stream->methods = methods;
return stream; return stream;
} }
/* Close a stream */ static void janet_stream_close_impl(JanetStream *stream) {
static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
if (stream->flags & JANET_STREAM_CLOSED) return;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (!is_gc) {
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
}
JanetListenerState *next_state = state->_next;
janet_unlisten(state, is_gc);
state = next_state;
}
stream->state = NULL;
stream->flags |= JANET_STREAM_CLOSED; stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
if (stream->handle != INVALID_HANDLE_VALUE) {
#ifdef JANET_NET #ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) { if (stream->flags & JANET_STREAM_SOCKET) {
closesocket((SOCKET) stream->handle); closesocket((SOCKET) stream->handle);
@ -370,21 +383,32 @@ static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
CloseHandle(stream->handle); CloseHandle(stream->handle);
} }
stream->handle = INVALID_HANDLE_VALUE; stream->handle = INVALID_HANDLE_VALUE;
}
#else #else
if (stream->handle != -1) {
close(stream->handle); close(stream->handle);
stream->handle = -1; stream->handle = -1;
}
#endif #endif
} }
void janet_stream_close(JanetStream *stream) { void janet_stream_close(JanetStream *stream) {
janet_stream_close_impl(stream, 0); if (stream->read_state) {
stream->read_state->machine(stream->read_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->read_state);
}
if (stream->write_state) {
stream->write_state->machine(stream->write_state, JANET_ASYNC_EVENT_CLOSE);
janet_unlisten(stream->write_state);
}
janet_stream_close_impl(stream);
} }
/* Called to clean up a stream */ /* Called to clean up a stream */
static int janet_stream_gc(void *p, size_t s) { static int janet_stream_gc(void *p, size_t s) {
(void) s; (void) s;
JanetStream *stream = (JanetStream *)p; JanetStream *stream = (JanetStream *)p;
janet_stream_close_impl(stream, 1); janet_stream_close_impl(stream);
return 0; return 0;
} }
@ -392,13 +416,11 @@ static int janet_stream_gc(void *p, size_t s) {
static int janet_stream_mark(void *p, size_t s) { static int janet_stream_mark(void *p, size_t s) {
(void) s; (void) s;
JanetStream *stream = (JanetStream *) p; JanetStream *stream = (JanetStream *) p;
JanetListenerState *state = stream->state; if (NULL != stream->read_state) {
while (NULL != state) { janet_mark(janet_wrap_abstract(stream->read_state));
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
} }
(state->machine)(state, JANET_ASYNC_EVENT_MARK); if (NULL != stream->write_state) {
state = state->_next; janet_mark(janet_wrap_abstract(stream->write_state));
} }
return 0; return 0;
} }
@ -451,8 +473,8 @@ static void *janet_stream_unmarshal(JanetMarshalContext *ctx) {
} }
JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream)); JanetStream *p = janet_unmarshal_abstract(ctx, sizeof(JanetStream));
/* Can't share listening state and such across threads */ /* Can't share listening state and such across threads */
p->_mask = 0; p->read_state = NULL;
p->state = NULL; p->write_state = NULL;
p->flags = (uint32_t) janet_unmarshal_int(ctx); p->flags = (uint32_t) janet_unmarshal_int(ctx);
p->methods = janet_unmarshal_ptr(ctx); p->methods = janet_unmarshal_ptr(ctx);
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@ -516,14 +538,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
janet_schedule_signal(fiber, value, JANET_SIGNAL_OK); janet_schedule_signal(fiber, value, JANET_SIGNAL_OK);
} }
void janet_fiber_did_resume(JanetFiber *fiber) {
/* Cancel any pending fibers */
if (fiber->waiting) {
fiber->waiting->machine(fiber->waiting, JANET_ASYNC_EVENT_CANCEL);
janet_unlisten(fiber->waiting, 0);
}
}
/* Mark all pending tasks */ /* Mark all pending tasks */
void janet_ev_mark(void) { void janet_ev_mark(void) {
@ -552,16 +566,6 @@ void janet_ev_mark(void) {
janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber)); janet_mark(janet_wrap_fiber(janet_vm.tq[i].curr_fiber));
} }
} }
/* Pending listeners */
for (size_t i = 0; i < janet_vm.listener_count; i++) {
JanetListenerState *state = janet_vm.listeners[i];
if (NULL != state->fiber) {
janet_mark(janet_wrap_fiber(state->fiber));
}
janet_stream_mark(state->stream, sizeof(JanetStream));
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
}
} }
static int janet_channel_push(JanetChannel *channel, Janet x, int mode); static int janet_channel_push(JanetChannel *channel, Janet x, int mode);
@ -582,9 +586,6 @@ static Janet make_supervisor_event(const char *name, JanetFiber *fiber, int thre
/* Common init code */ /* Common init code */
void janet_ev_init_common(void) { void janet_ev_init_common(void) {
janet_q_init(&janet_vm.spawn); janet_q_init(&janet_vm.spawn);
janet_vm.listener_count = 0;
janet_vm.listener_cap = 0;
janet_vm.listeners = NULL;
janet_vm.tq = NULL; janet_vm.tq = NULL;
janet_vm.tq_count = 0; janet_vm.tq_count = 0;
janet_vm.tq_capacity = 0; janet_vm.tq_capacity = 0;
@ -592,6 +593,8 @@ void janet_ev_init_common(void) {
janet_table_init_raw(&janet_vm.active_tasks, 0); janet_table_init_raw(&janet_vm.active_tasks, 0);
janet_table_init_raw(&janet_vm.signal_handlers, 0); janet_table_init_raw(&janet_vm.signal_handlers, 0);
janet_rng_seed(&janet_vm.ev_rng, 0); janet_rng_seed(&janet_vm.ev_rng, 0);
janet_vm.listeners = janet_array(0);
janet_gcroot(janet_wrap_array(janet_vm.listeners));
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_init(&janet_vm.new_thread_attr); pthread_attr_init(&janet_vm.new_thread_attr);
pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&janet_vm.new_thread_attr, PTHREAD_CREATE_DETACHED);
@ -602,11 +605,10 @@ void janet_ev_init_common(void) {
void janet_ev_deinit_common(void) { void janet_ev_deinit_common(void) {
janet_q_deinit(&janet_vm.spawn); janet_q_deinit(&janet_vm.spawn);
janet_free(janet_vm.tq); janet_free(janet_vm.tq);
janet_free(janet_vm.listeners);
janet_vm.listeners = NULL;
janet_table_deinit(&janet_vm.threaded_abstracts); janet_table_deinit(&janet_vm.threaded_abstracts);
janet_table_deinit(&janet_vm.active_tasks); janet_table_deinit(&janet_vm.active_tasks);
janet_table_deinit(&janet_vm.signal_handlers); janet_table_deinit(&janet_vm.signal_handlers);
janet_vm.listeners = NULL;
#ifndef JANET_WINDOWS #ifndef JANET_WINDOWS
pthread_attr_destroy(&janet_vm.new_thread_attr); pthread_attr_destroy(&janet_vm.new_thread_attr);
#endif #endif
@ -633,9 +635,9 @@ void janet_addtimeout(double sec) {
void janet_ev_inc_refcount(void) { void janet_ev_inc_refcount(void) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#ifdef JANET_64 #ifdef JANET_64
InterlockedIncrement64(&janet_vm.extra_listeners); InterlockedIncrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else #else
InterlockedIncrement(&janet_vm.extra_listeners); InterlockedIncrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif #endif
#else #else
__atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&janet_vm.extra_listeners, 1, __ATOMIC_RELAXED);
@ -645,9 +647,9 @@ void janet_ev_inc_refcount(void) {
void janet_ev_dec_refcount(void) { void janet_ev_dec_refcount(void) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
#ifdef JANET_64 #ifdef JANET_64
InterlockedDecrement64(&janet_vm.extra_listeners); InterlockedDecrement64((int64_t volatile *) &janet_vm.extra_listeners);
#else #else
InterlockedDecrement(&janet_vm.extra_listeners); InterlockedDecrement((int32_t volatile *) &janet_vm.extra_listeners);
#endif #endif
#else #else
__atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED); __atomic_add_fetch(&janet_vm.extra_listeners, -1, __ATOMIC_RELAXED);
@ -1330,8 +1332,7 @@ const JanetAbstractType janet_channel_type = {
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout); void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
int janet_loop_done(void) { int janet_loop_done(void) {
return !(janet_vm.listener_count || return !((janet_vm.spawn.head != janet_vm.spawn.tail) ||
(janet_vm.spawn.head != janet_vm.spawn.tail) ||
janet_vm.tq_count || janet_vm.tq_count ||
janet_vm.extra_listeners); janet_vm.extra_listeners);
} }
@ -1395,7 +1396,7 @@ JanetFiber *janet_loop1(void) {
} }
/* Poll for events */ /* Poll for events */
if (janet_vm.listener_count || janet_vm.tq_count || janet_vm.extra_listeners) { if (janet_vm.tq_count || janet_vm.extra_listeners) {
JanetTimeout to; JanetTimeout to;
memset(&to, 0, sizeof(to)); memset(&to, 0, sizeof(to));
int has_timeout; int has_timeout;
@ -1414,7 +1415,7 @@ JanetFiber *janet_loop1(void) {
break; break;
} }
/* Run polling implementation only if pending timeouts or pending events */ /* Run polling implementation only if pending timeouts or pending events */
if (janet_vm.tq_count || janet_vm.listener_count || janet_vm.extra_listeners) { if (janet_vm.tq_count || janet_vm.extra_listeners) {
janet_loop1_impl(has_timeout, to.when); janet_loop1_impl(has_timeout, to.when);
} }
} }
@ -1506,8 +1507,8 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state, is_gc); janet_unlisten_impl(state);
} }
void janet_loop1_impl(int has_timeout, JanetTimestamp to) { void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
@ -1529,7 +1530,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} }
BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime); BOOL result = GetQueuedCompletionStatus(janet_vm.iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
if (result || overlapped) { if (!result) {
return;
}
if (overlapped) {
if (0 == completionKey) { if (0 == completionKey) {
/* Custom event */ /* Custom event */
JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped); JanetSelfPipeEvent *response = (JanetSelfPipeEvent *)(overlapped);
@ -1540,24 +1545,22 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
} else { } else {
/* Normal event */ /* Normal event */
JanetStream *stream = (JanetStream *) completionKey; JanetStream *stream = (JanetStream *) completionKey;
JanetListenerState *state = stream->state; janet_assert(stream->handle != INVALID_HANDLE_VALUE, "got closed stream event");
while (state != NULL) { JanetListenerState *state = NULL;
if (state->tag == overlapped) { if (stream->read_state && stream->read_state->tag == overlapped) {
state = stream->read_state;
} else if (stream->write_state && stream->write_state->tag == overlapped) {
state = stream->write_state;
}
if (state != NULL) {
state->event = overlapped; state->event = overlapped;
state->bytes = num_bytes_transfered; state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) { if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
}
break;
} else {
state = state->_next;
} }
} }
/* Close the stream if requested and no more listeners are left */ janet_stream_checktoclose(stream);
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
} }
@ -1572,26 +1575,17 @@ static JanetTimestamp ts_now(void) {
return res; return res;
} }
static int make_epoll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= EPOLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= EPOLLOUT;
return events;
}
static void janet_epoll_sync_callback(JanetEVGenericMessage msg) { static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
JanetListenerState *state = msg.argp; JanetListenerState *state = msg.argp;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) if (state == state->stream->read_state)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_READ);
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) if (state == state->stream->write_state)
status2 = state->machine(state, JANET_ASYNC_EVENT_READ); status2 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE) { status2 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
} else { } else {
/* Repost event */ /* Repost event */
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
@ -1600,11 +1594,13 @@ static void janet_epoll_sync_callback(JanetEVGenericMessage msg) {
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
int is_first = !(stream->state); int is_first = !stream->read_state && !stream->write_state;
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(state->stream->_mask); ev.events = 0;
if (stream->read_state) ev.events |= EPOLLIN;
if (stream->write_state) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@ -1618,13 +1614,13 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
* event to a file. So we just post a custom event to do the read/write * event to a file. So we just post a custom event to do the read/write
* asap. */ * asap. */
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
state->_mask |= (1 << JANET_ASYNC_EVENT_COMPLETE); state->flags = 1;
JanetEVGenericMessage msg = {0}; JanetEVGenericMessage msg = {0};
msg.argp = state; msg.argp = state;
janet_ev_post_event(NULL, janet_epoll_sync_callback, msg); janet_ev_post_event(NULL, janet_epoll_sync_callback, msg);
} else { } else {
/* Unexpected error */ /* Unexpected error */
janet_unlisten_impl(state, 0); janet_unlisten_impl(state);
janet_panicv(janet_ev_lasterr()); janet_panicv(janet_ev_lasterr());
} }
} }
@ -1632,15 +1628,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
} }
/* Tell system we are done listening for a certain event */ /* Tell system we are done listening for a certain event */
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) { if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in epoll */ /* Use flag to indicate state is not registered in epoll */
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { if (!state->flags) {
int is_last = (state->_next == NULL && stream->state == state); int is_read = (stream->read_state != state) && stream->read_state;
int is_write = (stream->write_state != state) && stream->write_state;
int is_last = !is_read && !is_write;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
struct epoll_event ev; struct epoll_event ev;
ev.events = make_epoll_events(stream->_mask & ~state->_mask); ev.events = 0;
if (is_read) ev.events |= EPOLLIN;
if (is_write) ev.events |= EPOLLOUT;
ev.data.ptr = stream; ev.data.ptr = stream;
int status; int status;
do { do {
@ -1652,7 +1652,7 @@ static void janet_unlisten(JanetListenerState *state, int is_gc) {
} }
} }
/* Destroy state machine and free memory */ /* Destroy state machine and free memory */
janet_unlisten_impl(state, is_gc); janet_unlisten_impl(state);
} }
#define JANET_EPOLL_MAX_EVENTS 64 #define JANET_EPOLL_MAX_EVENTS 64
@ -1689,10 +1689,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
int mask = events[i].events; int mask = events[i].events;
JanetListenerState *state = stream->state; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
while (NULL != state) { for (int j = 0; j < 2; j++) {
JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetListenerState *next_state = state->_next;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
@ -1708,14 +1709,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
state = next_state;
} }
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
} }
@ -1809,46 +1807,44 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
struct kevent kev[2]; struct kevent kev[2];
int length = 0; int length = 0;
if (state->stream->_mask & JANET_ASYNC_LISTEN_READ) { if (mask & JANET_ASYNC_LISTEN_READ) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, stream);
length++; length++;
} }
if (state->stream->_mask & JANET_ASYNC_LISTEN_WRITE) { if (mask & JANET_ASYNC_LISTEN_WRITE) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, stream);
length++; length++;
} }
if (length > 0) { janet_assert(length, "expected to add kqueue events");
add_kqueue_events(kev, length); add_kqueue_events(kev, length);
}
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
JanetStream *stream = state->stream; JanetStream *stream = state->stream;
if (!(stream->flags & JANET_STREAM_CLOSED)) { if (stream && (stream->handle != -1)) {
/* Use flag to indicate state is not registered in kqueue */ int is_read = (stream->read_state != state) && stream->read_state;
if (!(state->_mask & (1 << JANET_ASYNC_EVENT_COMPLETE))) { int is_write = (stream->write_state != state) && stream->write_state;
int is_last = (state->_next == NULL && stream->state == state); int is_last = !is_read && !is_write;
int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD; int op = is_last ? EV_DELETE : EV_DISABLE | EV_ADD;
struct kevent kev[2]; struct kevent kev[2];
EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream); EV_SETx(&kev[1], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
int length = 0; int length = 0;
if (stream->_mask & JANET_ASYNC_EVENT_WRITE) { if (stream->read_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_WRITE, op, 0, 0, stream);
length++; length++;
} }
if (stream->_mask & JANET_ASYNC_EVENT_READ) { if (stream->write_state == state) {
EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream); EV_SETx(&kev[length], stream->handle, EVFILT_READ, op, 0, 0, stream);
length++; length++;
} }
add_kqueue_events(kev, length); add_kqueue_events(kev, length);
} }
} janet_unlisten_impl(state);
janet_unlisten_impl(state, is_gc);
} }
#define JANET_KQUEUE_MAX_EVENTS 64 #define JANET_KQUEUE_MAX_EVENTS 64
@ -1888,14 +1884,14 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_ev_handle_selfpipe(); janet_ev_handle_selfpipe();
} else { } else {
JanetStream *stream = p; JanetStream *stream = p;
JanetListenerState *state = stream->state; JanetListenerState *states[2] = {stream->read_state, stream->write_state};
while (NULL != state) { for (int j = 0; j < 2; j++) {
JanetListenerState *next_state = state->_next; JanetListenerState *state = states[j];
if (!state) continue;
state->event = events + i; state->event = events + i;
JanetAsyncStatus statuses[4]; JanetAsyncStatus statuses[4];
for (int i = 0; i < 4; i++) for (int i = 0; i < 4; i++)
statuses[i] = JANET_ASYNC_STATUS_NOT_DONE; statuses[i] = JANET_ASYNC_STATUS_NOT_DONE;
if (!(events[i].flags & EV_ERROR)) { if (!(events[i].flags & EV_ERROR)) {
if (events[i].filter == EVFILT_WRITE) if (events[i].filter == EVFILT_WRITE)
statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE); statuses[0] = state->machine(state, JANET_ASYNC_EVENT_WRITE);
@ -1909,15 +1905,11 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (statuses[0] == JANET_ASYNC_STATUS_DONE || if (statuses[0] == JANET_ASYNC_STATUS_DONE ||
statuses[1] == JANET_ASYNC_STATUS_DONE || statuses[1] == JANET_ASYNC_STATUS_DONE ||
statuses[2] == JANET_ASYNC_STATUS_DONE || statuses[2] == JANET_ASYNC_STATUS_DONE ||
statuses[3] == JANET_ASYNC_STATUS_DONE) statuses[3] == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
state = next_state;
} }
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
} }
@ -1955,20 +1947,11 @@ static JanetTimestamp ts_now(void) {
return res; return res;
} }
static int make_poll_events(int mask) {
int events = 0;
if (mask & JANET_ASYNC_LISTEN_READ)
events |= POLLIN;
if (mask & JANET_ASYNC_LISTEN_WRITE)
events |= POLLOUT;
return events;
}
/* Wait for the next event */ /* Wait for the next event */
JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) { JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, int mask, size_t size, void *user) {
size_t oldsize = janet_vm.listener_cap; size_t oldsize = janet_vm.listeners->capacity;
JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user); JanetListenerState *state = janet_listen_impl(stream, behavior, mask, size, user);
size_t newsize = janet_vm.listener_cap; size_t newsize = janet_vm.listeners->capacity;
if (newsize > oldsize) { if (newsize > oldsize) {
janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd)); janet_vm.fds = janet_realloc(janet_vm.fds, (newsize + 1) * sizeof(struct pollfd));
if (NULL == janet_vm.fds) { if (NULL == janet_vm.fds) {
@ -1977,15 +1960,19 @@ JanetListenerState *janet_listen(JanetStream *stream, JanetListener behavior, in
} }
struct pollfd ev; struct pollfd ev;
ev.fd = stream->handle; ev.fd = stream->handle;
ev.events = make_poll_events(state->stream->_mask); ev.events = 0;
if (stream->read_state) ev.events |= POLLIN;
if (stream->write_state) ev.events |= POLLOUT;
ev.revents = 0; ev.revents = 0;
janet_vm.fds[state->_index + 1] = ev; janet_vm.fds[state->index + 1] = ev;
return state; return state;
} }
static void janet_unlisten(JanetListenerState *state, int is_gc) { static void janet_unlisten(JanetListenerState *state) {
janet_vm.fds[state->_index + 1] = janet_vm.fds[janet_vm.listener_count]; if (state->stream) {
janet_unlisten_impl(state, is_gc); janet_vm.fds[state->index + 1] = janet_vm.fds[janet_vm.listeners->count];
}
janet_unlisten_impl(state);
} }
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
@ -1997,7 +1984,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetTimestamp now = ts_now(); JanetTimestamp now = ts_now();
to = now > timeout ? 0 : (int)(timeout - now); to = now > timeout ? 0 : (int)(timeout - now);
} }
ready = poll(janet_vm.fds, janet_vm.listener_count + 1, to); ready = poll(janet_vm.fds, janet_vm.listeners->count + 1, to);
} while (ready == -1 && errno == EINTR); } while (ready == -1 && errno == EINTR);
if (ready == -1) { if (ready == -1) {
JANET_EXIT("failed to poll events"); JANET_EXIT("failed to poll events");
@ -2010,10 +1997,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
} }
/* Step state machines */ /* Step state machines */
for (size_t i = 0; i < janet_vm.listener_count; i++) { for (int32_t i = 0; i < janet_vm.listeners->count; i++) {
struct pollfd *pfd = janet_vm.fds + i + 1; struct pollfd *pfd = janet_vm.fds + i + 1;
/* Skip fds where nothing interesting happened */ /* Skip fds where nothing interesting happened */
JanetListenerState *state = janet_vm.listeners[i]; JanetListenerState *state = (JanetListenerState *) janet_unwrap_abstract(janet_vm.listeners->data[i]);
/* Normal event */ /* Normal event */
int mask = pfd->revents; int mask = pfd->revents;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE;
@ -2033,12 +2020,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (status1 == JANET_ASYNC_STATUS_DONE || if (status1 == JANET_ASYNC_STATUS_DONE ||
status2 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE ||
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state, 0); janet_unlisten(state);
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
} }
janet_stream_checktoclose(stream);
} }
} }
@ -2525,8 +2510,7 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
typedef enum { typedef enum {
JANET_ASYNC_WRITEMODE_WRITE, JANET_ASYNC_WRITEMODE_WRITE,
JANET_ASYNC_WRITEMODE_SEND, JANET_ASYNC_WRITEMODE_SEND,
JANET_ASYNC_WRITEMODE_SENDTO, JANET_ASYNC_WRITEMODE_SENDTO
JANET_ASYNC_WRITEMODE_CONNECT
} JanetWriteMode; } JanetWriteMode;
typedef struct { typedef struct {
@ -2550,41 +2534,15 @@ typedef struct {
#endif #endif
} StateWrite; } StateWrite;
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s; StateWrite *state = (StateWrite *) s;
switch (event) { switch (event) {
default: default:
break; break;
case JANET_ASYNC_EVENT_MARK: { case JANET_ASYNC_EVENT_MARK: {
if (state->mode != JANET_ASYNC_WRITEMODE_CONNECT) {
janet_mark(state->is_buffer janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf) ? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str)); : janet_wrap_string(state->src.str));
}
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
janet_mark(janet_wrap_abstract(state->dest_abst)); janet_mark(janet_wrap_abstract(state->dest_abst));
} }
@ -2606,11 +2564,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} }
break; break;
case JANET_ASYNC_EVENT_USER: { case JANET_ASYNC_EVENT_USER: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
/* Begin write */ /* Begin write */
int32_t len; int32_t len;
const uint8_t *bytes; const uint8_t *bytes;
@ -2674,11 +2627,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream hup")); janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
start = state->start; start = state->start;
@ -2780,10 +2728,6 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
} }
void janet_ev_connect(JanetStream *stream, int flags) {
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
}
#endif #endif
/* For a pipe ID */ /* For a pipe ID */

View File

@ -39,8 +39,8 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->env = NULL; fiber->env = NULL;
fiber->last_value = janet_wrap_nil(); fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0; fiber->sched_id = 0;
fiber->waiting = NULL;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW); janet_fiber_set_status(fiber, JANET_STATUS_NEW);
@ -85,7 +85,6 @@ JanetFiber *janet_fiber_reset(JanetFiber *fiber, JanetFunction *callee, int32_t
if (janet_fiber_funcframe(fiber, callee)) return NULL; if (janet_fiber_funcframe(fiber, callee)) return NULL;
janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE; janet_fiber_frame(fiber)->flags |= JANET_STACKFRAME_ENTRANCE;
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif
return fiber; return fiber;

View File

@ -268,6 +268,9 @@ recur:
if (fiber->supervisor_channel) { if (fiber->supervisor_channel) {
janet_mark_abstract(fiber->supervisor_channel); janet_mark_abstract(fiber->supervisor_channel);
} }
if (fiber->waiting) {
janet_mark_abstract(fiber->waiting);
}
#endif #endif
/* Explicit tail recursion */ /* Explicit tail recursion */
@ -438,6 +441,7 @@ void janet_collect(void) {
uint32_t i; uint32_t i;
if (janet_vm.gc_suspend) return; if (janet_vm.gc_suspend) return;
depth = JANET_RECURSION_GUARD; depth = JANET_RECURSION_GUARD;
janet_vm.gc_mark_phase = 1;
/* Try and prevent many major collections back to back. /* Try and prevent many major collections back to back.
* A full collection will take O(janet_vm.block_count) time. * A full collection will take O(janet_vm.block_count) time.
* If we have a large heap, make sure our interval is not too * If we have a large heap, make sure our interval is not too
@ -457,6 +461,7 @@ void janet_collect(void) {
Janet x = janet_vm.roots[--janet_vm.root_count]; Janet x = janet_vm.roots[--janet_vm.root_count];
janet_mark(x); janet_mark(x);
} }
janet_vm.gc_mark_phase = 0;
janet_sweep(); janet_sweep();
janet_vm.next_collection = 0; janet_vm.next_collection = 0;
janet_free_all_scratch(); janet_free_all_scratch();
@ -560,7 +565,9 @@ void janet_gcunlock(int handle) {
janet_vm.gc_suspend = handle; janet_vm.gc_suspend = handle;
} }
/* Scratch memory API */ /* Scratch memory API
* Scratch memory allocations do not need to be free (but optionally can be), and will be automatically cleaned
* up in the next call to janet_collect. */
void *janet_smalloc(size_t size) { void *janet_smalloc(size_t size) {
JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size); JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size);

View File

@ -1048,7 +1048,6 @@ static const uint8_t *unmarshal_one_fiber(
fiber->env = NULL; fiber->env = NULL;
fiber->last_value = janet_wrap_nil(); fiber->last_value = janet_wrap_nil();
#ifdef JANET_EV #ifdef JANET_EV
fiber->waiting = NULL;
fiber->sched_id = 0; fiber->sched_id = 0;
fiber->supervisor_channel = NULL; fiber->supervisor_channel = NULL;
#endif #endif

View File

@ -111,6 +111,62 @@ static void janet_net_socknoblock(JSock s) {
#endif #endif
} }
/* State machine for async connect */
typedef struct {
JanetListenerState head;
int did_connect;
} NetStateConnect;
JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) {
NetStateConnect *state = (NetStateConnect *)s;
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
case JANET_ASYNC_EVENT_COMPLETE:
case JANET_ASYNC_EVENT_WRITE:
case JANET_ASYNC_EVENT_USER:
break;
}
JanetStream *stream = s->stream;
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
state->did_connect = 1;
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
stream->flags |= JANET_STREAM_TOCLOSE;
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
stream->flags |= JANET_STREAM_TOCLOSE;
}
return JANET_ASYNC_STATUS_DONE;
}
static void net_sched_connect(JanetStream *stream) {
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}
/* State machine for accepting connections. */ /* State machine for accepting connections. */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
@ -496,7 +552,7 @@ JANET_CORE_FN(cfun_net_connect,
} }
#endif #endif
if (status != 0) { if (status) {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
if (err != WSAEWOULDBLOCK) { if (err != WSAEWOULDBLOCK) {
#else #else
@ -508,9 +564,7 @@ JANET_CORE_FN(cfun_net_connect,
} }
} }
/* Handle the connect() result in the event loop*/ net_sched_connect(stream);
janet_ev_connect(stream, MSG_NOSIGNAL);
janet_await(); janet_await();
} }

View File

@ -125,6 +125,7 @@ struct JanetVM {
size_t next_collection; size_t next_collection;
size_t block_count; size_t block_count;
int gc_suspend; int gc_suspend;
int gc_mark_phase;
/* GC roots */ /* GC roots */
Janet *roots; Janet *roots;
@ -154,12 +155,10 @@ struct JanetVM {
JanetQueue spawn; JanetQueue spawn;
JanetTimeout *tq; JanetTimeout *tq;
JanetRNG ev_rng; JanetRNG ev_rng;
JanetListenerState **listeners;
size_t listener_count;
size_t listener_cap;
volatile size_t extra_listeners; /* used in signal handler, must be volatile */ volatile size_t extra_listeners; /* used in signal handler, must be volatile */
JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */ JanetTable threaded_abstracts; /* All abstract types that can be shared between threads (used in this thread) */
JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */ JanetTable active_tasks; /* All possibly live task fibers - used just for tracking */
JanetArray *listeners; /* For GC */
JanetTable signal_handlers; JanetTable signal_handlers;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void **iocp; void **iocp;

View File

@ -111,12 +111,11 @@ static void janet_table_rehash(JanetTable *t, int32_t size) {
JANET_OUT_OF_MEMORY; JANET_OUT_OF_MEMORY;
} }
} }
int32_t i, oldcapacity; int32_t oldcapacity = t->capacity;
oldcapacity = t->capacity;
t->data = newdata; t->data = newdata;
t->capacity = size; t->capacity = size;
t->deleted = 0; t->deleted = 0;
for (i = 0; i < oldcapacity; i++) { for (int32_t i = 0; i < oldcapacity; i++) {
JanetKV *kv = olddata + i; JanetKV *kv = olddata + i;
if (!janet_checktype(kv->key, JANET_NIL)) { if (!janet_checktype(kv->key, JANET_NIL)) {
JanetKV *newkv = janet_table_find(t, kv->key); JanetKV *newkv = janet_table_find(t, kv->key);

View File

@ -49,7 +49,7 @@
#ifndef JANET_EXIT #ifndef JANET_EXIT
#include <stdio.h> #include <stdio.h>
#define JANET_EXIT(m) do { \ #define JANET_EXIT(m) do { \
fprintf(stderr, "C runtime error at line %d in file %s: %s\n",\ fprintf(stderr, "janet interpreter runtime error at line %d in file %s: %s\n",\
__LINE__,\ __LINE__,\
__FILE__,\ __FILE__,\
(m));\ (m));\

View File

@ -1588,6 +1588,7 @@ int janet_init(void) {
janet_vm.next_collection = 0; janet_vm.next_collection = 0;
janet_vm.gc_interval = 0x400000; janet_vm.gc_interval = 0x400000;
janet_vm.block_count = 0; janet_vm.block_count = 0;
janet_vm.gc_mark_phase = 0;
janet_symcache_init(); janet_symcache_init();

View File

@ -591,7 +591,6 @@ typedef enum {
JANET_ASYNC_EVENT_HUP, JANET_ASYNC_EVENT_HUP,
JANET_ASYNC_EVENT_READ, JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE, JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_CANCEL,
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */ JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER JANET_ASYNC_EVENT_USER
} JanetAsyncEvent; } JanetAsyncEvent;
@ -613,13 +612,9 @@ typedef JanetAsyncStatus(*JanetListener)(JanetListenerState *state, JanetAsyncEv
struct JanetStream { struct JanetStream {
JanetHandle handle; JanetHandle handle;
uint32_t flags; uint32_t flags;
/* Linked list of all in-flight IO routines for this stream */ JanetListenerState *read_state;
JanetListenerState *state; JanetListenerState *write_state;
const void *methods; /* Methods for this stream */ const void *methods; /* Methods for this stream */
/* internal - used to disallow multiple concurrent reads / writes on the same stream.
* this constraint may be lifted later but allowing such would require more internal book keeping
* for some implementations. You can read and write at the same time on the same stream, though. */
int _mask;
}; };
/* Interface for state machine based event loop */ /* Interface for state machine based event loop */
@ -629,14 +624,12 @@ struct JanetListenerState {
JanetStream *stream; JanetStream *stream;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */ implementation of the event loop and the particular event. */
uint32_t index; /* Used for GC and poll implentation */
uint32_t flags;
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */ void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */ int bytes; /* Used to track how many bytes were transfered. */
#endif #endif
/* internal */
size_t _index;
int _mask;
JanetListenerState *_next;
}; };
#endif #endif
@ -926,8 +919,8 @@ struct JanetFiber {
* that is, fibers that are scheduled on the event loop and behave much like threads * that is, fibers that are scheduled on the event loop and behave much like threads
* in a multi-tasking system. It would be possible to move these fields to a new * in a multi-tasking system. It would be possible to move these fields to a new
* type, say "JanetTask", that as separate from fibers to save a bit of space. */ * type, say "JanetTask", that as separate from fibers to save a bit of space. */
JanetListenerState *waiting;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */ uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
JanetListenerState *waiting;
void *supervisor_channel; /* Channel to push self to when complete */ void *supervisor_channel; /* Channel to push self to when complete */
#endif #endif
}; };
@ -1499,7 +1492,6 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
#endif #endif
/* Write async to a stream */ /* Write async to a stream */