mirror of
https://github.com/janet-lang/janet
synced 2024-11-24 09:17:17 +00:00
Start with ev module.
This commit is contained in:
parent
c19bbfce78
commit
b68b0a256e
1
Makefile
1
Makefile
@ -90,6 +90,7 @@ JANET_CORE_SOURCES=src/core/abstract.c \
|
||||
src/core/corelib.c \
|
||||
src/core/debug.c \
|
||||
src/core/emit.c \
|
||||
src/core/ev.c \
|
||||
src/core/fiber.c \
|
||||
src/core/gc.c \
|
||||
src/core/inttypes.c \
|
||||
|
@ -60,6 +60,7 @@ conf.set('JANET_NO_SOURCEMAPS', not get_option('sourcemaps'))
|
||||
conf.set('JANET_NO_ASSEMBLER', not get_option('assembler'))
|
||||
conf.set('JANET_NO_PEG', not get_option('peg'))
|
||||
conf.set('JANET_NO_NET', not get_option('net'))
|
||||
conf.set('JANET_NO_EV', not get_option('ev'))
|
||||
conf.set('JANET_REDUCED_OS', get_option('reduced_os'))
|
||||
conf.set('JANET_NO_TYPED_ARRAY', not get_option('typed_array'))
|
||||
conf.set('JANET_NO_INT_TYPES', not get_option('int_types'))
|
||||
@ -110,6 +111,7 @@ core_src = [
|
||||
'src/core/corelib.c',
|
||||
'src/core/debug.c',
|
||||
'src/core/emit.c',
|
||||
'src/core/ev.c',
|
||||
'src/core/fiber.c',
|
||||
'src/core/gc.c',
|
||||
'src/core/inttypes.c',
|
||||
|
@ -12,6 +12,7 @@ option('typed_array', type : 'boolean', value : true)
|
||||
option('int_types', type : 'boolean', value : true)
|
||||
option('prf', type : 'boolean', value : true)
|
||||
option('net', type : 'boolean', value : true)
|
||||
option('ev', type : 'boolean', value : true)
|
||||
option('processes', type : 'boolean', value : true)
|
||||
option('umask', type : 'boolean', value : true)
|
||||
option('realpath', type : 'boolean', value : true)
|
||||
|
@ -2832,6 +2832,7 @@
|
||||
"src/core/corelib.c"
|
||||
"src/core/debug.c"
|
||||
"src/core/emit.c"
|
||||
"src/core/ev.c"
|
||||
"src/core/fiber.c"
|
||||
"src/core/gc.c"
|
||||
"src/core/inttypes.c"
|
||||
|
@ -51,6 +51,7 @@
|
||||
/* #define JANET_NO_NET */
|
||||
/* #define JANET_NO_TYPED_ARRAY */
|
||||
/* #define JANET_NO_INT_TYPES */
|
||||
/* #define JANET_NO_EV */
|
||||
|
||||
/* Other settings */
|
||||
/* #define JANET_NO_PRF */
|
||||
|
@ -1004,6 +1004,9 @@ static void janet_load_libs(JanetTable *env) {
|
||||
#ifdef JANET_THREADS
|
||||
janet_lib_thread(env);
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
janet_lib_ev(env);
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
janet_lib_net(env);
|
||||
#endif
|
||||
|
378
src/core/ev.c
Normal file
378
src/core/ev.c
Normal file
@ -0,0 +1,378 @@
|
||||
/*
|
||||
* Copyright (c) 2020 Calvin Rose
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef JANET_AMALG
|
||||
#include "features.h"
|
||||
#include <janet.h>
|
||||
#include "util.h"
|
||||
#include "gc.h"
|
||||
#include "state.h"
|
||||
#endif
|
||||
|
||||
#ifdef JANET_EV
|
||||
|
||||
/* Includes */
|
||||
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include <signal.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/timerfd.h>
|
||||
|
||||
/* New fibers to spawn or resume */
|
||||
typedef struct JanetTask JanetTask;
|
||||
struct JanetTask {
|
||||
JanetFiber *fiber;
|
||||
Janet value;
|
||||
};
|
||||
|
||||
/* Min priority queue of timestamps for timeouts. */
|
||||
typedef struct JanetListenerTimeout JanetListenerTimeout;
|
||||
struct JanetListenerTimeout JanetListenerTimeout {
|
||||
JanetListenerState *state;
|
||||
struct timespec when;
|
||||
};
|
||||
|
||||
/* Global data */
|
||||
JANET_THREAD_LOCAL size_t janet_vm_active_listeners = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_spawn_capacity = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_spawn_count = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_tq_count = 0;
|
||||
JANET_THREAD_LOCAL size_t janet_vm_tq_capacity = 0;
|
||||
JANET_THREAD_LOCAL JanetTask *janet_vm_spawn = NULL;
|
||||
JANET_THREAD_LOCAL JanetListenerTimeout *janet_vm_tq = NULL;
|
||||
|
||||
/* Compare two timespecs - 1 if t1 > t2 */
|
||||
static int timespec_cmp(struct timespec t1, struct timespec t2) {
|
||||
if (t1.tv_sec < t2.tv_sec) return -1;
|
||||
if (t1.tv_sec > t2.tv_sec) return 1;
|
||||
if (t1.tv_nsec < t2.tv_nsec) return -1;
|
||||
if (t1.tv_nsec > t2.tv_nsec) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Add a timeout to the timeout min heap */
|
||||
static void add_timeout(JanetListenerState *state, struct timespec when) {
|
||||
size_t oldcount = janet_vm_tq_count;
|
||||
size_t newcount = oldcount + 1;
|
||||
if (oldcount == janet_vm_tq_capacity) {
|
||||
size_t newcap = 2 * newcount;
|
||||
JanetListenerTimeout *tq = realloc(janet_vm_tq, newcap * sizeof(JanetListenerTimeout));
|
||||
if (NULL == tq) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
janet_vm_tq_capacity = newcap;
|
||||
}
|
||||
/* Append */
|
||||
janet_vm_tq_count = newcount;
|
||||
janet_vm_tq[oldcount] = { state, when };
|
||||
/* Heapify */
|
||||
size_t index = oldcount;
|
||||
while (index > 0) {
|
||||
size_t parent = (index - 1) >> 1;
|
||||
int cmp = timespec_cmp(janet_vm_tq[parent].when, when);
|
||||
if (cmp <= 0) break;
|
||||
/* Swap */
|
||||
JanetListenerState tmp = janet_vm_tq[index];
|
||||
janet_vm_tq[index] = janet_vm_tq[parent];
|
||||
janet_vm_tq[parent] = tmp;
|
||||
/* Next */
|
||||
index = parent;
|
||||
}
|
||||
}
|
||||
|
||||
/* Extract the next timeout from the priority queue */
|
||||
static JanetListenerTimeout next_timeout(void) {
|
||||
|
||||
}
|
||||
|
||||
/* Create a new event listener */
|
||||
static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
|
||||
if (size < sizeof(JanetListenerState))
|
||||
size = sizeof(JanetListenerState);
|
||||
JanetListenerState *state = malloc(size);
|
||||
if (NULL == state) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
state->machine = behavior;
|
||||
state->fiber = janet_vm_root_fiber;
|
||||
state->pollable = pollable;
|
||||
state->_mask = mask;
|
||||
pollable->_mask |= mask;
|
||||
janet_vm_active_listeners++;
|
||||
/* Prepend to linked list */
|
||||
state->_next = pollable->state;
|
||||
pollable->state = state;
|
||||
/* Emit INIT event for convenience */
|
||||
state->machine(state, JANET_ASYNC_EVENT_INIT);
|
||||
return state;
|
||||
}
|
||||
|
||||
/* Indicate we are no longer listening for an event. This
|
||||
* frees the memory of the state machine as well. */
|
||||
static void janet_unlisten_impl(JanetListenerState *state) {
|
||||
state->machine(state, JANET_ASYNC_EVENT_DEINIT);
|
||||
/* Remove state machine from poll list */
|
||||
JanetListenerState **iter = &(state->pollable->state);
|
||||
while (*iter && *iter != state)
|
||||
iter = &((*iter)->_next);
|
||||
janet_assert(*iter, "failed to remove listener");
|
||||
*iter = state->_next;
|
||||
janet_vm_active_listeners--;
|
||||
/* Remove mask */
|
||||
state->pollable->_mask &= ~(state->_mask);
|
||||
free(state);
|
||||
}
|
||||
|
||||
/* Call after creating a pollable */
|
||||
void janet_pollable_init(JanetPollable *pollable, JanetPollType handle) {
|
||||
pollable->handle = handle;
|
||||
pollable->flags = 0;
|
||||
pollable->state = NULL;
|
||||
pollable->_mask = 0;
|
||||
}
|
||||
|
||||
/* Mark a pollable for GC */
|
||||
void janet_pollable_mark(JanetPollable *pollable) {
|
||||
JanetListenerState *state = pollable->state;
|
||||
while (NULL != state) {
|
||||
if (NULL != state->fiber) {
|
||||
janet_mark(janet_wrap_fiber(state->fiber));
|
||||
}
|
||||
(state->machine)(state, JANET_ASYNC_EVENT_MARK);
|
||||
state = state->_next;
|
||||
}
|
||||
}
|
||||
|
||||
/* Must be called to close all pollables - does NOT call `close` for you.
|
||||
* Also does not free memory of the pollable, so can be used on close. */
|
||||
void janet_pollable_deinit(JanetPollable *pollable) {
|
||||
pollable->flags |= JANET_POLL_FLAG_CLOSED;
|
||||
JanetListenerState *state = pollable->state;
|
||||
while (NULL != state) {
|
||||
state->machine(state, JANET_ASYNC_EVENT_CLOSE);
|
||||
JanetListenerState *next_state = state->_next;
|
||||
janet_unlisten_impl(state);
|
||||
state = next_state;
|
||||
}
|
||||
pollable->state = NULL;
|
||||
}
|
||||
|
||||
/* Register a fiber to resume with value */
|
||||
void janet_schedule(JanetFiber *fiber, Janet value) {
|
||||
size_t oldcount = janet_vm_spawn_count;
|
||||
size_t newcount = oldcount + 1;
|
||||
if (newcount > janet_vm_spawn_capacity) {
|
||||
size_t newcap = 2 * newcount;
|
||||
JanetTask *tasks = realloc(janet_vm_spawn, newcap * sizeof(JanetTask));
|
||||
if (NULL == tasks) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
janet_vm_spawn = tasks;
|
||||
janet_vm_spawn_capacity = newcap;
|
||||
}
|
||||
janet_vm_spawn_count = newcount;
|
||||
janet_vm_spawn[oldcount].fiber = fiber;
|
||||
janet_vm_spawn[oldcount].value = value;
|
||||
}
|
||||
|
||||
/* Mark all pending tasks */
|
||||
void janet_ev_mark(void) {
|
||||
for (size_t i = 0; i < janet_vm_spawn_count; i++) {
|
||||
janet_mark(janet_wrap_fiber(janet_vm_spawn[i].fiber));
|
||||
janet_mark(janet_vm_spawn[i].value);
|
||||
}
|
||||
}
|
||||
|
||||
/* Run scheduled tasks */
|
||||
static void run_scheduled(void) {
|
||||
size_t index = 0;
|
||||
while (index < janet_vm_spawn_count) {
|
||||
JanetTask task = janet_vm_spawn[index];
|
||||
Janet res;
|
||||
JanetSignal sig = janet_continue(task.fiber, task.value, &res);
|
||||
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
|
||||
janet_stacktrace(task.fiber, res);
|
||||
}
|
||||
index++;
|
||||
}
|
||||
janet_vm_spawn_count = 0;
|
||||
}
|
||||
|
||||
/* Main event loop */
|
||||
|
||||
void janet_loop1_impl(void);
|
||||
|
||||
void janet_loop1(void) {
|
||||
if (janet_vm_active_listeners) {
|
||||
janet_loop1_impl();
|
||||
}
|
||||
/* Run scheduled fibers */
|
||||
run_scheduled();
|
||||
}
|
||||
|
||||
void janet_loop(void) {
|
||||
while (janet_vm_active_listeners || janet_vm_spawn_count) janet_loop1();
|
||||
}
|
||||
|
||||
/* Common init code */
|
||||
void janet_ev_init_common(void) {
|
||||
janet_vm_spawn_capacity = 0;
|
||||
janet_vm_spawn_count = 0;
|
||||
janet_vm_spawn = NULL;
|
||||
janet_vm_active_listeners = 0;
|
||||
}
|
||||
|
||||
/* Common deinit code */
|
||||
void janet_ev_deinit_common(void) {
|
||||
free(janet_vm_spawn);
|
||||
}
|
||||
|
||||
/* Short hand to yield to event loop */
|
||||
void janet_await(void) {
|
||||
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
|
||||
}
|
||||
|
||||
/*
|
||||
* Start epoll implementation
|
||||
*/
|
||||
|
||||
/* Epoll global data */
|
||||
JANET_THREAD_LOCAL int janet_vm_epoll = 0;
|
||||
|
||||
static int make_epoll_events(int mask) {
|
||||
int events = 0;
|
||||
if (mask & JANET_ASYNC_EVENT_READ)
|
||||
events |= EPOLLIN;
|
||||
if (mask & JANET_ASYNC_EVENT_WRITE)
|
||||
events |= EPOLLOUT;
|
||||
return events;
|
||||
}
|
||||
|
||||
/* Wait for the next event */
|
||||
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) {
|
||||
int is_first = !(pollable->state);
|
||||
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
|
||||
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size);
|
||||
struct epoll_event ev;
|
||||
ev.events = make_epoll_events(state->pollable->_mask);
|
||||
ev.data.ptr = pollable;
|
||||
int status;
|
||||
do {
|
||||
status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev);
|
||||
} while (status == -1 && errno == EINTR);
|
||||
if (status == -1) {
|
||||
janet_unlisten_impl(state);
|
||||
janet_panicf("failed to schedule event: %s", strerror(errno));
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
/* Tell system we are done listening for a certain event */
|
||||
void janet_unlisten(JanetListenerState *state) {
|
||||
JanetPollable *pollable = state->pollable;
|
||||
int is_last = (state->_next == NULL && pollable->state == state);
|
||||
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
|
||||
struct epoll_event ev;
|
||||
ev.events = make_epoll_events(pollable->_mask);
|
||||
ev.data.ptr = pollable;
|
||||
int status;
|
||||
do {
|
||||
status = epoll_ctl(janet_vm_epoll, op, pollable->handle, &ev);
|
||||
} while (status == -1 && errno == EINTR);
|
||||
if (status == -1) {
|
||||
janet_panicf("failed to unschedule event: %s", strerror(errno));
|
||||
}
|
||||
/* Destroy state machine and free memory */
|
||||
janet_unlisten_impl(state);
|
||||
}
|
||||
|
||||
/* Replace janet_loop with this */
|
||||
#define JANET_EPOLL_MAX_EVENTS 64
|
||||
void janet_loop1_impl(void) {
|
||||
/* Poll for events */
|
||||
struct epoll_event events[JANET_EPOLL_MAX_EVENTS];
|
||||
int ready;
|
||||
do {
|
||||
ready = epoll_wait(janet_vm_epoll, events, JANET_EPOLL_MAX_EVENTS, -1);
|
||||
} while (ready == -1 && errno == EINTR);
|
||||
if (ready == -1) {
|
||||
JANET_EXIT("failed to poll events");
|
||||
}
|
||||
/* Step state machines */
|
||||
for (int i = 0; i < ready; i++) {
|
||||
JanetPollable *pollable = events[i].data.ptr;
|
||||
int mask = events[i].events;
|
||||
JanetListenerState *state = pollable->state;
|
||||
while (NULL != state) {
|
||||
if (mask & EPOLLOUT)
|
||||
state->machine(state, JANET_ASYNC_EVENT_WRITE);
|
||||
if (mask & EPOLLIN)
|
||||
state->machine(state, JANET_ASYNC_EVENT_READ);
|
||||
state = state->_next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void janet_ev_init(void) {
|
||||
janet_ev_init_common();
|
||||
janet_vm_epoll = epoll_create1(EPOLL_CLOEXEC);
|
||||
}
|
||||
|
||||
void janet_ev_deinit(void) {
|
||||
janet_ev_deinit_common();
|
||||
close(janet_vm_epoll);
|
||||
janet_vm_epoll = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* End epoll implementation
|
||||
*/
|
||||
|
||||
/* C functions */
|
||||
|
||||
static Janet cfun_ev_spawn(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 1, 2);
|
||||
JanetFiber *fiber = janet_getfiber(argv, 0);
|
||||
Janet value = argc == 2 ? argv[1] : janet_wrap_nil();
|
||||
janet_schedule(fiber, value);
|
||||
return argv[0];
|
||||
}
|
||||
|
||||
static const JanetReg ev_cfuns[] = {
|
||||
{
|
||||
"ev/go", cfun_ev_spawn,
|
||||
JDOC("(ev/go fiber &opt value)\n\n"
|
||||
"Put a fiber on the event loop to be resumed later. Optionally pass "
|
||||
"a value to resume with, otherwise resumes with nil.")
|
||||
},
|
||||
{NULL, NULL, NULL}
|
||||
};
|
||||
|
||||
void janet_lib_ev(JanetTable *env) {
|
||||
janet_core_cfuns(env, NULL, ev_cfuns);
|
||||
}
|
||||
|
||||
#endif
|
@ -389,8 +389,8 @@ void janet_collect(void) {
|
||||
if (janet_vm_gc_suspend) return;
|
||||
depth = JANET_RECURSION_GUARD;
|
||||
orig_rootcount = janet_vm_root_count;
|
||||
#ifdef JANET_NET
|
||||
janet_net_markloop();
|
||||
#ifdef JANET_EV
|
||||
janet_ev_mark();
|
||||
#endif
|
||||
for (i = 0; i < orig_rootcount; i++)
|
||||
janet_mark(janet_vm_roots[i]);
|
||||
|
491
src/core/net.c
491
src/core/net.c
@ -26,6 +26,8 @@
|
||||
#include "util.h"
|
||||
#endif
|
||||
|
||||
#ifdef JANET_NET
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
@ -45,28 +47,26 @@
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Streams
|
||||
* Streams - simple abstract type that wraps a pollable + extra flags
|
||||
*/
|
||||
|
||||
#define JANET_STREAM_CLOSED 1
|
||||
#define JANET_STREAM_READABLE 2
|
||||
#define JANET_STREAM_WRITABLE 4
|
||||
#define JANET_STREAM_READABLE 0x200
|
||||
#define JANET_STREAM_WRITABLE 0x400
|
||||
|
||||
static int janet_stream_close(void *p, size_t s);
|
||||
static int janet_stream_mark(void *p, size_t s);
|
||||
static int janet_stream_getter(void *p, Janet key, Janet *out);
|
||||
static const JanetAbstractType StreamAT = {
|
||||
"core/stream",
|
||||
janet_stream_close,
|
||||
NULL,
|
||||
janet_stream_mark,
|
||||
janet_stream_getter,
|
||||
JANET_ATEND_GET
|
||||
};
|
||||
|
||||
typedef JanetPollable JanetStream;
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
typedef struct {
|
||||
SOCKET fd;
|
||||
int flags;
|
||||
} JanetStream;
|
||||
#define JSOCKCLOSE(x) closesocket(x)
|
||||
#define JSOCKDEFAULT INVALID_SOCKET
|
||||
#define JLASTERR WSAGetLastError()
|
||||
@ -79,19 +79,15 @@ typedef struct {
|
||||
#define JSock SOCKET
|
||||
#define JReadInt long
|
||||
#define JSOCKFLAGS 0
|
||||
static JanetStream *make_stream(SOCKET fd, int flags) {
|
||||
static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
|
||||
u_long iMode = 0;
|
||||
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
|
||||
janet_pollable_init(stream, fd);
|
||||
ioctlsocket(fd, FIONBIO, &iMode);
|
||||
stream->fd = fd;
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
#else
|
||||
typedef struct {
|
||||
int fd;
|
||||
int flags;
|
||||
} JanetStream;
|
||||
#define JSOCKCLOSE(x) close(x)
|
||||
#define JSOCKDEFAULT 0
|
||||
#define JLASTERR errno
|
||||
@ -108,15 +104,15 @@ typedef struct {
|
||||
#else
|
||||
#define JSOCKFLAGS 0
|
||||
#endif
|
||||
static JanetStream *make_stream(int fd, int flags) {
|
||||
static JanetStream *make_stream(int fd, uint32_t flags) {
|
||||
JanetStream *stream = janet_abstract(&StreamAT, sizeof(JanetStream));
|
||||
janet_pollable_init(stream, fd);
|
||||
#ifndef SOCK_CLOEXEC
|
||||
int extra = O_CLOEXEC;
|
||||
#else
|
||||
int extra = 0;
|
||||
#endif
|
||||
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK | extra);
|
||||
stream->fd = fd;
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
@ -130,220 +126,138 @@ static JanetStream *make_stream(int fd, int flags) {
|
||||
static int janet_stream_close(void *p, size_t s) {
|
||||
(void) s;
|
||||
JanetStream *stream = p;
|
||||
if (!(stream->flags & JANET_STREAM_CLOSED)) {
|
||||
stream->flags |= JANET_STREAM_CLOSED;
|
||||
JSOCKCLOSE(stream->fd);
|
||||
if (!(stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
JSOCKCLOSE(stream->handle);
|
||||
janet_pollable_deinit(stream);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int janet_stream_mark(void *p, size_t s) {
|
||||
(void) s;
|
||||
janet_pollable_mark((JanetPollable *) p);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Event loop
|
||||
* State machine for read
|
||||
*/
|
||||
|
||||
/* This large struct describes a waiting file descriptor, as well
|
||||
* as what to do when we get an event for it. It is a variant type, where
|
||||
* each variant implements a simple state machine. */
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
int32_t bytes_left;
|
||||
JanetBuffer *buf;
|
||||
int is_chunk;
|
||||
} NetStateRead;
|
||||
|
||||
/* File descriptor to listen for events on. */
|
||||
JanetStream *stream;
|
||||
|
||||
/* Fiber to resume when event finishes. Can be NULL, in which case,
|
||||
* no fiber is resumed when event completes. */
|
||||
JanetFiber *fiber;
|
||||
|
||||
/* What kind of event we are listening for.
|
||||
* As more IO functionality get's added, we can
|
||||
* expand this. */
|
||||
enum {
|
||||
JLE_READ_CHUNK,
|
||||
JLE_READ_SOME,
|
||||
JLE_READ_ACCEPT,
|
||||
JLE_CONNECT,
|
||||
JLE_WRITE_FROM_BUFFER,
|
||||
JLE_WRITE_FROM_STRINGLIKE
|
||||
} event_type;
|
||||
|
||||
/* Each variant can have a different payload. */
|
||||
union {
|
||||
|
||||
/* JLE_READ_CHUNK/JLE_READ_SOME */
|
||||
struct {
|
||||
int32_t bytes_left;
|
||||
JanetBuffer *buf;
|
||||
} read_chunk;
|
||||
|
||||
/* JLE_READ_ACCEPT */
|
||||
struct {
|
||||
JanetFunction *handler;
|
||||
} read_accept;
|
||||
|
||||
/* JLE_WRITE_FROM_BUFFER */
|
||||
struct {
|
||||
JanetBuffer *buf;
|
||||
int32_t start;
|
||||
} write_from_buffer;
|
||||
|
||||
/* JLE_WRITE_FROM_STRINGLIKE */
|
||||
struct {
|
||||
const uint8_t *str;
|
||||
int32_t start;
|
||||
} write_from_stringlike;
|
||||
|
||||
} data;
|
||||
|
||||
} JanetLoopFD;
|
||||
|
||||
#define JANET_LOOPFD_MAX 1024
|
||||
|
||||
/* Global loop data */
|
||||
JANET_THREAD_LOCAL JPollStruct janet_vm_pollfds[JANET_LOOPFD_MAX];
|
||||
JANET_THREAD_LOCAL JanetLoopFD janet_vm_loopfds[JANET_LOOPFD_MAX];
|
||||
JANET_THREAD_LOCAL int janet_vm_loop_count;
|
||||
|
||||
/* We could also add/remove gc roots. This is easier for now. */
|
||||
void janet_net_markloop(void) {
|
||||
for (int i = 0; i < janet_vm_loop_count; i++) {
|
||||
JanetLoopFD lfd = janet_vm_loopfds[i];
|
||||
if (lfd.fiber != NULL) {
|
||||
janet_mark(janet_wrap_fiber(lfd.fiber));
|
||||
}
|
||||
janet_mark(janet_wrap_abstract(lfd.stream));
|
||||
switch (lfd.event_type) {
|
||||
default:
|
||||
break;
|
||||
case JLE_READ_CHUNK:
|
||||
case JLE_READ_SOME:
|
||||
janet_mark(janet_wrap_buffer(lfd.data.read_chunk.buf));
|
||||
break;
|
||||
case JLE_READ_ACCEPT:
|
||||
janet_mark(janet_wrap_function(lfd.data.read_accept.handler));
|
||||
break;
|
||||
case JLE_CONNECT:
|
||||
break;
|
||||
case JLE_WRITE_FROM_BUFFER:
|
||||
janet_mark(janet_wrap_buffer(lfd.data.write_from_buffer.buf));
|
||||
break;
|
||||
case JLE_WRITE_FROM_STRINGLIKE:
|
||||
janet_mark(janet_wrap_string(lfd.data.write_from_stringlike.str));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Add a loop fd to the global event loop */
|
||||
static int janet_loop_schedule(JanetLoopFD lfd, short events) {
|
||||
if (janet_vm_loop_count == JANET_LOOPFD_MAX) {
|
||||
return -1;
|
||||
}
|
||||
int index = janet_vm_loop_count++;
|
||||
janet_vm_loopfds[index] = lfd;
|
||||
janet_vm_pollfds[index].fd = lfd.stream->fd;
|
||||
janet_vm_pollfds[index].events = events;
|
||||
janet_vm_pollfds[index].revents = 0;
|
||||
return index;
|
||||
}
|
||||
|
||||
/* Remove event from list */
|
||||
static void janet_loop_rmindex(int index) {
|
||||
janet_vm_loopfds[index] = janet_vm_loopfds[--janet_vm_loop_count];
|
||||
janet_vm_pollfds[index] = janet_vm_pollfds[janet_vm_loop_count];
|
||||
}
|
||||
|
||||
|
||||
/* Return delta in number of loop fds. Abstracted out so
|
||||
* we can separate out the polling logic */
|
||||
static size_t janet_loop_event(size_t index) {
|
||||
JanetLoopFD *jlfd = janet_vm_loopfds + index;
|
||||
JanetStream *stream = jlfd->stream;
|
||||
JSock fd = stream->fd;
|
||||
int ret = 1;
|
||||
int should_resume = 0;
|
||||
Janet resumeval = janet_wrap_nil();
|
||||
if (stream->flags & JANET_STREAM_CLOSED) {
|
||||
should_resume = 1;
|
||||
ret = 0;
|
||||
} else {
|
||||
switch (jlfd->event_type) {
|
||||
case JLE_READ_CHUNK:
|
||||
case JLE_READ_SOME: {
|
||||
JanetBuffer *buffer = jlfd->data.read_chunk.buf;
|
||||
int32_t bytes_left = jlfd->data.read_chunk.bytes_left;
|
||||
void net_machine_read(JanetListenerState *s, int event) {
|
||||
NetStateRead *state = (NetStateRead *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_buffer(state->buf));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
/* Read is finished, even if chunk is incomplete */
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_READ:
|
||||
/* Read in bytes */
|
||||
{
|
||||
JanetBuffer *buffer = state->buf;
|
||||
int32_t bytes_left = state->bytes_left;
|
||||
janet_buffer_extra(buffer, bytes_left);
|
||||
if (!(stream->flags & JANET_STREAM_READABLE)) {
|
||||
should_resume = 1;
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
JReadInt nread;
|
||||
do {
|
||||
nread = recv(fd, buffer->data + buffer->count, bytes_left, 0);
|
||||
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
|
||||
} while (nread == -1 && JLASTERR == JEINTR);
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
|
||||
ret = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
/* Increment buffer counts */
|
||||
if (nread > 0) {
|
||||
buffer->count += nread;
|
||||
bytes_left -= nread;
|
||||
} else {
|
||||
bytes_left = 0;
|
||||
}
|
||||
if (jlfd->event_type == JLE_READ_SOME || bytes_left == 0) {
|
||||
should_resume = 1;
|
||||
if (nread > 0) {
|
||||
resumeval = janet_wrap_buffer(buffer);
|
||||
}
|
||||
ret = 0;
|
||||
} else {
|
||||
jlfd->data.read_chunk.bytes_left = bytes_left;
|
||||
ret = 1;
|
||||
state->bytes_left = bytes_left;
|
||||
|
||||
/* Resume if done */
|
||||
if (!state->is_chunk || bytes_left == 0) {
|
||||
Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil();
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
janet_unlisten(s);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case JLE_READ_ACCEPT: {
|
||||
JSock connfd = accept(fd, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
/* Made a new connection socket */
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
Janet streamv = janet_wrap_abstract(stream);
|
||||
JanetFunction *handler = jlfd->data.read_accept.handler;
|
||||
Janet out;
|
||||
JanetFiber *fiberp = NULL;
|
||||
/* Launch connection fiber */
|
||||
JanetSignal sig = janet_pcall(handler, 1, &streamv, &out, &fiberp);
|
||||
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
|
||||
janet_stacktrace(fiberp, out);
|
||||
}
|
||||
}
|
||||
ret = JANET_LOOPFD_MAX;
|
||||
break;
|
||||
}
|
||||
case JLE_WRITE_FROM_BUFFER:
|
||||
case JLE_WRITE_FROM_STRINGLIKE: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_EVENT_READ, sizeof(NetStateRead));
|
||||
state->is_chunk = 0;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
janet_await();
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) {
|
||||
NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read,
|
||||
JANET_ASYNC_EVENT_READ, sizeof(NetStateRead));
|
||||
state->is_chunk = 1;
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for write
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
union {
|
||||
JanetBuffer *buf;
|
||||
const uint8_t *str;
|
||||
} src;
|
||||
int32_t start;
|
||||
int is_buffer;
|
||||
} NetStateWrite;
|
||||
|
||||
void net_machine_write(JanetListenerState *s, int event) {
|
||||
NetStateWrite *state = (NetStateWrite *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(state->is_buffer
|
||||
? janet_wrap_buffer(state->src.buf)
|
||||
: janet_wrap_string(state->src.str));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_WRITE: {
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
if (!(stream->flags & JANET_STREAM_WRITABLE)) {
|
||||
should_resume = 1;
|
||||
ret = 0;
|
||||
break;
|
||||
}
|
||||
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
|
||||
JanetBuffer *buffer = jlfd->data.write_from_buffer.buf;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
bytes = buffer->data;
|
||||
len = buffer->count;
|
||||
start = jlfd->data.write_from_buffer.start;
|
||||
} else {
|
||||
bytes = jlfd->data.write_from_stringlike.str;
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
start = jlfd->data.write_from_stringlike.start;
|
||||
}
|
||||
if (start < len) {
|
||||
int32_t nbytes = len - start;
|
||||
JReadInt nwrote;
|
||||
do {
|
||||
nwrote = send(fd, bytes + start, nbytes, MSG_NOSIGNAL);
|
||||
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
|
||||
} while (nwrote == -1 && JLASTERR == JEINTR);
|
||||
if (nwrote > 0) {
|
||||
start += nwrote;
|
||||
@ -351,115 +265,77 @@ static size_t janet_loop_event(size_t index) {
|
||||
start = len;
|
||||
}
|
||||
}
|
||||
state->start = start;
|
||||
if (start >= len) {
|
||||
should_resume = 1;
|
||||
ret = 0;
|
||||
} else {
|
||||
if (jlfd->event_type == JLE_WRITE_FROM_BUFFER) {
|
||||
jlfd->data.write_from_buffer.start = start;
|
||||
} else {
|
||||
jlfd->data.write_from_stringlike.start = start;
|
||||
}
|
||||
ret = 1;
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
janet_unlisten(s);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case JLE_CONNECT: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
/* Resume a fiber for some events */
|
||||
if (NULL != jlfd->fiber && should_resume) {
|
||||
/* Resume the fiber */
|
||||
Janet out;
|
||||
JanetSignal sig = janet_continue(jlfd->fiber, resumeval, &out);
|
||||
if (sig != JANET_SIGNAL_OK && sig != JANET_SIGNAL_EVENT) {
|
||||
janet_stacktrace(jlfd->fiber, out);
|
||||
}
|
||||
}
|
||||
|
||||
/* Remove this handler from the handler pool. */
|
||||
if (should_resume) janet_loop_rmindex((int) index);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void janet_loop1(void) {
|
||||
/* Remove closed file descriptors */
|
||||
for (int i = 0; i < janet_vm_loop_count;) {
|
||||
if (janet_vm_loopfds[i].stream->flags & JANET_STREAM_CLOSED) {
|
||||
janet_loop_rmindex(i);
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
/* Poll */
|
||||
if (janet_vm_loop_count == 0) return;
|
||||
int ready;
|
||||
do {
|
||||
ready = JPOLL(janet_vm_pollfds, janet_vm_loop_count, -1);
|
||||
} while (ready == -1 && JLASTERR == JEINTR);
|
||||
if (ready == -1) return;
|
||||
/* Handle events */
|
||||
for (int i = 0; i < janet_vm_loop_count;) {
|
||||
int revents = janet_vm_pollfds[i].revents;
|
||||
janet_vm_pollfds[i].revents = 0;
|
||||
if ((janet_vm_pollfds[i].events | POLLHUP | POLLERR) & revents) {
|
||||
size_t delta = janet_loop_event(i);
|
||||
i += (int) delta;
|
||||
} else {
|
||||
i++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void janet_loop(void) {
|
||||
while (janet_vm_loop_count) {
|
||||
janet_loop1();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Scheduling Helpers
|
||||
*/
|
||||
|
||||
#define JANET_SCHED_FSOME 1
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags) {
|
||||
JanetLoopFD lfd;
|
||||
lfd.stream = stream;
|
||||
lfd.fiber = janet_root_fiber();
|
||||
lfd.event_type = (flags & JANET_SCHED_FSOME) ? JLE_READ_SOME : JLE_READ_CHUNK;
|
||||
lfd.data.read_chunk.buf = buf;
|
||||
lfd.data.read_chunk.bytes_left = nbytes;
|
||||
janet_loop_schedule(lfd, POLLIN);
|
||||
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) {
|
||||
JanetLoopFD lfd;
|
||||
lfd.stream = stream;
|
||||
lfd.fiber = janet_root_fiber();
|
||||
lfd.event_type = JLE_WRITE_FROM_BUFFER;
|
||||
lfd.data.write_from_buffer.buf = buf;
|
||||
lfd.data.write_from_buffer.start = 0;
|
||||
janet_loop_schedule(lfd, POLLOUT);
|
||||
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite));
|
||||
state->is_buffer = 1;
|
||||
state->start = 0;
|
||||
state->src.buf = buf;
|
||||
janet_await();
|
||||
}
|
||||
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) {
|
||||
JanetLoopFD lfd;
|
||||
lfd.stream = stream;
|
||||
lfd.fiber = janet_root_fiber();
|
||||
lfd.event_type = JLE_WRITE_FROM_STRINGLIKE;
|
||||
lfd.data.write_from_stringlike.str = str;
|
||||
lfd.data.write_from_stringlike.start = 0;
|
||||
janet_loop_schedule(lfd, POLLOUT);
|
||||
janet_signalv(JANET_SIGNAL_EVENT, janet_wrap_nil());
|
||||
NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write,
|
||||
JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite));
|
||||
state->is_buffer = 0;
|
||||
state->start = 0;
|
||||
state->src.str = str;
|
||||
janet_await();
|
||||
}
|
||||
|
||||
/*
|
||||
* State machine for simple server
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
JanetFunction *function;
|
||||
} NetStateSimpleServer;
|
||||
|
||||
void net_machine_simple_server(JanetListenerState *s, int event) {
|
||||
NetStateSimpleServer *state = (NetStateSimpleServer *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_INIT:
|
||||
/* We know the pollable will be a stream */
|
||||
janet_gcroot(janet_wrap_abstract(s->pollable));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_function(state->function));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
janet_gcunroot(janet_wrap_abstract(s->pollable));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_READ: {
|
||||
JSock connfd = accept(s->pollable->handle, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
/* Made a new connection socket */
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
Janet streamv = janet_wrap_abstract(stream);
|
||||
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
|
||||
janet_schedule(fiber, janet_wrap_nil());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Adress info */
|
||||
|
||||
/* Needs argc >= offset + 2 */
|
||||
static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) {
|
||||
/* Get host and port */
|
||||
@ -564,29 +440,33 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||
}
|
||||
|
||||
/* Put sfd on our loop */
|
||||
JanetLoopFD lfd = {0};
|
||||
lfd.stream = make_stream(sfd, 0);
|
||||
lfd.event_type = JLE_READ_ACCEPT;
|
||||
lfd.data.read_accept.handler = fun;
|
||||
janet_loop_schedule(lfd, POLLIN);
|
||||
|
||||
return janet_wrap_abstract(lfd.stream);
|
||||
JanetStream *stream = make_stream(sfd, 0);
|
||||
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
|
||||
JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer));
|
||||
ss->function = fun;
|
||||
return janet_wrap_abstract(stream);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 3);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
janet_panic("got non readable stream");
|
||||
}
|
||||
int32_t n = janet_getnat(argv, 1);
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
janet_sched_read(stream, buffer, n, JANET_SCHED_FSOME);
|
||||
janet_sched_read(stream, buffer, n);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_chunk(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 3);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
janet_panic("got non readable stream");
|
||||
}
|
||||
int32_t n = janet_getnat(argv, 1);
|
||||
JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10);
|
||||
janet_sched_read(stream, buffer, n, 0);
|
||||
janet_sched_chunk(stream, buffer, n);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||
@ -599,6 +479,9 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
if (!(stream->flags & JANET_STREAM_WRITABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) {
|
||||
janet_panic("got non writeable stream");
|
||||
}
|
||||
if (janet_checktype(argv[1], JANET_BUFFER)) {
|
||||
janet_sched_write_buffer(stream, janet_getbuffer(argv, 1));
|
||||
} else {
|
||||
@ -662,12 +545,14 @@ static const JanetReg net_cfuns[] = {
|
||||
};
|
||||
|
||||
void janet_lib_net(JanetTable *env) {
|
||||
janet_vm_loop_count = 0;
|
||||
janet_core_cfuns(env, NULL, net_cfuns);
|
||||
}
|
||||
|
||||
void janet_net_init(void) {
|
||||
#ifdef JANET_WINDOWS
|
||||
WSADATA wsaData;
|
||||
janet_assert(!WSAStartup(MAKEWORD(2, 2), &wsaData), "could not start winsock");
|
||||
#endif
|
||||
janet_core_cfuns(env, NULL, net_cfuns);
|
||||
}
|
||||
|
||||
void janet_net_deinit(void) {
|
||||
@ -675,3 +560,5 @@ void janet_net_deinit(void) {
|
||||
WSACleanup();
|
||||
#endif
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -97,4 +97,14 @@ void janet_threads_init(void);
|
||||
void janet_threads_deinit(void);
|
||||
#endif
|
||||
|
||||
#ifdef JANET_NET
|
||||
void janet_net_init(void);
|
||||
void janet_net_deinit(void);
|
||||
#endif
|
||||
|
||||
#ifdef JANET_EV
|
||||
void janet_ev_init(void);
|
||||
void janet_ev_deinit(void);
|
||||
#endif
|
||||
|
||||
#endif /* JANET_STATE_H_defined */
|
||||
|
@ -128,8 +128,10 @@ void janet_lib_thread(JanetTable *env);
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
void janet_lib_net(JanetTable *env);
|
||||
void janet_net_deinit(void);
|
||||
void janet_net_markloop(void);
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
void janet_lib_ev(JanetTable *env);
|
||||
void janet_ev_mark(void);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
@ -1428,6 +1428,12 @@ int janet_init(void) {
|
||||
/* Threads */
|
||||
#ifdef JANET_THREADS
|
||||
janet_threads_init();
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
janet_ev_init();
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
janet_net_init();
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
@ -1449,6 +1455,9 @@ void janet_deinit(void) {
|
||||
#ifdef JANET_THREADS
|
||||
janet_threads_deinit();
|
||||
#endif
|
||||
#ifdef JANET_EV
|
||||
janet_ev_deinit();
|
||||
#endif
|
||||
#ifdef JANET_NET
|
||||
janet_net_deinit();
|
||||
#endif
|
||||
|
@ -176,8 +176,13 @@ extern "C" {
|
||||
#define JANET_TYPED_ARRAY
|
||||
#endif
|
||||
|
||||
/* Enable or disable event loop */
|
||||
#if !defined(JANET_NO_EV) && !defined(__EMSCRIPTEN__)
|
||||
#define JANET_EV
|
||||
#endif
|
||||
|
||||
/* Enable or disable networking */
|
||||
#if !defined(JANET_NO_NET) && !defined(__EMSCRIPTEN__)
|
||||
#if defined(JANET_EV) && !defined(JANET_NO_NET) && !defined(__EMSCRIPTEN__)
|
||||
#define JANET_NET
|
||||
#endif
|
||||
|
||||
@ -1134,8 +1139,65 @@ extern enum JanetInstructionType janet_instructions[JOP_INSTRUCTION_COUNT];
|
||||
/***** START SECTION MAIN *****/
|
||||
|
||||
/* Event Loop */
|
||||
#ifdef JANET_NET
|
||||
#ifdef JANET_EV
|
||||
#define JANET_POLL_FLAG_CLOSED 0x1
|
||||
#define JANET_POLL_FLAG_SOCKET 0x2
|
||||
#define JANET_ASYNC_EVENT_INIT 0
|
||||
#define JANET_ASYNC_EVENT_MARK 1
|
||||
#define JANET_ASYNC_EVENT_DEINIT 2
|
||||
#define JANET_ASYNC_EVENT_CLOSE 3
|
||||
#define JANET_ASYNC_EVENT_READ 4
|
||||
#define JANET_ASYNC_EVENT_WRITE 5
|
||||
#define JANET_ASYNC_EVENT_TIMEOUT 6
|
||||
|
||||
/* Typedefs */
|
||||
#ifdef JANET_WINDOWS
|
||||
typedef HANDLE JanetPollType;
|
||||
#else
|
||||
typedef int JanetPollType;
|
||||
#endif
|
||||
typedef struct JanetListenerState JanetListenerState;
|
||||
typedef struct JanetPollable JanetPollable;
|
||||
typedef void (*JanetListener)(JanetListenerState *state, int event);
|
||||
|
||||
/* Wrapper around file descriptors and HANDLEs that can be polled. */
|
||||
struct JanetPollable {
|
||||
JanetPollType handle;
|
||||
uint32_t flags;
|
||||
JanetListenerState *state;
|
||||
/* internal */
|
||||
int _mask;
|
||||
};
|
||||
|
||||
/* Interface for state machine based event loop */
|
||||
struct JanetListenerState {
|
||||
JanetListener machine;
|
||||
JanetFiber *fiber;
|
||||
JanetPollable *pollable;
|
||||
/* internal */
|
||||
int _mask;
|
||||
JanetListenerState *_next;
|
||||
};
|
||||
|
||||
/* Run the event loop */
|
||||
JANET_API void janet_loop1(void);
|
||||
JANET_API void janet_loop(void);
|
||||
|
||||
/* Wrapper around pollables */
|
||||
JANET_API void janet_pollable_init(JanetPollable *pollable, JanetPollType handle);
|
||||
JANET_API void janet_pollable_mark(JanetPollable *pollable);
|
||||
JANET_API void janet_pollable_deinit(JanetPollable *pollable);
|
||||
|
||||
/* Queue a fiber to run on the event loop */
|
||||
JANET_API void janet_schedule(JanetFiber *fiber, Janet value);
|
||||
|
||||
/* Start a state machine listening for events from a pollable */
|
||||
JANET_API JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size);
|
||||
JANET_API void janet_unlisten(JanetListenerState *state);
|
||||
|
||||
/* Shorthand for yielding to event loop in C */
|
||||
JANET_NO_RETURN JANET_API void janet_await(void);
|
||||
|
||||
#endif
|
||||
|
||||
/* Parsing */
|
||||
|
@ -1026,7 +1026,7 @@ int main(int argc, char **argv) {
|
||||
janet_stacktrace(fiber, out);
|
||||
}
|
||||
|
||||
#ifdef JANET_NET
|
||||
#ifdef JANET_EV
|
||||
status = JANET_SIGNAL_OK;
|
||||
janet_loop();
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user