2021-07-31 00:26:42 +00:00
/* The above copyright notice and this permission notice shall be included in
2020-05-28 15:39:40 +00:00
* all copies or substantial portions of the Software .
*
* THE SOFTWARE IS PROVIDED " AS IS " , WITHOUT WARRANTY OF ANY KIND , EXPRESS OR
* IMPLIED , INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY ,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT . IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM , DAMAGES OR OTHER
* LIABILITY , WHETHER IN AN ACTION OF CONTRACT , TORT OR OTHERWISE , ARISING
* FROM , OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE .
*/
# ifndef JANET_AMALG
# include "features.h"
# include <janet.h>
# include "util.h"
# include "gc.h"
# include "state.h"
2020-07-20 00:41:12 +00:00
# include "fiber.h"
2020-05-28 15:39:40 +00:00
# endif
# ifdef JANET_EV
2020-11-15 16:21:37 +00:00
# include <math.h>
2020-10-04 17:46:15 +00:00
# ifdef JANET_WINDOWS
2020-11-14 17:48:23 +00:00
# include <winsock2.h>
2020-10-04 17:46:15 +00:00
# include <windows.h>
# else
2021-01-03 17:04:21 +00:00
# include <pthread.h>
2020-05-28 21:51:11 +00:00
# include <limits.h>
2020-05-28 15:39:40 +00:00
# include <errno.h>
# include <unistd.h>
# include <signal.h>
# include <sys/ioctl.h>
# include <sys/types.h>
# include <fcntl.h>
2020-09-07 20:22:18 +00:00
# include <netinet/in.h>
2020-11-14 17:48:23 +00:00
# include <netinet/tcp.h>
# include <netdb.h>
# include <sys/socket.h>
2020-11-29 21:36:21 +00:00
# include <sys/wait.h>
2020-09-07 18:13:28 +00:00
# ifdef JANET_EV_EPOLL
# include <sys/epoll.h>
2020-05-28 15:39:40 +00:00
# include <sys/timerfd.h>
2020-09-07 18:13:28 +00:00
# endif
2020-10-04 17:46:15 +00:00
# endif
2021-01-07 05:19:22 +00:00
typedef struct {
JanetFiber * fiber ;
uint32_t sched_id ;
enum {
JANET_CP_MODE_ITEM ,
JANET_CP_MODE_CHOICE_READ ,
JANET_CP_MODE_CHOICE_WRITE
} mode ;
} JanetChannelPending ;
typedef struct {
JanetQueue items ;
JanetQueue read_pending ;
JanetQueue write_pending ;
int32_t limit ;
2021-07-31 00:26:42 +00:00
int closed ;
2021-01-07 05:19:22 +00:00
} JanetChannel ;
2021-07-20 23:24:56 +00:00
typedef struct {
JanetFiber * fiber ;
Janet value ;
JanetSignal sig ;
} JanetTask ;
/* Wrap return value by pairing it with the callback used to handle it
* in the main thread */
typedef struct {
JanetEVGenericMessage msg ;
JanetThreadedCallback cb ;
} JanetSelfPipeEvent ;
/* Structure used to initialize threads in the thread pool
* ( same head structure as self pipe event ) */
typedef struct {
JanetEVGenericMessage msg ;
JanetThreadedCallback cb ;
JanetThreadedSubroutine subr ;
JanetHandle write_pipe ;
} JanetEVThreadInit ;
2020-10-04 20:18:31 +00:00
# define JANET_MAX_Q_CAPACITY 0x7FFFFFF
2020-08-01 19:20:58 +00:00
static void janet_q_init ( JanetQueue * q ) {
q - > data = NULL ;
q - > head = 0 ;
q - > tail = 0 ;
q - > capacity = 0 ;
}
static void janet_q_deinit ( JanetQueue * q ) {
2021-03-23 10:00:48 +00:00
janet_free ( q - > data ) ;
2020-08-01 19:20:58 +00:00
}
static int32_t janet_q_count ( JanetQueue * q ) {
return ( q - > head > q - > tail )
? ( q - > tail + q - > capacity - q - > head )
: ( q - > tail - q - > head ) ;
}
static int janet_q_push ( JanetQueue * q , void * item , size_t itemsize ) {
int32_t count = janet_q_count ( q ) ;
/* Resize if needed */
if ( count + 1 > = q - > capacity ) {
if ( count + 1 > = JANET_MAX_Q_CAPACITY ) return 1 ;
int32_t newcap = ( count + 2 ) * 2 ;
if ( newcap > JANET_MAX_Q_CAPACITY ) newcap = JANET_MAX_Q_CAPACITY ;
2021-03-23 10:00:48 +00:00
q - > data = janet_realloc ( q - > data , itemsize * newcap ) ;
2020-08-01 19:20:58 +00:00
if ( NULL = = q - > data ) {
JANET_OUT_OF_MEMORY ;
}
if ( q - > head > q - > tail ) {
/* Two segments, fix 2nd seg. */
int32_t newhead = q - > head + ( newcap - q - > capacity ) ;
2020-08-09 05:20:27 +00:00
size_t seg1 = ( size_t ) ( q - > capacity - q - > head ) ;
if ( seg1 > 0 ) {
2020-10-04 17:46:15 +00:00
memmove ( ( char * ) q - > data + ( newhead * itemsize ) ,
( char * ) q - > data + ( q - > head * itemsize ) ,
2020-08-09 05:20:27 +00:00
seg1 * itemsize ) ;
}
2020-08-01 19:20:58 +00:00
q - > head = newhead ;
}
q - > capacity = newcap ;
}
2020-10-04 17:46:15 +00:00
memcpy ( ( char * ) q - > data + itemsize * q - > tail , item , itemsize ) ;
2020-08-09 05:20:27 +00:00
q - > tail = q - > tail + 1 < q - > capacity ? q - > tail + 1 : 0 ;
2020-08-01 19:20:58 +00:00
return 0 ;
}
static int janet_q_pop ( JanetQueue * q , void * out , size_t itemsize ) {
if ( q - > head = = q - > tail ) return 1 ;
2020-10-04 17:46:15 +00:00
memcpy ( out , ( char * ) q - > data + itemsize * q - > head , itemsize ) ;
2020-08-09 05:20:27 +00:00
q - > head = q - > head + 1 < q - > capacity ? q - > head + 1 : 0 ;
2020-08-01 19:20:58 +00:00
return 0 ;
}
2020-07-05 22:26:17 +00:00
/* Forward declaration */
2021-07-21 00:42:35 +00:00
static void janet_unlisten ( JanetListenerState * state , int is_gc ) ;
2020-07-05 22:26:17 +00:00
2020-05-28 21:51:11 +00:00
/* Get current timestamp (millisecond precision) */
static JanetTimestamp ts_now ( void ) ;
/* Get current timestamp + an interval (millisecond precision) */
static JanetTimestamp ts_delta ( JanetTimestamp ts , double delta ) {
ts + = ( int64_t ) round ( delta * 1000 ) ;
return ts ;
2020-05-28 15:39:40 +00:00
}
2020-07-05 22:26:17 +00:00
/* Look at the next timeout value without
* removing it . */
static int peek_timeout ( JanetTimeout * out ) {
2021-07-17 01:59:03 +00:00
if ( janet_vm . tq_count = = 0 ) return 0 ;
* out = janet_vm . tq [ 0 ] ;
2020-07-05 22:26:17 +00:00
return 1 ;
}
/* Remove the next timeout from the priority queue */
static void pop_timeout ( size_t index ) {
2021-07-17 01:59:03 +00:00
if ( janet_vm . tq_count < = index ) return ;
janet_vm . tq [ index ] = janet_vm . tq [ - - janet_vm . tq_count ] ;
2020-07-05 22:26:17 +00:00
for ( ; ; ) {
size_t left = ( index < < 1 ) + 1 ;
size_t right = left + 1 ;
size_t smallest = index ;
2021-07-17 01:59:03 +00:00
if ( left < janet_vm . tq_count & &
( janet_vm . tq [ left ] . when < janet_vm . tq [ smallest ] . when ) )
2020-07-05 22:26:17 +00:00
smallest = left ;
2021-07-17 01:59:03 +00:00
if ( right < janet_vm . tq_count & &
( janet_vm . tq [ right ] . when < janet_vm . tq [ smallest ] . when ) )
2020-07-05 22:26:17 +00:00
smallest = right ;
if ( smallest = = index ) return ;
2021-07-17 01:59:03 +00:00
JanetTimeout temp = janet_vm . tq [ index ] ;
janet_vm . tq [ index ] = janet_vm . tq [ smallest ] ;
janet_vm . tq [ smallest ] = temp ;
2020-07-05 22:26:17 +00:00
index = smallest ;
}
}
2020-05-28 15:39:40 +00:00
/* Add a timeout to the timeout min heap */
2020-05-28 21:51:11 +00:00
static void add_timeout ( JanetTimeout to ) {
2021-07-17 01:59:03 +00:00
size_t oldcount = janet_vm . tq_count ;
2020-05-28 15:39:40 +00:00
size_t newcount = oldcount + 1 ;
2021-07-17 01:59:03 +00:00
if ( newcount > janet_vm . tq_capacity ) {
2020-05-28 15:39:40 +00:00
size_t newcap = 2 * newcount ;
2021-07-17 01:59:03 +00:00
JanetTimeout * tq = janet_realloc ( janet_vm . tq , newcap * sizeof ( JanetTimeout ) ) ;
2020-05-28 15:39:40 +00:00
if ( NULL = = tq ) {
JANET_OUT_OF_MEMORY ;
}
2021-07-17 01:59:03 +00:00
janet_vm . tq = tq ;
janet_vm . tq_capacity = newcap ;
2020-05-28 15:39:40 +00:00
}
/* Append */
2021-07-17 01:59:03 +00:00
janet_vm . tq_count = ( int32_t ) newcount ;
janet_vm . tq [ oldcount ] = to ;
2020-05-28 15:39:40 +00:00
/* Heapify */
size_t index = oldcount ;
while ( index > 0 ) {
size_t parent = ( index - 1 ) > > 1 ;
2021-07-17 01:59:03 +00:00
if ( janet_vm . tq [ parent ] . when < = janet_vm . tq [ index ] . when ) break ;
2020-05-28 15:39:40 +00:00
/* Swap */
2021-07-17 01:59:03 +00:00
JanetTimeout tmp = janet_vm . tq [ index ] ;
janet_vm . tq [ index ] = janet_vm . tq [ parent ] ;
janet_vm . tq [ parent ] = tmp ;
2020-05-28 15:39:40 +00:00
/* Next */
index = parent ;
}
}
/* Create a new event listener */
2020-11-14 20:28:10 +00:00
static JanetListenerState * janet_listen_impl ( JanetStream * stream , JanetListener behavior , int mask , size_t size , void * user ) {
if ( stream - > _mask & mask ) {
janet_panic ( " cannot listen for duplicate event on stream " ) ;
2020-06-02 03:06:27 +00:00
}
2021-07-17 01:59:03 +00:00
if ( janet_vm . root_fiber - > waiting ! = NULL ) {
2020-08-08 12:51:46 +00:00
janet_panic ( " current fiber is already waiting for event " ) ;
}
2020-05-28 15:39:40 +00:00
if ( size < sizeof ( JanetListenerState ) )
size = sizeof ( JanetListenerState ) ;
2021-03-23 10:00:48 +00:00
JanetListenerState * state = janet_malloc ( size ) ;
2020-05-28 15:39:40 +00:00
if ( NULL = = state ) {
JANET_OUT_OF_MEMORY ;
}
state - > machine = behavior ;
2021-07-17 01:59:03 +00:00
state - > fiber = janet_vm . root_fiber ;
janet_vm . root_fiber - > waiting = state ;
2020-11-14 20:28:10 +00:00
state - > stream = stream ;
2020-05-28 15:39:40 +00:00
state - > _mask = mask ;
2020-11-14 20:28:10 +00:00
stream - > _mask | = mask ;
state - > _next = stream - > state ;
stream - > state = state ;
2020-11-16 15:30:04 +00:00
/* Keep track of a listener for GC purposes */
2021-07-17 01:59:03 +00:00
int resize = janet_vm . listener_cap = = janet_vm . listener_count ;
2020-11-16 15:30:04 +00:00
if ( resize ) {
2021-07-17 01:59:03 +00:00
size_t newcap = janet_vm . listener_count ? janet_vm . listener_cap * 2 : 16 ;
janet_vm . listeners = janet_realloc ( janet_vm . listeners , newcap * sizeof ( JanetListenerState * ) ) ;
if ( NULL = = janet_vm . listeners ) {
2020-11-16 15:30:04 +00:00
JANET_OUT_OF_MEMORY ;
}
2021-07-17 01:59:03 +00:00
janet_vm . listener_cap = newcap ;
2020-11-16 15:30:04 +00:00
}
2021-07-17 01:59:03 +00:00
size_t index = janet_vm . listener_count + + ;
janet_vm . listeners [ index ] = state ;
2020-11-16 15:30:04 +00:00
state - > _index = index ;
2020-05-28 15:39:40 +00:00
/* Emit INIT event for convenience */
2020-10-07 00:07:29 +00:00
state - > event = user ;
2020-05-28 15:39:40 +00:00
state - > machine ( state , JANET_ASYNC_EVENT_INIT ) ;
return state ;
}
/* Indicate we are no longer listening for an event. This
* frees the memory of the state machine as well . */
2021-07-21 00:42:35 +00:00
static void janet_unlisten_impl ( JanetListenerState * state , int is_gc ) {
2020-05-28 15:39:40 +00:00
state - > machine ( state , JANET_ASYNC_EVENT_DEINIT ) ;
/* Remove state machine from poll list */
2020-11-14 20:28:10 +00:00
JanetListenerState * * iter = & ( state - > stream - > state ) ;
2020-05-28 15:39:40 +00:00
while ( * iter & & * iter ! = state )
iter = & ( ( * iter ) - > _next ) ;
janet_assert ( * iter , " failed to remove listener " ) ;
* iter = state - > _next ;
/* Remove mask */
2020-11-14 20:28:10 +00:00
state - > stream - > _mask & = ~ ( state - > _mask ) ;
2020-07-05 22:26:17 +00:00
/* Ensure fiber does not reference this state */
2021-07-21 00:42:35 +00:00
if ( ! is_gc ) {
JanetFiber * fiber = state - > fiber ;
if ( NULL ! = fiber & & fiber - > waiting = = state ) {
fiber - > waiting = NULL ;
}
2020-07-05 22:26:17 +00:00
}
2020-11-16 15:30:04 +00:00
/* Untrack a listener for gc purposes */
size_t index = state - > _index ;
2021-07-17 01:59:03 +00:00
janet_vm . listeners [ index ] = janet_vm . listeners [ - - janet_vm . listener_count ] ;
janet_vm . listeners [ index ] - > _index = index ;
2021-03-23 10:00:48 +00:00
janet_free ( state ) ;
2020-05-28 15:39:40 +00:00
}
2020-11-14 20:28:10 +00:00
static const JanetMethod ev_default_stream_methods [ ] = {
{ " close " , janet_cfun_stream_close } ,
{ " read " , janet_cfun_stream_read } ,
{ " chunk " , janet_cfun_stream_chunk } ,
{ " write " , janet_cfun_stream_write } ,
{ NULL , NULL }
} ;
/* Create a stream*/
JanetStream * janet_stream ( JanetHandle handle , uint32_t flags , const JanetMethod * methods ) {
JanetStream * stream = janet_abstract ( & janet_stream_type , sizeof ( JanetStream ) ) ;
stream - > handle = handle ;
stream - > flags = flags ;
stream - > state = NULL ;
stream - > _mask = 0 ;
if ( methods = = NULL ) methods = ev_default_stream_methods ;
stream - > methods = methods ;
return stream ;
}
/* Close a stream */
2021-07-21 00:42:35 +00:00
static void janet_stream_close_impl ( JanetStream * stream , int is_gc ) {
2020-11-14 20:28:10 +00:00
if ( stream - > flags & JANET_STREAM_CLOSED ) return ;
JanetListenerState * state = stream - > state ;
while ( NULL ! = state ) {
2021-07-21 00:42:35 +00:00
if ( ! is_gc ) {
state - > machine ( state , JANET_ASYNC_EVENT_CLOSE ) ;
}
2020-11-14 20:28:10 +00:00
JanetListenerState * next_state = state - > _next ;
2021-07-21 00:42:35 +00:00
janet_unlisten ( state , is_gc ) ;
2020-11-14 20:28:10 +00:00
state = next_state ;
}
stream - > state = NULL ;
stream - > flags | = JANET_STREAM_CLOSED ;
# ifdef JANET_WINDOWS
# ifdef JANET_NET
if ( stream - > flags & JANET_STREAM_SOCKET ) {
closesocket ( ( SOCKET ) stream - > handle ) ;
} else
# endif
{
CloseHandle ( stream - > handle ) ;
}
# else
close ( stream - > handle ) ;
# endif
}
2021-07-21 00:42:35 +00:00
void janet_stream_close ( JanetStream * stream ) {
janet_stream_close_impl ( stream , 0 ) ;
}
/* Called to clean up a stream */
static int janet_stream_gc ( void * p , size_t s ) {
( void ) s ;
JanetStream * stream = ( JanetStream * ) p ;
janet_stream_close_impl ( stream , 1 ) ;
return 0 ;
}
2020-11-14 20:28:10 +00:00
/* Mark a stream for GC */
static int janet_stream_mark ( void * p , size_t s ) {
( void ) s ;
JanetStream * stream = ( JanetStream * ) p ;
JanetListenerState * state = stream - > state ;
2020-05-28 15:39:40 +00:00
while ( NULL ! = state ) {
if ( NULL ! = state - > fiber ) {
janet_mark ( janet_wrap_fiber ( state - > fiber ) ) ;
}
( state - > machine ) ( state , JANET_ASYNC_EVENT_MARK ) ;
state = state - > _next ;
}
2020-11-14 20:28:10 +00:00
return 0 ;
2020-05-28 15:39:40 +00:00
}
2020-11-14 20:28:10 +00:00
static int janet_stream_getter ( void * p , Janet key , Janet * out ) {
JanetStream * stream = ( JanetStream * ) p ;
if ( ! janet_checktype ( key , JANET_KEYWORD ) ) return 0 ;
const JanetMethod * stream_methods = stream - > methods ;
return janet_getmethod ( janet_unwrap_keyword ( key ) , stream_methods , out ) ;
2020-05-28 15:39:40 +00:00
}
2020-12-13 15:33:05 +00:00
static void janet_stream_marshal ( void * p , JanetMarshalContext * ctx ) {
JanetStream * s = p ;
if ( ! ( ctx - > flags & JANET_MARSHAL_UNSAFE ) ) {
janet_panic ( " can only marshal stream with unsafe flag " ) ;
}
janet_marshal_abstract ( ctx , p ) ;
janet_marshal_int ( ctx , ( int32_t ) s - > flags ) ;
janet_marshal_int64 ( ctx , ( intptr_t ) s - > methods ) ;
# ifdef JANET_WINDOWS
/* TODO - ref counting to avoid situation where a handle is closed or GCed
* while in transit , and it ' s value gets reused . DuplicateHandle does not work
* for network sockets , and in general for winsock it is better to nipt duplicate
* unless there is a need to . */
2021-01-24 22:48:46 +00:00
HANDLE duph = INVALID_HANDLE_VALUE ;
if ( s - > flags & JANET_STREAM_SOCKET ) {
duph = s - > handle ;
} else {
DuplicateHandle (
2021-01-30 02:11:38 +00:00
GetCurrentProcess ( ) ,
s - > handle ,
GetCurrentProcess ( ) ,
& duph ,
0 ,
FALSE ,
DUPLICATE_SAME_ACCESS ) ;
2021-01-24 22:48:46 +00:00
}
janet_marshal_int64 ( ctx , ( int64_t ) ( duph ) ) ;
2020-12-13 15:33:05 +00:00
# else
/* Marshal after dup becuse it is easier than maintaining our own ref counting. */
int duph = dup ( s - > handle ) ;
if ( duph < 0 ) janet_panicf ( " failed to duplicate stream handle: %V " , janet_ev_lasterr ( ) ) ;
janet_marshal_int ( ctx , ( int32_t ) ( duph ) ) ;
# endif
}
static void * janet_stream_unmarshal ( JanetMarshalContext * ctx ) {
if ( ! ( ctx - > flags & JANET_MARSHAL_UNSAFE ) ) {
janet_panic ( " can only unmarshal stream with unsafe flag " ) ;
}
JanetStream * p = janet_unmarshal_abstract ( ctx , sizeof ( JanetStream ) ) ;
/* Can't share listening state and such across threads */
p - > _mask = 0 ;
p - > state = NULL ;
p - > flags = ( uint32_t ) janet_unmarshal_int ( ctx ) ;
p - > methods = ( void * ) janet_unmarshal_int64 ( ctx ) ;
# ifdef JANET_WINDOWS
p - > handle = ( JanetHandle ) janet_unmarshal_int64 ( ctx ) ;
# else
p - > handle = ( JanetHandle ) janet_unmarshal_int ( ctx ) ;
# endif
return p ;
}
2021-01-12 05:14:07 +00:00
static Janet janet_stream_next ( void * p , Janet key ) {
JanetStream * stream = ( JanetStream * ) p ;
return janet_nextmethod ( stream - > methods , key ) ;
}
2020-12-13 15:33:05 +00:00
2020-11-14 20:28:10 +00:00
const JanetAbstractType janet_stream_type = {
" core/stream " ,
janet_stream_gc ,
janet_stream_mark ,
janet_stream_getter ,
2020-12-13 15:33:05 +00:00
NULL ,
janet_stream_marshal ,
janet_stream_unmarshal ,
2021-01-12 05:14:07 +00:00
NULL ,
NULL ,
NULL ,
janet_stream_next ,
JANET_ATEND_NEXT
2020-11-14 20:28:10 +00:00
} ;
2020-05-28 15:39:40 +00:00
/* Register a fiber to resume with value */
2020-08-16 22:29:57 +00:00
void janet_schedule_signal ( JanetFiber * fiber , Janet value , JanetSignal sig ) {
2020-07-20 00:41:12 +00:00
if ( fiber - > flags & JANET_FIBER_FLAG_SCHEDULED ) return ;
fiber - > flags | = JANET_FIBER_FLAG_SCHEDULED ;
2020-08-09 05:20:27 +00:00
fiber - > sched_id + + ;
2020-08-16 22:29:57 +00:00
JanetTask t = { fiber , value , sig } ;
2021-07-17 01:59:03 +00:00
janet_q_push ( & janet_vm . spawn , & t , sizeof ( t ) ) ;
2020-05-28 15:39:40 +00:00
}
2020-08-16 22:29:57 +00:00
void janet_cancel ( JanetFiber * fiber , Janet value ) {
janet_schedule_signal ( fiber , value , JANET_SIGNAL_ERROR ) ;
}
void janet_schedule ( JanetFiber * fiber , Janet value ) {
janet_schedule_signal ( fiber , value , JANET_SIGNAL_OK ) ;
}
void janet_fiber_did_resume ( JanetFiber * fiber ) {
/* Cancel any pending fibers */
2020-11-17 00:46:59 +00:00
if ( fiber - > waiting ) {
fiber - > waiting - > machine ( fiber - > waiting , JANET_ASYNC_EVENT_CANCEL ) ;
2021-07-21 00:42:35 +00:00
janet_unlisten ( fiber - > waiting , 0 ) ;
2020-11-17 00:46:59 +00:00
}
2020-08-16 22:29:57 +00:00
}
2020-05-28 15:39:40 +00:00
/* Mark all pending tasks */
void janet_ev_mark ( void ) {
2020-11-16 15:30:04 +00:00
/* Pending tasks */
2021-07-17 01:59:03 +00:00
JanetTask * tasks = janet_vm . spawn . data ;
if ( janet_vm . spawn . head < = janet_vm . spawn . tail ) {
for ( int32_t i = janet_vm . spawn . head ; i < janet_vm . spawn . tail ; i + + ) {
2020-08-01 19:20:58 +00:00
janet_mark ( janet_wrap_fiber ( tasks [ i ] . fiber ) ) ;
janet_mark ( tasks [ i ] . value ) ;
}
} else {
2021-07-17 01:59:03 +00:00
for ( int32_t i = janet_vm . spawn . head ; i < janet_vm . spawn . capacity ; i + + ) {
2020-08-01 19:20:58 +00:00
janet_mark ( janet_wrap_fiber ( tasks [ i ] . fiber ) ) ;
janet_mark ( tasks [ i ] . value ) ;
}
2021-07-17 01:59:03 +00:00
for ( int32_t i = 0 ; i < janet_vm . spawn . tail ; i + + ) {
2020-08-01 19:20:58 +00:00
janet_mark ( janet_wrap_fiber ( tasks [ i ] . fiber ) ) ;
janet_mark ( tasks [ i ] . value ) ;
}
2020-05-28 15:39:40 +00:00
}
2020-11-16 15:30:04 +00:00
/* Pending timeouts */
2021-07-17 01:59:03 +00:00
for ( size_t i = 0 ; i < janet_vm . tq_count ; i + + ) {
janet_mark ( janet_wrap_fiber ( janet_vm . tq [ i ] . fiber ) ) ;
if ( janet_vm . tq [ i ] . curr_fiber ! = NULL ) {
janet_mark ( janet_wrap_fiber ( janet_vm . tq [ i ] . curr_fiber ) ) ;
2020-12-05 16:32:34 +00:00
}
2020-05-28 15:39:40 +00:00
}
2020-11-16 15:30:04 +00:00
/* Pending listeners */
2021-07-17 01:59:03 +00:00
for ( size_t i = 0 ; i < janet_vm . listener_count ; i + + ) {
JanetListenerState * state = janet_vm . listeners [ i ] ;
2020-11-16 15:30:04 +00:00
if ( NULL ! = state - > fiber ) {
janet_mark ( janet_wrap_fiber ( state - > fiber ) ) ;
}
janet_stream_mark ( state - > stream , sizeof ( JanetStream ) ) ;
( state - > machine ) ( state , JANET_ASYNC_EVENT_MARK ) ;
}
2020-05-28 15:39:40 +00:00
}
2021-01-10 00:33:40 +00:00
static int janet_channel_push ( JanetChannel * channel , Janet x , int mode ) ;
2021-01-07 05:19:22 +00:00
2021-01-13 03:35:28 +00:00
static Janet make_supervisor_event ( const char * name , JanetFiber * fiber ) {
Janet tup [ 2 ] ;
tup [ 0 ] = janet_ckeywordv ( name ) ;
tup [ 1 ] = janet_wrap_fiber ( fiber ) ;
return janet_wrap_tuple ( janet_tuple_n ( tup , 2 ) ) ;
}
2020-05-28 15:39:40 +00:00
/* Common init code */
void janet_ev_init_common ( void ) {
2021-07-17 01:59:03 +00:00
janet_q_init ( & janet_vm . spawn ) ;
janet_vm . listener_count = 0 ;
janet_vm . listener_cap = 0 ;
janet_vm . listeners = NULL ;
janet_vm . tq = NULL ;
janet_vm . tq_count = 0 ;
janet_vm . tq_capacity = 0 ;
janet_rng_seed ( & janet_vm . ev_rng , 0 ) ;
2020-05-28 15:39:40 +00:00
}
/* Common deinit code */
void janet_ev_deinit_common ( void ) {
2021-07-17 01:59:03 +00:00
janet_q_deinit ( & janet_vm . spawn ) ;
janet_free ( janet_vm . tq ) ;
janet_free ( janet_vm . listeners ) ;
janet_vm . listeners = NULL ;
2020-05-28 15:39:40 +00:00
}
/* Short hand to yield to event loop */
void janet_await ( void ) {
janet_signalv ( JANET_SIGNAL_EVENT , janet_wrap_nil ( ) ) ;
}
2020-07-05 22:26:17 +00:00
/* Set timeout for the current root fiber */
void janet_addtimeout ( double sec ) {
2021-07-17 01:59:03 +00:00
JanetFiber * fiber = janet_vm . root_fiber ;
2020-07-05 22:26:17 +00:00
JanetTimeout to ;
to . when = ts_delta ( ts_now ( ) , sec ) ;
to . fiber = fiber ;
2020-12-05 16:32:34 +00:00
to . curr_fiber = NULL ;
2020-10-11 14:07:11 +00:00
to . sched_id = fiber - > sched_id ;
2020-08-16 22:29:57 +00:00
to . is_error = 1 ;
2020-07-05 22:26:17 +00:00
add_timeout ( to ) ;
}
2020-11-29 21:36:21 +00:00
void janet_ev_inc_refcount ( void ) {
2021-07-17 01:59:03 +00:00
janet_vm . extra_listeners + + ;
2020-11-29 21:36:21 +00:00
}
void janet_ev_dec_refcount ( void ) {
2021-07-17 01:59:03 +00:00
janet_vm . extra_listeners - - ;
2020-11-29 21:36:21 +00:00
}
2020-07-27 04:45:25 +00:00
/* Channels */
2020-08-01 18:13:58 +00:00
# define JANET_MAX_CHANNEL_CAPACITY 0xFFFFFF
static void janet_chan_init ( JanetChannel * chan , int32_t limit ) {
chan - > limit = limit ;
2021-07-31 00:26:42 +00:00
chan - > closed = 0 ;
2020-08-01 19:20:58 +00:00
janet_q_init ( & chan - > items ) ;
janet_q_init ( & chan - > read_pending ) ;
janet_q_init ( & chan - > write_pending ) ;
2020-08-01 18:13:58 +00:00
}
static void janet_chan_deinit ( JanetChannel * chan ) {
2020-08-01 19:20:58 +00:00
janet_q_deinit ( & chan - > read_pending ) ;
janet_q_deinit ( & chan - > write_pending ) ;
janet_q_deinit ( & chan - > items ) ;
2020-08-01 18:13:58 +00:00
}
/*
* Janet Channel abstract type
*/
static int janet_chanat_mark ( void * p , size_t s ) ;
static int janet_chanat_gc ( void * p , size_t s ) ;
2021-01-12 05:14:07 +00:00
static Janet janet_chanat_next ( void * p , Janet key ) ;
static int janet_chanat_get ( void * p , Janet key , Janet * out ) ;
2020-08-01 18:13:58 +00:00
static const JanetAbstractType ChannelAT = {
" core/channel " ,
janet_chanat_gc ,
janet_chanat_mark ,
2021-01-12 05:14:07 +00:00
janet_chanat_get ,
NULL , /* put */
NULL , /* marshal */
NULL , /* unmarshal */
NULL , /* tostring */
NULL , /* compare */
NULL , /* hash */
janet_chanat_next ,
JANET_ATEND_NEXT
2020-08-01 18:13:58 +00:00
} ;
static int janet_chanat_gc ( void * p , size_t s ) {
( void ) s ;
JanetChannel * channel = p ;
janet_chan_deinit ( channel ) ;
return 0 ;
}
2020-08-01 19:20:58 +00:00
static void janet_chanat_mark_fq ( JanetQueue * fq ) {
2020-08-09 05:20:27 +00:00
JanetChannelPending * pending = fq - > data ;
2020-08-01 18:13:58 +00:00
if ( fq - > head < = fq - > tail ) {
for ( int32_t i = fq - > head ; i < fq - > tail ; i + + )
2020-08-09 05:20:27 +00:00
janet_mark ( janet_wrap_fiber ( pending [ i ] . fiber ) ) ;
2020-08-01 18:13:58 +00:00
} else {
for ( int32_t i = fq - > head ; i < fq - > capacity ; i + + )
2020-08-09 05:20:27 +00:00
janet_mark ( janet_wrap_fiber ( pending [ i ] . fiber ) ) ;
2020-08-01 18:13:58 +00:00
for ( int32_t i = 0 ; i < fq - > tail ; i + + )
2020-08-09 05:20:27 +00:00
janet_mark ( janet_wrap_fiber ( pending [ i ] . fiber ) ) ;
2020-08-01 18:13:58 +00:00
}
}
static int janet_chanat_mark ( void * p , size_t s ) {
( void ) s ;
JanetChannel * chan = p ;
janet_chanat_mark_fq ( & chan - > read_pending ) ;
janet_chanat_mark_fq ( & chan - > write_pending ) ;
2020-08-01 19:20:58 +00:00
JanetQueue * items = & chan - > items ;
Janet * data = chan - > items . data ;
if ( items - > head < = items - > tail ) {
for ( int32_t i = items - > head ; i < items - > tail ; i + + )
janet_mark ( data [ i ] ) ;
2020-08-01 18:13:58 +00:00
} else {
2020-08-01 19:20:58 +00:00
for ( int32_t i = items - > head ; i < items - > capacity ; i + + )
janet_mark ( data [ i ] ) ;
for ( int32_t i = 0 ; i < items - > tail ; i + + )
janet_mark ( data [ i ] ) ;
2020-08-01 18:13:58 +00:00
}
return 0 ;
}
2020-10-11 14:07:11 +00:00
static Janet make_write_result ( JanetChannel * channel ) {
Janet * tup = janet_tuple_begin ( 2 ) ;
tup [ 0 ] = janet_ckeywordv ( " give " ) ;
tup [ 1 ] = janet_wrap_abstract ( channel ) ;
return janet_wrap_tuple ( janet_tuple_end ( tup ) ) ;
}
2020-08-01 18:13:58 +00:00
2020-10-11 14:07:11 +00:00
static Janet make_read_result ( JanetChannel * channel , Janet x ) {
Janet * tup = janet_tuple_begin ( 3 ) ;
tup [ 0 ] = janet_ckeywordv ( " take " ) ;
tup [ 1 ] = janet_wrap_abstract ( channel ) ;
tup [ 2 ] = x ;
return janet_wrap_tuple ( janet_tuple_end ( tup ) ) ;
}
2021-07-31 00:26:42 +00:00
static Janet make_close_result ( JanetChannel * channel ) {
Janet * tup = janet_tuple_begin ( 2 ) ;
tup [ 0 ] = janet_ckeywordv ( " close " ) ;
tup [ 1 ] = janet_wrap_abstract ( channel ) ;
return janet_wrap_tuple ( janet_tuple_end ( tup ) ) ;
}
2020-10-11 14:07:11 +00:00
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
* If the push would block , will add to the write_pending queue in the channel . */
2021-01-10 00:33:40 +00:00
static int janet_channel_push ( JanetChannel * channel , Janet x , int mode ) {
2020-08-09 05:20:27 +00:00
JanetChannelPending reader ;
int is_empty ;
do {
is_empty = janet_q_pop ( & channel - > read_pending , & reader , sizeof ( reader ) ) ;
} while ( ! is_empty & & ( reader . sched_id ! = reader . fiber - > sched_id ) ) ;
if ( is_empty ) {
2020-08-01 18:13:58 +00:00
/* No pending reader */
2020-10-11 14:07:11 +00:00
if ( janet_q_push ( & channel - > items , & x , sizeof ( Janet ) ) ) {
janet_panicf ( " channel overflow: %v " , x ) ;
2020-08-01 19:20:58 +00:00
} else if ( janet_q_count ( & channel - > items ) > channel - > limit ) {
2021-01-07 05:19:22 +00:00
/* No root fiber, we are in completion on a root fiber. Don't block. */
2021-01-10 00:33:40 +00:00
if ( mode = = 2 ) return 0 ;
2020-08-01 18:13:58 +00:00
/* Pushed successfully, but should block. */
2020-08-09 05:20:27 +00:00
JanetChannelPending pending ;
2021-07-17 01:59:03 +00:00
pending . fiber = janet_vm . root_fiber ,
pending . sched_id = janet_vm . root_fiber - > sched_id ,
2021-01-10 00:33:40 +00:00
pending . mode = mode ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM ;
2020-08-09 05:20:27 +00:00
janet_q_push ( & channel - > write_pending , & pending , sizeof ( pending ) ) ;
2020-10-11 14:07:11 +00:00
return 1 ;
2020-08-09 05:20:27 +00:00
}
} else {
/* Pending reader */
2020-10-11 14:07:11 +00:00
if ( reader . mode = = JANET_CP_MODE_CHOICE_READ ) {
janet_schedule ( reader . fiber , make_read_result ( channel , x ) ) ;
2020-08-09 05:20:27 +00:00
} else {
2020-10-11 14:07:11 +00:00
janet_schedule ( reader . fiber , x ) ;
2020-08-01 18:13:58 +00:00
}
}
2020-10-11 14:07:11 +00:00
return 0 ;
2020-08-01 18:13:58 +00:00
}
2021-07-17 20:13:28 +00:00
/* Pop from a channel - returns 1 if item was obtained, 0 otherwise. The item
2020-10-11 14:07:11 +00:00
* is returned by reference . If the pop would block , will add to the read_pending
* queue in the channel . */
static int janet_channel_pop ( JanetChannel * channel , Janet * item , int is_choice ) {
2020-08-09 05:20:27 +00:00
JanetChannelPending writer ;
2020-10-11 14:07:11 +00:00
if ( janet_q_pop ( & channel - > items , item , sizeof ( Janet ) ) ) {
2020-08-01 18:13:58 +00:00
/* Queue empty */
2020-08-09 05:20:27 +00:00
JanetChannelPending pending ;
2021-07-17 01:59:03 +00:00
pending . fiber = janet_vm . root_fiber ,
pending . sched_id = janet_vm . root_fiber - > sched_id ;
2020-10-11 14:07:11 +00:00
pending . mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM ;
2020-08-09 05:20:27 +00:00
janet_q_push ( & channel - > read_pending , & pending , sizeof ( pending ) ) ;
2020-10-11 14:07:11 +00:00
return 0 ;
2020-08-09 05:20:27 +00:00
}
if ( ! janet_q_pop ( & channel - > write_pending , & writer , sizeof ( writer ) ) ) {
2020-10-11 14:07:11 +00:00
/* pending writer */
if ( writer . mode = = JANET_CP_MODE_CHOICE_WRITE ) {
janet_schedule ( writer . fiber , make_write_result ( channel ) ) ;
} else {
janet_schedule ( writer . fiber , janet_wrap_abstract ( channel ) ) ;
}
}
return 1 ;
}
/* Channel Methods */
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_push ,
2021-07-30 02:29:08 +00:00
" (ev/give channel value) " ,
2021-07-31 00:26:42 +00:00
" Write a value to a channel, suspending the current fiber if the channel is full. "
" Returns the channel if the write succeeded, nil otherwise. " ) {
2020-10-11 14:07:11 +00:00
janet_fixarity ( argc , 2 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
2021-07-31 00:26:42 +00:00
if ( channel - > closed ) {
janet_panic ( " cannot write to closed channel " ) ;
}
2020-10-11 14:07:11 +00:00
if ( janet_channel_push ( channel , argv [ 1 ] , 0 ) ) {
janet_await ( ) ;
}
return argv [ 0 ] ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( cfun_channel_pop ,
" (ev/take channel) " ,
" Read from a channel, suspending the current fiber if no value is available. " ) {
2020-10-11 14:07:11 +00:00
janet_fixarity ( argc , 1 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
Janet item ;
2021-07-31 00:26:42 +00:00
if ( channel - > closed ) return janet_wrap_nil ( ) ;
2020-10-11 14:07:11 +00:00
if ( janet_channel_pop ( channel , & item , 0 ) ) {
2021-07-17 01:59:03 +00:00
janet_schedule ( janet_vm . root_fiber , item ) ;
2020-08-09 05:20:27 +00:00
}
janet_await ( ) ;
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_choice ,
2021-07-30 02:29:08 +00:00
" (ev/select & clauses) " ,
2021-07-31 00:26:42 +00:00
" Block until the first of several channel operations occur. Returns a tuple of the form [:give chan], [:take chan x], or [:close chan], where "
2021-07-30 02:29:08 +00:00
" a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
" a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
2021-07-31 00:26:42 +00:00
" clauses will take precedence over later clauses. Both and give and take operation can return a [:close chan] tuple, which indicates that the specified channel was closed. " ) {
2020-08-09 05:20:27 +00:00
janet_arity ( argc , 1 , - 1 ) ;
2020-10-11 14:07:11 +00:00
int32_t len ;
const Janet * data ;
/* Check channels for immediate reads and writes */
2020-08-09 05:20:27 +00:00
for ( int32_t i = 0 ; i < argc ; i + + ) {
2020-10-11 14:07:11 +00:00
if ( janet_indexed_view ( argv [ i ] , & data , & len ) & & len = = 2 ) {
/* Write */
JanetChannel * chan = janet_getabstract ( data , 0 , & ChannelAT ) ;
2021-07-31 00:26:42 +00:00
if ( chan - > closed ) continue ;
2020-10-11 14:07:11 +00:00
if ( janet_q_count ( & chan - > items ) < chan - > limit ) {
janet_channel_push ( chan , data [ 1 ] , 1 ) ;
return make_write_result ( chan ) ;
}
} else {
/* Read */
JanetChannel * chan = janet_getabstract ( argv , i , & ChannelAT ) ;
2021-07-31 00:26:42 +00:00
if ( chan - > closed ) continue ;
2020-10-11 14:07:11 +00:00
if ( chan - > items . head ! = chan - > items . tail ) {
Janet item ;
janet_channel_pop ( chan , & item , 1 ) ;
return make_read_result ( chan , item ) ;
}
}
2020-08-01 18:13:58 +00:00
}
2020-10-11 14:07:11 +00:00
/* Wait for all readers or writers */
2020-08-09 05:20:27 +00:00
for ( int32_t i = 0 ; i < argc ; i + + ) {
2020-10-11 14:07:11 +00:00
if ( janet_indexed_view ( argv [ i ] , & data , & len ) & & len = = 2 ) {
/* Write */
JanetChannel * chan = janet_getabstract ( data , 0 , & ChannelAT ) ;
2021-07-31 00:26:42 +00:00
if ( chan - > closed ) continue ;
2020-10-11 14:07:11 +00:00
janet_channel_push ( chan , data [ 1 ] , 1 ) ;
} else {
/* Read */
Janet item ;
JanetChannel * chan = janet_getabstract ( argv , i , & ChannelAT ) ;
2021-07-31 00:26:42 +00:00
if ( chan - > closed ) continue ;
2020-10-11 14:07:11 +00:00
janet_channel_pop ( chan , & item , 1 ) ;
}
2020-08-09 05:20:27 +00:00
}
2020-10-11 14:07:11 +00:00
2020-08-09 05:20:27 +00:00
janet_await ( ) ;
2020-08-01 18:13:58 +00:00
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_full ,
2021-07-30 02:29:08 +00:00
" (ev/full channel) " ,
" Check if a channel is full or not. " ) {
2020-08-01 18:13:58 +00:00
janet_fixarity ( argc , 1 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
2020-08-01 19:20:58 +00:00
return janet_wrap_boolean ( janet_q_count ( & channel - > items ) > = channel - > limit ) ;
2020-08-01 18:13:58 +00:00
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_capacity ,
2021-07-30 02:29:08 +00:00
" (ev/capacity channel) " ,
" Get the number of items a channel will store before blocking writers. " ) {
2020-08-01 18:13:58 +00:00
janet_fixarity ( argc , 1 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
return janet_wrap_integer ( channel - > limit ) ;
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_count ,
2021-07-30 02:29:08 +00:00
" (ev/count channel) " ,
" Get the number of items currently waiting in a channel. " ) {
2020-08-01 18:13:58 +00:00
janet_fixarity ( argc , 1 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
2020-08-01 19:20:58 +00:00
return janet_wrap_integer ( janet_q_count ( & channel - > items ) ) ;
2020-08-01 18:13:58 +00:00
}
2020-10-11 14:07:11 +00:00
/* Fisher yates shuffle of arguments to get fairness */
static void fisher_yates_args ( int32_t argc , Janet * argv ) {
2020-08-09 05:20:27 +00:00
for ( int32_t i = argc ; i > 1 ; i - - ) {
2021-07-17 01:59:03 +00:00
int32_t swap_index = janet_rng_u32 ( & janet_vm . ev_rng ) % i ;
2020-08-09 05:20:27 +00:00
Janet temp = argv [ swap_index ] ;
argv [ swap_index ] = argv [ i - 1 ] ;
argv [ i - 1 ] = temp ;
}
2020-10-11 14:07:11 +00:00
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_channel_rchoice ,
2021-07-30 02:29:08 +00:00
" (ev/rselect & clauses) " ,
" Similar to ev/select, but will try clauses in a random order for fairness. " ) {
2020-10-11 14:07:11 +00:00
fisher_yates_args ( argc , argv ) ;
return cfun_channel_choice ( argc , argv ) ;
2020-08-09 05:20:27 +00:00
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( cfun_channel_new ,
" (ev/chan &opt capacity) " ,
" Create a new channel. capacity is the number of values to queue before "
" blocking writers, defaults to 0 if not provided. Returns a new channel. " ) {
2020-08-01 18:13:58 +00:00
janet_arity ( argc , 0 , 1 ) ;
2020-08-03 12:57:02 +00:00
int32_t limit = janet_optnat ( argv , argc , 0 , 0 ) ;
2020-08-01 18:13:58 +00:00
JanetChannel * channel = janet_abstract ( & ChannelAT , sizeof ( JanetChannel ) ) ;
janet_chan_init ( channel , limit ) ;
return janet_wrap_abstract ( channel ) ;
}
2021-07-31 00:26:42 +00:00
JANET_CORE_FN ( cfun_channel_close ,
" (ev/chan-close chan) " ,
" Close a channel. A closed channel will cause all pending reads and writes to return nil. "
" Returns the channel. " ) {
janet_fixarity ( argc , 1 ) ;
JanetChannel * channel = janet_getabstract ( argv , 0 , & ChannelAT ) ;
if ( ! channel - > closed ) {
channel - > closed = 1 ;
JanetChannelPending writer ;
while ( ! janet_q_pop ( & channel - > write_pending , & writer , sizeof ( writer ) ) ) {
if ( writer . mode = = JANET_CP_MODE_CHOICE_WRITE ) {
janet_schedule ( writer . fiber , janet_wrap_nil ( ) ) ;
} else {
janet_schedule ( writer . fiber , make_close_result ( channel ) ) ;
}
}
JanetChannelPending reader ;
while ( ! janet_q_pop ( & channel - > read_pending , & reader , sizeof ( reader ) ) ) {
if ( reader . mode = = JANET_CP_MODE_CHOICE_READ ) {
janet_schedule ( reader . fiber , janet_wrap_nil ( ) ) ;
} else {
janet_schedule ( reader . fiber , make_close_result ( channel ) ) ;
}
}
}
return janet_wrap_abstract ( channel ) ;
}
2021-01-12 05:14:07 +00:00
static const JanetMethod ev_chanat_methods [ ] = {
{ " select " , cfun_channel_choice } ,
{ " rselect " , cfun_channel_rchoice } ,
{ " count " , cfun_channel_count } ,
{ " take " , cfun_channel_pop } ,
{ " give " , cfun_channel_push } ,
{ " capacity " , cfun_channel_capacity } ,
{ " full " , cfun_channel_full } ,
2021-07-31 00:26:42 +00:00
{ " close " , cfun_channel_close } ,
2021-01-12 05:14:07 +00:00
{ NULL , NULL }
} ;
static int janet_chanat_get ( void * p , Janet key , Janet * out ) {
( void ) p ;
if ( ! janet_checktype ( key , JANET_KEYWORD ) ) return 0 ;
return janet_getmethod ( janet_unwrap_keyword ( key ) , ev_chanat_methods , out ) ;
}
static Janet janet_chanat_next ( void * p , Janet key ) {
( void ) p ;
return janet_nextmethod ( ev_chanat_methods , key ) ;
}
2020-05-28 21:51:11 +00:00
/* Main event loop */
2020-10-11 14:07:11 +00:00
void janet_loop1_impl ( int has_timeout , JanetTimestamp timeout ) ;
2020-05-28 21:51:11 +00:00
2021-07-25 01:30:36 +00:00
int janet_loop_done ( void ) {
return ! ( janet_vm . listener_count | |
2021-07-26 02:47:52 +00:00
( janet_vm . spawn . head ! = janet_vm . spawn . tail ) | |
janet_vm . tq_count | |
janet_vm . extra_listeners ) ;
2021-07-25 01:30:36 +00:00
}
JanetFiber * janet_loop1 ( void ) {
2020-10-11 14:07:11 +00:00
/* Schedule expired timers */
JanetTimeout to ;
JanetTimestamp now = ts_now ( ) ;
while ( peek_timeout ( & to ) & & to . when < = now ) {
pop_timeout ( 0 ) ;
2020-12-05 16:32:34 +00:00
if ( to . curr_fiber ! = NULL ) {
/* This is a deadline (for a fiber, not a function call) */
JanetFiberStatus s = janet_fiber_status ( to . curr_fiber ) ;
int isFinished = s = = ( JANET_STATUS_DEAD | |
2020-12-06 20:33:08 +00:00
s = = JANET_STATUS_ERROR | |
s = = JANET_STATUS_USER0 | |
s = = JANET_STATUS_USER1 | |
s = = JANET_STATUS_USER2 | |
s = = JANET_STATUS_USER3 | |
s = = JANET_STATUS_USER4 ) ;
if ( ! isFinished ) {
janet_cancel ( to . fiber , janet_cstringv ( " deadline expired " ) ) ;
}
2020-12-05 16:32:34 +00:00
} else {
/* This is a timeout (for a function call, not a whole fiber) */
if ( to . fiber - > sched_id = = to . sched_id ) {
if ( to . is_error ) {
janet_cancel ( to . fiber , janet_cstringv ( " timeout " ) ) ;
} else {
janet_schedule ( to . fiber , janet_wrap_nil ( ) ) ;
}
2020-10-11 14:07:11 +00:00
}
}
}
2020-11-29 21:36:21 +00:00
2020-10-11 14:07:11 +00:00
/* Run scheduled fibers */
2021-07-17 01:59:03 +00:00
while ( janet_vm . spawn . head ! = janet_vm . spawn . tail ) {
2020-10-11 14:07:11 +00:00
JanetTask task = { NULL , janet_wrap_nil ( ) , JANET_SIGNAL_OK } ;
2021-07-17 01:59:03 +00:00
janet_q_pop ( & janet_vm . spawn , & task , sizeof ( task ) ) ;
2021-07-25 01:30:36 +00:00
task . fiber - > flags & = ~ JANET_FIBER_FLAG_SCHEDULED ;
Janet res ;
JanetSignal sig = janet_continue_signal ( task . fiber , task . value , & res , task . sig ) ;
JanetChannel * chan = ( JanetChannel * ) ( task . fiber - > supervisor_channel ) ;
if ( NULL = = chan ) {
if ( sig ! = JANET_SIGNAL_EVENT & & sig ! = JANET_SIGNAL_YIELD & & sig ! = JANET_SIGNAL_INTERRUPT ) {
janet_stacktrace ( task . fiber , res ) ;
}
} else if ( sig = = JANET_SIGNAL_OK | | ( task . fiber - > flags & ( 1 < < sig ) ) ) {
janet_channel_push ( chan , make_supervisor_event ( janet_signal_names [ sig ] , task . fiber ) , 2 ) ;
}
if ( sig = = JANET_SIGNAL_INTERRUPT ) {
/* On interrupts, return the interrupted fiber immediately */
return task . fiber ;
}
2020-10-11 14:07:11 +00:00
}
2020-11-29 21:36:21 +00:00
2020-10-11 14:07:11 +00:00
/* Poll for events */
2021-07-17 01:59:03 +00:00
if ( janet_vm . listener_count | | janet_vm . tq_count | | janet_vm . extra_listeners ) {
2020-05-28 21:51:11 +00:00
JanetTimeout to ;
2020-10-11 14:07:11 +00:00
memset ( & to , 0 , sizeof ( to ) ) ;
int has_timeout ;
/* Drop timeouts that are no longer needed */
2020-12-05 16:32:34 +00:00
while ( ( has_timeout = peek_timeout ( & to ) ) & & ( to . curr_fiber = = NULL ) & & to . fiber - > sched_id ! = to . sched_id ) {
2020-07-05 22:26:17 +00:00
pop_timeout ( 0 ) ;
2020-05-28 21:51:11 +00:00
}
2020-11-17 00:46:59 +00:00
/* Run polling implementation only if pending timeouts or pending events */
2021-07-17 01:59:03 +00:00
if ( janet_vm . tq_count | | janet_vm . listener_count | | janet_vm . extra_listeners ) {
2020-11-17 00:46:59 +00:00
janet_loop1_impl ( has_timeout , to . when ) ;
}
2020-05-28 21:51:11 +00:00
}
2021-07-25 01:30:36 +00:00
/* No fiber was interrupted */
return NULL ;
}
/* Same as janet_interpreter_interrupt, but will also
* break out of the event loop if waiting for an event
* ( say , waiting for ev / sleep to finish ) . Does this by pushing
* an empty event to the event loop . */
void janet_loop1_interrupt ( JanetVM * vm ) {
janet_interpreter_interrupt ( vm ) ;
JanetEVGenericMessage msg = { 0 } ;
JanetCallback cb = NULL ;
janet_ev_post_event ( vm , cb , msg ) ;
2020-05-28 21:51:11 +00:00
}
2020-10-11 14:07:11 +00:00
void janet_loop ( void ) {
2021-07-25 01:30:36 +00:00
while ( ! janet_loop_done ( ) ) {
JanetFiber * interrupted_fiber = janet_loop1 ( ) ;
if ( NULL ! = interrupted_fiber ) {
janet_schedule ( interrupted_fiber , janet_wrap_nil ( ) ) ;
}
2020-10-11 14:07:11 +00:00
}
}
2020-05-28 21:51:11 +00:00
2020-11-29 21:36:21 +00:00
/*
2020-12-31 17:22:18 +00:00
* Self - pipe handling code .
2020-11-29 21:36:21 +00:00
*/
# ifdef JANET_WINDOWS
2021-01-03 17:04:21 +00:00
/* On windows, use PostQueuedCompletionStatus instead for
* custom events */
2020-11-29 21:36:21 +00:00
# else
2021-01-03 17:04:21 +00:00
static void janet_ev_setup_selfpipe ( void ) {
2021-07-17 01:59:03 +00:00
if ( janet_make_pipe ( janet_vm . selfpipe , 0 ) ) {
2021-01-03 17:04:21 +00:00
JANET_EXIT ( " failed to initialize self pipe in event loop " ) ;
}
}
2020-11-29 21:36:21 +00:00
2020-12-31 17:22:18 +00:00
/* Handle events from the self pipe inside the event loop */
static void janet_ev_handle_selfpipe ( void ) {
2020-12-31 22:12:42 +00:00
JanetSelfPipeEvent response ;
2021-07-17 01:59:03 +00:00
while ( read ( janet_vm . selfpipe [ 0 ] , & response , sizeof ( response ) ) > 0 ) {
2021-07-25 01:30:36 +00:00
if ( NULL ! = response . cb ) {
response . cb ( response . msg ) ;
}
2020-11-29 21:36:21 +00:00
}
}
2020-12-31 17:22:18 +00:00
static void janet_ev_cleanup_selfpipe ( void ) {
2021-07-17 01:59:03 +00:00
close ( janet_vm . selfpipe [ 0 ] ) ;
close ( janet_vm . selfpipe [ 1 ] ) ;
2020-12-31 17:22:18 +00:00
}
2020-11-29 21:36:21 +00:00
# endif
2020-10-04 17:46:15 +00:00
# ifdef JANET_WINDOWS
static JanetTimestamp ts_now ( void ) {
2020-10-04 20:18:31 +00:00
return ( JanetTimestamp ) GetTickCount64 ( ) ;
2020-10-04 17:46:15 +00:00
}
void janet_ev_init ( void ) {
janet_ev_init_common ( ) ;
2021-07-17 01:59:03 +00:00
janet_vm . iocp = CreateIoCompletionPort ( INVALID_HANDLE_VALUE , NULL , 0 , 0 ) ;
if ( NULL = = janet_vm . iocp ) janet_panic ( " could not create io completion port " ) ;
2020-10-04 17:46:15 +00:00
}
void janet_ev_deinit ( void ) {
janet_ev_deinit_common ( ) ;
2021-07-17 01:59:03 +00:00
CloseHandle ( janet_vm . iocp ) ;
2020-10-04 17:46:15 +00:00
}
2020-11-14 20:28:10 +00:00
JanetListenerState * janet_listen ( JanetStream * stream , JanetListener behavior , int mask , size_t size , void * user ) {
2020-11-08 16:38:28 +00:00
/* Add the handle to the io completion port if not already added */
2020-11-14 20:28:10 +00:00
JanetListenerState * state = janet_listen_impl ( stream , behavior , mask , size , user ) ;
2020-11-14 20:40:39 +00:00
if ( ! ( stream - > flags & JANET_STREAM_IOCP ) ) {
2021-07-17 01:59:03 +00:00
if ( NULL = = CreateIoCompletionPort ( stream - > handle , janet_vm . iocp , ( ULONG_PTR ) stream , 0 ) ) {
2021-01-11 17:10:23 +00:00
janet_panicf ( " failed to listen for events: %V " , janet_ev_lasterr ( ) ) ;
2020-11-08 16:38:28 +00:00
}
2020-11-14 20:55:26 +00:00
stream - > flags | = JANET_STREAM_IOCP ;
2020-11-08 16:38:28 +00:00
}
2020-10-04 17:46:15 +00:00
return state ;
}
2021-07-21 00:42:35 +00:00
static void janet_unlisten ( JanetListenerState * state , int is_gc ) {
janet_unlisten_impl ( state , is_gc ) ;
2020-10-04 17:46:15 +00:00
}
2020-11-02 14:19:53 +00:00
void janet_loop1_impl ( int has_timeout , JanetTimestamp to ) {
2020-10-04 17:46:15 +00:00
ULONG_PTR completionKey = 0 ;
DWORD num_bytes_transfered = 0 ;
2021-01-18 17:43:53 +00:00
LPOVERLAPPED overlapped = NULL ;
2020-10-04 17:46:15 +00:00
/* Calculate how long to wait before timeout */
uint64_t waittime ;
if ( has_timeout ) {
JanetTimestamp now = ts_now ( ) ;
2020-11-02 14:27:45 +00:00
if ( now > to ) {
2020-10-04 17:46:15 +00:00
waittime = 0 ;
} else {
2020-11-02 14:27:45 +00:00
waittime = ( uint64_t ) ( to - now ) ;
2020-10-04 17:46:15 +00:00
}
} else {
waittime = INFINITE ;
}
2021-07-17 01:59:03 +00:00
BOOL result = GetQueuedCompletionStatus ( janet_vm . iocp , & num_bytes_transfered , & completionKey , & overlapped , ( DWORD ) waittime ) ;
2020-10-04 17:46:15 +00:00
2021-01-18 02:41:59 +00:00
if ( result | | overlapped ) {
if ( 0 = = completionKey ) {
/* Custom event */
JanetSelfPipeEvent * response = ( JanetSelfPipeEvent * ) ( overlapped ) ;
2021-07-25 01:30:36 +00:00
if ( NULL ! = response - > cb ) {
response - > cb ( response - > msg ) ;
}
2021-03-23 10:00:48 +00:00
janet_free ( response ) ;
2021-01-18 02:41:59 +00:00
} else {
/* Normal event */
JanetStream * stream = ( JanetStream * ) completionKey ;
JanetListenerState * state = stream - > state ;
while ( state ! = NULL ) {
if ( state - > tag = = overlapped ) {
state - > event = overlapped ;
state - > bytes = num_bytes_transfered ;
JanetAsyncStatus status = state - > machine ( state , JANET_ASYNC_EVENT_COMPLETE ) ;
if ( status = = JANET_ASYNC_STATUS_DONE ) {
2021-07-21 00:42:35 +00:00
janet_unlisten ( state , 0 ) ;
2021-01-18 02:41:59 +00:00
}
break ;
} else {
state = state - > _next ;
2020-11-11 21:35:44 +00:00
}
2020-11-08 16:38:28 +00:00
}
}
2020-10-04 17:46:15 +00:00
}
}
2020-11-16 01:40:47 +00:00
# elif defined(JANET_EV_EPOLL)
2020-05-28 21:51:11 +00:00
static JanetTimestamp ts_now ( void ) {
struct timespec now ;
janet_assert ( - 1 ! = clock_gettime ( CLOCK_MONOTONIC , & now ) , " failed to get time " ) ;
uint64_t res = 1000 * now . tv_sec ;
res + = now . tv_nsec / 1000000 ;
return res ;
}
2020-05-28 15:39:40 +00:00
static int make_epoll_events ( int mask ) {
2021-03-26 20:36:50 +00:00
int events = 0 ;
2020-06-02 03:06:27 +00:00
if ( mask & JANET_ASYNC_LISTEN_READ )
2020-05-28 15:39:40 +00:00
events | = EPOLLIN ;
2020-06-02 03:06:27 +00:00
if ( mask & JANET_ASYNC_LISTEN_WRITE )
2020-05-28 15:39:40 +00:00
events | = EPOLLOUT ;
return events ;
}
2021-07-26 02:47:52 +00:00
static void janet_epoll_sync_callback ( JanetEVGenericMessage msg ) {
JanetListenerState * state = msg . argp ;
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE ;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE ;
if ( state - > stream - > _mask & JANET_ASYNC_LISTEN_WRITE )
status1 = state - > machine ( state , JANET_ASYNC_EVENT_WRITE ) ;
if ( state - > stream - > _mask & JANET_ASYNC_LISTEN_WRITE )
status2 = state - > machine ( state , JANET_ASYNC_EVENT_READ ) ;
if ( status1 = = JANET_ASYNC_STATUS_DONE | |
status2 = = JANET_ASYNC_STATUS_DONE ) {
janet_unlisten ( state , 0 ) ;
} else {
/* Repost event */
janet_ev_post_event ( NULL , janet_epoll_sync_callback , msg ) ;
}
}
2020-05-28 15:39:40 +00:00
/* Wait for the next event */
2020-11-14 20:28:10 +00:00
JanetListenerState * janet_listen ( JanetStream * stream , JanetListener behavior , int mask , size_t size , void * user ) {
int is_first = ! ( stream - > state ) ;
2020-05-28 15:39:40 +00:00
int op = is_first ? EPOLL_CTL_ADD : EPOLL_CTL_MOD ;
2020-11-14 20:28:10 +00:00
JanetListenerState * state = janet_listen_impl ( stream , behavior , mask , size , user ) ;
2020-05-28 15:39:40 +00:00
struct epoll_event ev ;
2020-11-14 20:28:10 +00:00
ev . events = make_epoll_events ( state - > stream - > _mask ) ;
ev . data . ptr = stream ;
2020-05-28 15:39:40 +00:00
int status ;
do {
2021-07-17 01:59:03 +00:00
status = epoll_ctl ( janet_vm . epoll , op , stream - > handle , & ev ) ;
2020-05-28 15:39:40 +00:00
} while ( status = = - 1 & & errno = = EINTR ) ;
if ( status = = - 1 ) {
2021-07-26 02:47:52 +00:00
if ( errno = = EPERM ) {
/* Couldn't add to event loop, so assume that it completes
* synchronously . In that case , fire the completion
* event manually , since this should be a read or write
* event to a file . So we just post a custom event to do the read / write
* asap . */
/* Use flag to indicate state is not registered in epoll */
state - > _mask | = ( 1 < < JANET_ASYNC_EVENT_COMPLETE ) ;
JanetEVGenericMessage msg = { 0 } ;
msg . argp = state ;
janet_ev_post_event ( NULL , janet_epoll_sync_callback , msg ) ;
} else {
/* Unexpected error */
janet_unlisten_impl ( state , 0 ) ;
janet_panicv ( janet_ev_lasterr ( ) ) ;
}
2020-05-28 15:39:40 +00:00
}
return state ;
}
/* Tell system we are done listening for a certain event */
2021-07-21 00:42:35 +00:00
static void janet_unlisten ( JanetListenerState * state , int is_gc ) {
2020-11-14 20:28:10 +00:00
JanetStream * stream = state - > stream ;
if ( ! ( stream - > flags & JANET_STREAM_CLOSED ) ) {
2021-07-26 02:47:52 +00:00
/* Use flag to indicate state is not registered in epoll */
if ( ! ( state - > _mask & ( 1 < < JANET_ASYNC_EVENT_COMPLETE ) ) ) {
int is_last = ( state - > _next = = NULL & & stream - > state = = state ) ;
int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
struct epoll_event ev ;
ev . events = make_epoll_events ( stream - > _mask & ~ state - > _mask ) ;
ev . data . ptr = stream ;
int status ;
do {
status = epoll_ctl ( janet_vm . epoll , op , stream - > handle , & ev ) ;
} while ( status = = - 1 & & errno = = EINTR ) ;
if ( status = = - 1 ) {
janet_panicv ( janet_ev_lasterr ( ) ) ;
}
2020-11-14 20:28:10 +00:00
}
2020-05-28 15:39:40 +00:00
}
/* Destroy state machine and free memory */
2021-07-21 00:42:35 +00:00
janet_unlisten_impl ( state , is_gc ) ;
2020-05-28 15:39:40 +00:00
}
# define JANET_EPOLL_MAX_EVENTS 64
2020-10-11 14:07:11 +00:00
void janet_loop1_impl ( int has_timeout , JanetTimestamp timeout ) {
2020-11-16 01:40:47 +00:00
struct itimerspec its ;
2021-07-17 01:59:03 +00:00
if ( janet_vm . timer_enabled | | has_timeout ) {
2020-05-28 21:51:11 +00:00
memset ( & its , 0 , sizeof ( its ) ) ;
if ( has_timeout ) {
2020-10-11 14:07:11 +00:00
its . it_value . tv_sec = timeout / 1000 ;
its . it_value . tv_nsec = ( timeout % 1000 ) * 1000000 ;
2020-05-28 21:51:11 +00:00
}
2021-07-17 01:59:03 +00:00
timerfd_settime ( janet_vm . timerfd , TFD_TIMER_ABSTIME , & its , NULL ) ;
2020-05-28 21:51:11 +00:00
}
2021-07-17 01:59:03 +00:00
janet_vm . timer_enabled = has_timeout ;
2020-05-28 21:51:11 +00:00
2020-05-28 15:39:40 +00:00
/* Poll for events */
struct epoll_event events [ JANET_EPOLL_MAX_EVENTS ] ;
int ready ;
do {
2021-07-17 01:59:03 +00:00
ready = epoll_wait ( janet_vm . epoll , events , JANET_EPOLL_MAX_EVENTS , - 1 ) ;
2020-05-28 15:39:40 +00:00
} while ( ready = = - 1 & & errno = = EINTR ) ;
if ( ready = = - 1 ) {
JANET_EXIT ( " failed to poll events " ) ;
}
2020-05-28 21:51:11 +00:00
2020-05-28 15:39:40 +00:00
/* Step state machines */
for ( int i = 0 ; i < ready ; i + + ) {
2020-11-29 21:36:21 +00:00
void * p = events [ i ] . data . ptr ;
2021-07-17 01:59:03 +00:00
if ( & janet_vm . timerfd = = p ) {
2020-11-29 21:36:21 +00:00
/* Timer expired, ignore */ ;
2021-07-17 01:59:03 +00:00
} else if ( janet_vm . selfpipe = = p ) {
2020-12-31 17:22:18 +00:00
/* Self-pipe handling */
janet_ev_handle_selfpipe ( ) ;
2020-11-29 21:36:21 +00:00
} else {
JanetStream * stream = p ;
2020-05-28 21:51:11 +00:00
int mask = events [ i ] . events ;
2020-11-14 20:28:10 +00:00
JanetListenerState * state = stream - > state ;
2020-10-04 17:46:15 +00:00
state - > event = events + i ;
2020-05-28 21:51:11 +00:00
while ( NULL ! = state ) {
2020-05-30 16:29:58 +00:00
JanetListenerState * next_state = state - > _next ;
2020-06-02 03:06:27 +00:00
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE ;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE ;
2020-11-16 01:40:47 +00:00
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE ;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE ;
2020-05-28 21:51:11 +00:00
if ( mask & EPOLLOUT )
2020-06-02 03:06:27 +00:00
status1 = state - > machine ( state , JANET_ASYNC_EVENT_WRITE ) ;
if ( mask & EPOLLIN )
status2 = state - > machine ( state , JANET_ASYNC_EVENT_READ ) ;
2020-11-14 20:28:10 +00:00
if ( mask & EPOLLERR )
status3 = state - > machine ( state , JANET_ASYNC_EVENT_ERR ) ;
2021-03-26 20:36:25 +00:00
if ( ( mask & EPOLLHUP ) & & ! ( mask & ( EPOLLOUT | EPOLLIN ) ) )
2020-11-14 20:28:10 +00:00
status4 = state - > machine ( state , JANET_ASYNC_EVENT_HUP ) ;
if ( status1 = = JANET_ASYNC_STATUS_DONE | |
status2 = = JANET_ASYNC_STATUS_DONE | |
status3 = = JANET_ASYNC_STATUS_DONE | |
status4 = = JANET_ASYNC_STATUS_DONE )
2021-07-21 00:42:35 +00:00
janet_unlisten ( state , 0 ) ;
2020-05-30 16:29:58 +00:00
state = next_state ;
2020-05-28 21:51:11 +00:00
}
2020-05-28 15:39:40 +00:00
}
}
}
void janet_ev_init ( void ) {
janet_ev_init_common ( ) ;
2020-12-31 17:22:18 +00:00
janet_ev_setup_selfpipe ( ) ;
2021-07-17 01:59:03 +00:00
janet_vm . epoll = epoll_create1 ( EPOLL_CLOEXEC ) ;
janet_vm . timerfd = timerfd_create ( CLOCK_MONOTONIC , TFD_CLOEXEC | TFD_NONBLOCK ) ;
janet_vm . timer_enabled = 0 ;
if ( janet_vm . epoll = = - 1 | | janet_vm . timerfd = = - 1 ) goto error ;
2020-05-28 21:51:11 +00:00
struct epoll_event ev ;
ev . events = EPOLLIN | EPOLLET ;
2021-07-17 01:59:03 +00:00
ev . data . ptr = & janet_vm . timerfd ;
if ( - 1 = = epoll_ctl ( janet_vm . epoll , EPOLL_CTL_ADD , janet_vm . timerfd , & ev ) ) goto error ;
2020-11-29 21:36:21 +00:00
ev . events = EPOLLIN | EPOLLET ;
2021-07-17 01:59:03 +00:00
ev . data . ptr = janet_vm . selfpipe ;
if ( - 1 = = epoll_ctl ( janet_vm . epoll , EPOLL_CTL_ADD , janet_vm . selfpipe [ 0 ] , & ev ) ) goto error ;
2020-05-28 21:51:11 +00:00
return ;
error :
JANET_EXIT ( " failed to initialize event loop " ) ;
2020-05-28 15:39:40 +00:00
}
void janet_ev_deinit ( void ) {
janet_ev_deinit_common ( ) ;
2021-07-17 01:59:03 +00:00
close ( janet_vm . epoll ) ;
close ( janet_vm . timerfd ) ;
2020-12-31 17:22:18 +00:00
janet_ev_cleanup_selfpipe ( ) ;
2021-07-17 01:59:03 +00:00
janet_vm . epoll = 0 ;
2020-05-28 15:39:40 +00:00
}
/*
* End epoll implementation
*/
2020-05-29 00:14:35 +00:00
# else
2020-09-07 17:52:50 +00:00
# include <poll.h>
2020-05-29 00:14:35 +00:00
2020-09-07 17:52:50 +00:00
static JanetTimestamp ts_now ( void ) {
struct timespec now ;
janet_assert ( - 1 ! = clock_gettime ( CLOCK_REALTIME , & now ) , " failed to get time " ) ;
uint64_t res = 1000 * now . tv_sec ;
res + = now . tv_nsec / 1000000 ;
return res ;
}
static int make_poll_events ( int mask ) {
int events = 0 ;
if ( mask & JANET_ASYNC_LISTEN_READ )
events | = POLLIN ;
if ( mask & JANET_ASYNC_LISTEN_WRITE )
events | = POLLOUT ;
return events ;
}
2020-11-16 15:30:04 +00:00
/* Wait for the next event */
JanetListenerState * janet_listen ( JanetStream * stream , JanetListener behavior , int mask , size_t size , void * user ) {
2021-07-17 01:59:03 +00:00
size_t oldsize = janet_vm . listener_cap ;
2020-11-16 15:30:04 +00:00
JanetListenerState * state = janet_listen_impl ( stream , behavior , mask , size , user ) ;
2021-07-17 01:59:03 +00:00
size_t newsize = janet_vm . listener_cap ;
2020-11-16 15:30:04 +00:00
if ( newsize > oldsize ) {
2021-07-17 01:59:03 +00:00
janet_vm . fds = janet_realloc ( janet_vm . fds , ( newsize + 1 ) * sizeof ( struct pollfd ) ) ;
if ( NULL = = janet_vm . fds ) {
2020-09-07 17:52:50 +00:00
JANET_OUT_OF_MEMORY ;
}
}
struct pollfd ev ;
2020-11-14 20:28:10 +00:00
ev . fd = stream - > handle ;
ev . events = make_poll_events ( state - > stream - > _mask ) ;
2020-09-07 17:52:50 +00:00
ev . revents = 0 ;
2021-07-17 01:59:03 +00:00
janet_vm . fds [ state - > _index + 1 ] = ev ;
2020-09-07 17:52:50 +00:00
return state ;
}
2021-07-21 00:42:35 +00:00
static void janet_unlisten ( JanetListenerState * state , int is_gc ) {
2021-07-17 01:59:03 +00:00
janet_vm . fds [ state - > _index + 1 ] = janet_vm . fds [ janet_vm . listener_count ] ;
2021-07-21 00:42:35 +00:00
janet_unlisten_impl ( state , is_gc ) ;
2020-09-07 17:52:50 +00:00
}
2020-10-11 14:07:11 +00:00
void janet_loop1_impl ( int has_timeout , JanetTimestamp timeout ) {
2020-09-07 17:52:50 +00:00
/* Poll for events */
int ready ;
do {
2020-11-16 15:30:04 +00:00
int to = - 1 ;
2020-09-07 17:52:50 +00:00
if ( has_timeout ) {
2020-10-11 14:07:11 +00:00
JanetTimestamp now = ts_now ( ) ;
2020-11-16 15:30:04 +00:00
to = now > timeout ? 0 : ( int ) ( timeout - now ) ;
2020-09-07 17:52:50 +00:00
}
2021-07-17 01:59:03 +00:00
ready = poll ( janet_vm . fds , janet_vm . listener_count + 1 , to ) ;
2020-09-07 17:52:50 +00:00
} while ( ready = = - 1 & & errno = = EINTR ) ;
if ( ready = = - 1 ) {
JANET_EXIT ( " failed to poll events " ) ;
}
2020-05-29 00:14:35 +00:00
2020-11-29 21:36:21 +00:00
/* Check selfpipe */
2021-07-17 01:59:03 +00:00
if ( janet_vm . fds [ 0 ] . revents & POLLIN ) {
janet_vm . fds [ 0 ] . revents = 0 ;
2020-12-31 17:22:18 +00:00
janet_ev_handle_selfpipe ( ) ;
2020-11-29 21:36:21 +00:00
}
2020-09-07 17:52:50 +00:00
/* Step state machines */
2021-07-17 01:59:03 +00:00
for ( size_t i = 0 ; i < janet_vm . listener_count ; i + + ) {
struct pollfd * pfd = janet_vm . fds + i + 1 ;
2020-09-07 17:52:50 +00:00
/* Skip fds where nothing interesting happened */
2021-07-17 01:59:03 +00:00
JanetListenerState * state = janet_vm . listeners [ i ] ;
2020-09-07 17:52:50 +00:00
/* Normal event */
2020-11-29 21:36:21 +00:00
int mask = pfd - > revents ;
2020-09-07 17:52:50 +00:00
JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE ;
JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE ;
2020-11-14 20:28:10 +00:00
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE ;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE ;
2020-10-04 17:46:15 +00:00
state - > event = pfd ;
2020-09-07 17:52:50 +00:00
if ( mask & POLLOUT )
status1 = state - > machine ( state , JANET_ASYNC_EVENT_WRITE ) ;
if ( mask & POLLIN )
status2 = state - > machine ( state , JANET_ASYNC_EVENT_READ ) ;
2020-11-14 20:28:10 +00:00
if ( mask & POLLERR )
2021-01-12 05:14:07 +00:00
status3 = state - > machine ( state , JANET_ASYNC_EVENT_ERR ) ;
2021-03-26 20:36:25 +00:00
if ( ( mask & POLLHUP ) & & ! ( mask & ( POLLIN | POLLOUT ) ) )
2021-01-12 05:14:07 +00:00
status4 = state - > machine ( state , JANET_ASYNC_EVENT_HUP ) ;
2020-11-14 20:28:10 +00:00
if ( status1 = = JANET_ASYNC_STATUS_DONE | |
status2 = = JANET_ASYNC_STATUS_DONE | |
status3 = = JANET_ASYNC_STATUS_DONE | |
status4 = = JANET_ASYNC_STATUS_DONE )
2021-07-21 00:42:35 +00:00
janet_unlisten ( state , 0 ) ;
2020-09-07 17:52:50 +00:00
}
}
void janet_ev_init ( void ) {
janet_ev_init_common ( ) ;
2021-07-17 01:59:03 +00:00
janet_vm . fds = NULL ;
2020-12-31 17:22:18 +00:00
janet_ev_setup_selfpipe ( ) ;
2021-07-17 01:59:03 +00:00
janet_vm . fds = janet_malloc ( sizeof ( struct pollfd ) ) ;
if ( NULL = = janet_vm . fds ) {
2020-11-29 21:36:21 +00:00
JANET_OUT_OF_MEMORY ;
}
2021-07-17 01:59:03 +00:00
janet_vm . fds [ 0 ] . fd = janet_vm . selfpipe [ 0 ] ;
janet_vm . fds [ 0 ] . events = POLLIN ;
janet_vm . fds [ 0 ] . revents = 0 ;
2020-09-07 17:52:50 +00:00
return ;
}
void janet_ev_deinit ( void ) {
janet_ev_deinit_common ( ) ;
2020-12-31 17:22:18 +00:00
janet_ev_cleanup_selfpipe ( ) ;
2021-07-17 01:59:03 +00:00
janet_free ( janet_vm . fds ) ;
janet_vm . fds = NULL ;
2020-09-07 17:52:50 +00:00
}
# endif
2020-05-29 00:14:35 +00:00
2021-01-03 17:04:21 +00:00
/*
* End poll implementation
*/
2021-07-25 01:30:36 +00:00
/*
* Generic Callback system . Post a function pointer + data to the event loop ( from another
* thread or even a signal handler ) . Allows posting events from another thread or signal handler .
*/
void janet_ev_post_event ( JanetVM * vm , JanetCallback cb , JanetEVGenericMessage msg ) {
vm = vm ? vm : & janet_vm ;
# ifdef JANET_WINDOWS
JanetHandle iocp = vm - > iocp ;
JanetSelfPipeEvent * event = janet_malloc ( sizeof ( JanetSelfPipeEvent ) ) ;
if ( NULL = = event ) {
JANET_OUT_OF_MEMORY ;
}
event - > msg = msg ;
event - > cb = cb ;
janet_assert ( PostQueuedCompletionStatus ( iocp ,
2021-07-26 02:47:52 +00:00
sizeof ( JanetSelfPipeEvent ) ,
0 ,
( LPOVERLAPPED ) event ) ,
" failed to post completion event " ) ;
2021-07-25 01:30:36 +00:00
# else
JanetSelfPipeEvent event ;
event . msg = msg ;
event . cb = cb ;
2021-07-26 02:47:52 +00:00
int fd = vm - > selfpipe [ 1 ] ;
2021-07-25 01:30:36 +00:00
/* handle a bit of back pressure before giving up. */
int tries = 4 ;
while ( tries > 0 ) {
int status ;
do {
status = write ( fd , & event , sizeof ( event ) ) ;
} while ( status = = - 1 & & errno = = EINTR ) ;
if ( status > 0 ) break ;
sleep ( 1 ) ;
tries - - ;
}
janet_assert ( tries > 0 , " failed to write event to self-pipe " ) ;
# endif
}
2021-01-03 17:04:21 +00:00
/*
* Threaded calls
*/
# ifdef JANET_WINDOWS
static DWORD WINAPI janet_thread_body ( LPVOID ptr ) {
JanetEVThreadInit * init = ( JanetEVThreadInit * ) ptr ;
JanetEVGenericMessage msg = init - > msg ;
JanetThreadedSubroutine subr = init - > subr ;
JanetThreadedCallback cb = init - > cb ;
JanetHandle iocp = init - > write_pipe ;
/* Reuse memory from thread init for returning data */
init - > msg = subr ( msg ) ;
init - > cb = cb ;
2021-01-03 17:21:44 +00:00
janet_assert ( PostQueuedCompletionStatus ( iocp ,
sizeof ( JanetSelfPipeEvent ) ,
0 ,
( LPOVERLAPPED ) init ) ,
2021-01-03 17:04:21 +00:00
" failed to post completion event " ) ;
return 0 ;
}
# else
static void * janet_thread_body ( void * ptr ) {
JanetEVThreadInit * init = ( JanetEVThreadInit * ) ptr ;
JanetEVGenericMessage msg = init - > msg ;
JanetThreadedSubroutine subr = init - > subr ;
JanetThreadedCallback cb = init - > cb ;
int fd = init - > write_pipe ;
2021-03-23 10:00:48 +00:00
janet_free ( init ) ;
2021-01-03 17:04:21 +00:00
JanetSelfPipeEvent response ;
response . msg = subr ( msg ) ;
response . cb = cb ;
/* handle a bit of back pressure before giving up. */
int tries = 4 ;
while ( tries > 0 ) {
int status ;
do {
status = write ( fd , & response , sizeof ( response ) ) ;
} while ( status = = - 1 & & errno = = EINTR ) ;
if ( status > 0 ) break ;
sleep ( 1 ) ;
tries - - ;
}
return NULL ;
}
# endif
void janet_ev_threaded_call ( JanetThreadedSubroutine fp , JanetEVGenericMessage arguments , JanetThreadedCallback cb ) {
2021-03-23 10:00:48 +00:00
JanetEVThreadInit * init = janet_malloc ( sizeof ( JanetEVThreadInit ) ) ;
2021-01-03 17:04:21 +00:00
if ( NULL = = init ) {
JANET_OUT_OF_MEMORY ;
}
init - > msg = arguments ;
init - > subr = fp ;
init - > cb = cb ;
# ifdef JANET_WINDOWS
2021-07-17 01:59:03 +00:00
init - > write_pipe = janet_vm . iocp ;
2021-01-03 17:04:21 +00:00
HANDLE thread_handle = CreateThread ( NULL , 0 , janet_thread_body , init , 0 , NULL ) ;
if ( NULL = = thread_handle ) {
2021-03-23 10:00:48 +00:00
janet_free ( init ) ;
2021-01-03 17:04:21 +00:00
janet_panic ( " failed to create thread " ) ;
}
CloseHandle ( thread_handle ) ; /* detach from thread */
# else
2021-07-17 01:59:03 +00:00
init - > write_pipe = janet_vm . selfpipe [ 1 ] ;
2021-01-03 17:04:21 +00:00
pthread_t waiter_thread ;
int err = pthread_create ( & waiter_thread , NULL , janet_thread_body , init ) ;
if ( err ) {
2021-03-23 10:00:48 +00:00
janet_free ( init ) ;
2021-01-03 17:04:21 +00:00
janet_panicf ( " %s " , strerror ( err ) ) ;
}
pthread_detach ( waiter_thread ) ;
# endif
/* Increment ev refcount so we don't quit while waiting for a subprocess */
janet_ev_inc_refcount ( ) ;
}
/* Default callback for janet_ev_threaded_await. */
void janet_ev_default_threaded_callback ( JanetEVGenericMessage return_value ) {
2021-07-25 01:30:36 +00:00
janet_ev_dec_refcount ( ) ;
2021-07-17 20:13:28 +00:00
if ( return_value . fiber = = NULL ) {
return ;
}
2021-01-03 17:04:21 +00:00
switch ( return_value . tag ) {
default :
case JANET_EV_TCTAG_NIL :
janet_schedule ( return_value . fiber , janet_wrap_nil ( ) ) ;
break ;
case JANET_EV_TCTAG_INTEGER :
janet_schedule ( return_value . fiber , janet_wrap_integer ( return_value . argi ) ) ;
break ;
case JANET_EV_TCTAG_STRING :
case JANET_EV_TCTAG_STRINGF :
janet_schedule ( return_value . fiber , janet_cstringv ( ( const char * ) return_value . argp ) ) ;
2021-03-23 10:00:48 +00:00
if ( return_value . tag = = JANET_EV_TCTAG_STRINGF ) janet_free ( return_value . argp ) ;
2021-01-03 17:04:21 +00:00
break ;
case JANET_EV_TCTAG_KEYWORD :
janet_schedule ( return_value . fiber , janet_ckeywordv ( ( const char * ) return_value . argp ) ) ;
break ;
case JANET_EV_TCTAG_ERR_STRING :
case JANET_EV_TCTAG_ERR_STRINGF :
janet_cancel ( return_value . fiber , janet_cstringv ( ( const char * ) return_value . argp ) ) ;
2021-03-23 10:00:48 +00:00
if ( return_value . tag = = JANET_EV_TCTAG_STRINGF ) janet_free ( return_value . argp ) ;
2021-01-03 17:04:21 +00:00
break ;
case JANET_EV_TCTAG_ERR_KEYWORD :
janet_cancel ( return_value . fiber , janet_ckeywordv ( ( const char * ) return_value . argp ) ) ;
break ;
2021-01-18 22:44:22 +00:00
case JANET_EV_TCTAG_BOOLEAN :
janet_schedule ( return_value . fiber , janet_wrap_boolean ( return_value . argi ) ) ;
break ;
2021-01-03 17:04:21 +00:00
}
janet_gcunroot ( janet_wrap_fiber ( return_value . fiber ) ) ;
}
/* Convenience method for common case */
2021-01-18 22:44:22 +00:00
JANET_NO_RETURN
2021-01-03 17:04:21 +00:00
void janet_ev_threaded_await ( JanetThreadedSubroutine fp , int tag , int argi , void * argp ) {
JanetEVGenericMessage arguments ;
arguments . tag = tag ;
arguments . argi = argi ;
arguments . argp = argp ;
arguments . fiber = janet_root_fiber ( ) ;
janet_gcroot ( janet_wrap_fiber ( arguments . fiber ) ) ;
janet_ev_threaded_call ( fp , arguments , janet_ev_default_threaded_callback ) ;
janet_await ( ) ;
}
/*
* C API helpers for reading and writing from streams .
2020-11-14 17:48:23 +00:00
* There is some networking code in here as well as generic
2021-01-03 17:04:21 +00:00
* reading and writing primitives .
*/
2020-11-14 17:48:23 +00:00
2020-11-15 16:06:20 +00:00
void janet_stream_flags ( JanetStream * stream , uint32_t flags ) {
if ( stream - > flags & JANET_STREAM_CLOSED ) {
janet_panic ( " stream is closed " ) ;
}
if ( ( stream - > flags & flags ) ! = flags ) {
const char * rmsg = " " , * wmsg = " " , * amsg = " " , * dmsg = " " , * smsg = " stream " ;
if ( flags & JANET_STREAM_READABLE ) rmsg = " readable " ;
if ( flags & JANET_STREAM_WRITABLE ) wmsg = " writable " ;
if ( flags & JANET_STREAM_ACCEPTABLE ) amsg = " server " ;
if ( flags & JANET_STREAM_UDPSERVER ) dmsg = " datagram " ;
if ( flags & JANET_STREAM_SOCKET ) smsg = " socket " ;
janet_panicf ( " bad stream, expected %s%s%s%s%s " , rmsg , wmsg , amsg , dmsg , smsg ) ;
}
}
2020-11-14 17:48:23 +00:00
/* When there is an IO error, we need to be able to convert it to a Janet
* string to raise a Janet error . */
# ifdef JANET_WINDOWS
# define JANET_EV_CHUNKSIZE 4096
Janet janet_ev_lasterr ( void ) {
int code = GetLastError ( ) ;
char msgbuf [ 256 ] ;
msgbuf [ 0 ] = ' \0 ' ;
FormatMessage ( FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS ,
NULL ,
code ,
MAKELANGID ( LANG_NEUTRAL , SUBLANG_DEFAULT ) ,
msgbuf ,
sizeof ( msgbuf ) ,
NULL ) ;
if ( ! * msgbuf ) sprintf ( msgbuf , " %d " , code ) ;
char * c = msgbuf ;
while ( * c ) {
if ( * c = = ' \n ' | | * c = = ' \r ' ) {
* c = ' \0 ' ;
break ;
}
c + + ;
}
return janet_cstringv ( msgbuf ) ;
}
# else
Janet janet_ev_lasterr ( void ) {
return janet_cstringv ( strerror ( errno ) ) ;
}
# endif
/* State machine for read/recv/recvfrom */
typedef enum {
JANET_ASYNC_READMODE_READ ,
JANET_ASYNC_READMODE_RECV ,
JANET_ASYNC_READMODE_RECVFROM
} JanetReadMode ;
typedef struct {
JanetListenerState head ;
int32_t bytes_left ;
2020-11-14 20:28:10 +00:00
int32_t bytes_read ;
2020-11-14 17:48:23 +00:00
JanetBuffer * buf ;
int is_chunk ;
JanetReadMode mode ;
# ifdef JANET_WINDOWS
OVERLAPPED overlapped ;
# ifdef JANET_NET
WSABUF wbuf ;
DWORD flags ;
struct sockaddr from ;
int fromlen ;
# endif
uint8_t chunk_buf [ JANET_EV_CHUNKSIZE ] ;
# else
int flags ;
# endif
} StateRead ;
JanetAsyncStatus ev_machine_read ( JanetListenerState * s , JanetAsyncEvent event ) {
StateRead * state = ( StateRead * ) s ;
switch ( event ) {
default :
break ;
case JANET_ASYNC_EVENT_MARK :
janet_mark ( janet_wrap_buffer ( state - > buf ) ) ;
break ;
case JANET_ASYNC_EVENT_CLOSE :
2021-01-18 02:41:59 +00:00
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
2020-11-14 17:48:23 +00:00
return JANET_ASYNC_STATUS_DONE ;
# ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE : {
/* Called when read finished */
2020-11-14 20:28:10 +00:00
state - > bytes_read + = s - > bytes ;
if ( state - > bytes_read = = 0 & & ( state - > mode ! = JANET_ASYNC_READMODE_RECVFROM ) ) {
2020-11-14 17:48:23 +00:00
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
janet_buffer_push_bytes ( state - > buf , state - > chunk_buf , s - > bytes ) ;
state - > bytes_left - = s - > bytes ;
2020-12-30 02:37:59 +00:00
if ( state - > bytes_left = = 0 | | ! state - > is_chunk | | s - > bytes = = 0 ) {
2020-11-14 17:48:23 +00:00
Janet resume_val ;
# ifdef JANET_NET
if ( state - > mode = = JANET_ASYNC_READMODE_RECVFROM ) {
void * abst = janet_abstract ( & janet_address_type , state - > fromlen ) ;
memcpy ( abst , & state - > from , state - > fromlen ) ;
resume_val = janet_wrap_abstract ( abst ) ;
} else
# endif
{
resume_val = janet_wrap_buffer ( state - > buf ) ;
}
janet_schedule ( s - > fiber , resume_val ) ;
return JANET_ASYNC_STATUS_DONE ;
}
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER : {
int32_t chunk_size = state - > bytes_left > JANET_EV_CHUNKSIZE ? JANET_EV_CHUNKSIZE : state - > bytes_left ;
s - > tag = & state - > overlapped ;
memset ( & ( state - > overlapped ) , 0 , sizeof ( OVERLAPPED ) ) ;
int status ;
# ifdef JANET_NET
2021-01-12 00:00:31 +00:00
if ( state - > mode = = JANET_ASYNC_READMODE_RECVFROM ) {
2020-11-14 17:48:23 +00:00
state - > wbuf . len = ( ULONG ) chunk_size ;
state - > wbuf . buf = state - > chunk_buf ;
2021-01-12 00:00:31 +00:00
status = WSARecvFrom ( ( SOCKET ) s - > stream - > handle , & state - > wbuf , 1 ,
NULL , & state - > flags , & state - > from , & state - > fromlen , & state - > overlapped , NULL ) ;
2020-11-14 20:40:39 +00:00
if ( status & & ( WSA_IO_PENDING ! = WSAGetLastError ( ) ) ) {
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
} else
2020-11-14 17:48:23 +00:00
# endif
2020-11-14 20:40:39 +00:00
{
status = ReadFile ( s - > stream - > handle , state - > chunk_buf , chunk_size , NULL , & state - > overlapped ) ;
2020-11-14 21:52:01 +00:00
if ( ! status & & ( ERROR_IO_PENDING ! = WSAGetLastError ( ) ) ) {
2020-11-14 22:03:51 +00:00
if ( WSAGetLastError ( ) = = ERROR_BROKEN_PIPE ) {
2021-01-12 00:32:56 +00:00
if ( state - > bytes_read ) {
janet_schedule ( s - > fiber , janet_wrap_buffer ( state - > buf ) ) ;
} else {
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
}
2020-11-14 22:03:51 +00:00
} else {
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
}
2020-11-14 20:40:39 +00:00
return JANET_ASYNC_STATUS_DONE ;
}
2020-11-14 20:28:10 +00:00
}
2020-11-14 17:48:23 +00:00
}
2020-11-14 20:40:39 +00:00
break ;
2020-11-14 17:48:23 +00:00
# else
2021-01-23 20:26:24 +00:00
case JANET_ASYNC_EVENT_ERR : {
2020-11-14 21:01:52 +00:00
if ( state - > bytes_read ) {
janet_schedule ( s - > fiber , janet_wrap_buffer ( state - > buf ) ) ;
} else {
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
2020-11-14 20:28:10 +00:00
}
2020-11-14 21:01:52 +00:00
return JANET_ASYNC_STATUS_DONE ;
}
2021-01-23 20:26:24 +00:00
case JANET_ASYNC_EVENT_HUP :
2020-12-30 02:37:59 +00:00
case JANET_ASYNC_EVENT_READ : {
2020-11-14 21:01:52 +00:00
JanetBuffer * buffer = state - > buf ;
int32_t bytes_left = state - > bytes_left ;
2021-01-11 17:10:23 +00:00
int32_t read_limit = bytes_left > 4096 ? 4096 : bytes_left ;
2020-12-30 02:37:59 +00:00
janet_buffer_extra ( buffer , read_limit ) ;
2020-11-14 21:01:52 +00:00
ssize_t nread ;
2020-11-14 17:48:23 +00:00
# ifdef JANET_NET
2020-11-14 21:01:52 +00:00
char saddr [ 256 ] ;
socklen_t socklen = sizeof ( saddr ) ;
2020-11-14 17:48:23 +00:00
# endif
2020-11-14 21:01:52 +00:00
do {
2020-11-14 17:48:23 +00:00
# ifdef JANET_NET
2020-11-14 21:01:52 +00:00
if ( state - > mode = = JANET_ASYNC_READMODE_RECVFROM ) {
2020-12-30 02:37:59 +00:00
nread = recvfrom ( s - > stream - > handle , buffer - > data + buffer - > count , read_limit , state - > flags ,
2020-11-14 21:01:52 +00:00
( struct sockaddr * ) & saddr , & socklen ) ;
} else if ( state - > mode = = JANET_ASYNC_READMODE_RECV ) {
2020-12-30 02:37:59 +00:00
nread = recv ( s - > stream - > handle , buffer - > data + buffer - > count , read_limit , state - > flags ) ;
2020-11-14 21:01:52 +00:00
} else
2020-11-14 17:48:23 +00:00
# endif
2020-11-14 21:01:52 +00:00
{
2020-12-30 02:37:59 +00:00
nread = read ( s - > stream - > handle , buffer - > data + buffer - > count , read_limit ) ;
2020-11-14 17:48:23 +00:00
}
2020-11-14 21:01:52 +00:00
} while ( nread = = - 1 & & errno = = EINTR ) ;
2020-11-14 17:48:23 +00:00
2020-11-14 21:01:52 +00:00
/* Check for errors - special case errors that can just be waited on to fix */
if ( nread = = - 1 ) {
2020-11-16 22:42:09 +00:00
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK ) {
return JANET_ASYNC_STATUS_NOT_DONE ;
}
/* In stream protocols, a pipe error is end of stream */
if ( errno = = EPIPE & & ( state - > mode ! = JANET_ASYNC_READMODE_RECVFROM ) ) {
nread = 0 ;
} else {
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
2020-11-14 21:01:52 +00:00
}
2020-11-14 17:48:23 +00:00
2020-11-14 21:01:52 +00:00
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
state - > bytes_read + = nread ;
if ( state - > bytes_read = = 0 & & ( state - > mode ! = JANET_ASYNC_READMODE_RECVFROM ) ) {
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
2020-11-14 17:48:23 +00:00
2020-11-14 21:01:52 +00:00
/* Increment buffer counts */
buffer - > count + = nread ;
bytes_left - = nread ;
state - > bytes_left = bytes_left ;
/* Resume if done */
if ( ! state - > is_chunk | | bytes_left = = 0 | | nread = = 0 ) {
Janet resume_val ;
2020-11-14 17:48:23 +00:00
# ifdef JANET_NET
2020-11-14 21:01:52 +00:00
if ( state - > mode = = JANET_ASYNC_READMODE_RECVFROM ) {
void * abst = janet_abstract ( & janet_address_type , socklen ) ;
memcpy ( abst , & saddr , socklen ) ;
resume_val = janet_wrap_abstract ( abst ) ;
} else
2020-11-14 17:48:23 +00:00
# endif
2020-11-14 21:01:52 +00:00
{
resume_val = janet_wrap_buffer ( buffer ) ;
2020-11-14 17:48:23 +00:00
}
2020-11-14 21:01:52 +00:00
janet_schedule ( s - > fiber , resume_val ) ;
return JANET_ASYNC_STATUS_DONE ;
2020-11-14 17:48:23 +00:00
}
2020-11-14 21:01:52 +00:00
}
break ;
2020-11-14 17:48:23 +00:00
# endif
2020-11-14 21:01:52 +00:00
}
return JANET_ASYNC_STATUS_NOT_DONE ;
2020-11-14 17:48:23 +00:00
}
2020-11-14 20:28:10 +00:00
static void janet_ev_read_generic ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes , int is_chunked , JanetReadMode mode , int flags ) {
2020-11-14 17:48:23 +00:00
StateRead * state = ( StateRead * ) janet_listen ( stream , ev_machine_read ,
JANET_ASYNC_LISTEN_READ , sizeof ( StateRead ) , NULL ) ;
state - > is_chunk = is_chunked ;
state - > buf = buf ;
state - > bytes_left = nbytes ;
2020-11-14 20:28:10 +00:00
state - > bytes_read = 0 ;
2020-11-14 17:48:23 +00:00
state - > mode = mode ;
# ifdef JANET_WINDOWS
ev_machine_read ( ( JanetListenerState * ) state , JANET_ASYNC_EVENT_USER ) ;
state - > flags = ( DWORD ) flags ;
# else
state - > flags = flags ;
# endif
}
2020-11-14 20:28:10 +00:00
void janet_ev_read ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes ) {
2020-11-14 17:48:23 +00:00
janet_ev_read_generic ( stream , buf , nbytes , 0 , JANET_ASYNC_READMODE_READ , 0 ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_readchunk ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes ) {
2020-11-14 17:48:23 +00:00
janet_ev_read_generic ( stream , buf , nbytes , 1 , JANET_ASYNC_READMODE_READ , 0 ) ;
}
# ifdef JANET_NET
2020-11-14 20:28:10 +00:00
void janet_ev_recv ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_read_generic ( stream , buf , nbytes , 0 , JANET_ASYNC_READMODE_RECV , flags ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_recvchunk ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_read_generic ( stream , buf , nbytes , 1 , JANET_ASYNC_READMODE_RECV , flags ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_recvfrom ( JanetStream * stream , JanetBuffer * buf , int32_t nbytes , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_read_generic ( stream , buf , nbytes , 0 , JANET_ASYNC_READMODE_RECVFROM , flags ) ;
}
# endif
/*
* State machine for write / send / send - to
*/
typedef enum {
JANET_ASYNC_WRITEMODE_WRITE ,
JANET_ASYNC_WRITEMODE_SEND ,
JANET_ASYNC_WRITEMODE_SENDTO
} JanetWriteMode ;
typedef struct {
JanetListenerState head ;
union {
JanetBuffer * buf ;
const uint8_t * str ;
} src ;
int is_buffer ;
JanetWriteMode mode ;
void * dest_abst ;
# ifdef JANET_WINDOWS
OVERLAPPED overlapped ;
# ifdef JANET_NET
WSABUF wbuf ;
DWORD flags ;
# endif
# else
int flags ;
int32_t start ;
# endif
} StateWrite ;
JanetAsyncStatus ev_machine_write ( JanetListenerState * s , JanetAsyncEvent event ) {
StateWrite * state = ( StateWrite * ) s ;
switch ( event ) {
default :
break ;
case JANET_ASYNC_EVENT_MARK :
janet_mark ( state - > is_buffer
? janet_wrap_buffer ( state - > src . buf )
: janet_wrap_string ( state - > src . str ) ) ;
if ( state - > mode = = JANET_ASYNC_WRITEMODE_SENDTO ) {
janet_mark ( janet_wrap_abstract ( state - > dest_abst ) ) ;
}
break ;
case JANET_ASYNC_EVENT_CLOSE :
janet_cancel ( s - > fiber , janet_cstringv ( " stream closed " ) ) ;
return JANET_ASYNC_STATUS_DONE ;
# ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE : {
/* Called when write finished */
if ( s - > bytes = = 0 & & ( state - > mode ! = JANET_ASYNC_WRITEMODE_SENDTO ) ) {
janet_cancel ( s - > fiber , janet_cstringv ( " disconnect " ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
break ;
case JANET_ASYNC_EVENT_USER : {
/* Begin write */
2020-11-14 21:01:52 +00:00
int32_t len ;
2020-11-14 17:48:23 +00:00
const uint8_t * bytes ;
if ( state - > is_buffer ) {
/* If buffer, convert to string. */
/* TODO - be more efficient about this */
JanetBuffer * buffer = state - > src . buf ;
JanetString str = janet_string ( buffer - > data , buffer - > count ) ;
bytes = str ;
len = buffer - > count ;
state - > is_buffer = 0 ;
state - > src . str = str ;
} else {
bytes = state - > src . str ;
len = janet_string_length ( bytes ) ;
}
s - > tag = & state - > overlapped ;
memset ( & ( state - > overlapped ) , 0 , sizeof ( WSAOVERLAPPED ) ) ;
int status ;
# ifdef JANET_NET
2021-01-12 00:00:31 +00:00
if ( state - > mode = = JANET_ASYNC_WRITEMODE_SENDTO ) {
2020-11-14 20:28:10 +00:00
SOCKET sock = ( SOCKET ) s - > stream - > handle ;
2020-11-14 17:48:23 +00:00
state - > wbuf . buf = ( char * ) bytes ;
state - > wbuf . len = len ;
2021-01-12 00:00:31 +00:00
const struct sockaddr * to = state - > dest_abst ;
int tolen = ( int ) janet_abstract_size ( ( void * ) to ) ;
status = WSASendTo ( sock , & state - > wbuf , 1 , NULL , state - > flags , to , tolen , & state - > overlapped , NULL ) ;
2020-11-14 17:48:23 +00:00
if ( status & & ( WSA_IO_PENDING ! = WSAGetLastError ( ) ) ) {
2020-11-14 21:48:21 +00:00
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
2020-11-14 17:48:23 +00:00
return JANET_ASYNC_STATUS_DONE ;
}
} else
# endif
{
2021-07-25 01:30:36 +00:00
/*
* File handles in IOCP need to specify this if they are writing to the
2021-07-24 16:55:04 +00:00
* ends of files , like how this is used here .
* If the underlying resource doesn ' t support seeking
* byte offsets , they will be ignored
* but this otherwise writes to the end of the file in question
* Right now , os / open streams aren ' t seekable , so this works .
* for more details see the lpOverlapped parameter in
* https : //docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefile
*/
2021-07-24 08:30:00 +00:00
state - > overlapped . Offset = ( DWORD ) 0xFFFFFFFF ;
state - > overlapped . OffsetHigh = ( DWORD ) 0xFFFFFFFF ;
2020-11-14 20:28:10 +00:00
status = WriteFile ( s - > stream - > handle , bytes , len , NULL , & state - > overlapped ) ;
2020-11-14 21:52:01 +00:00
if ( ! status & & ( ERROR_IO_PENDING ! = WSAGetLastError ( ) ) ) {
2020-11-14 21:48:21 +00:00
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
2020-11-14 17:48:23 +00:00
return JANET_ASYNC_STATUS_DONE ;
}
}
}
break ;
# else
2020-11-14 20:28:10 +00:00
case JANET_ASYNC_EVENT_ERR :
janet_cancel ( s - > fiber , janet_cstringv ( " stream err " ) ) ;
return JANET_ASYNC_STATUS_DONE ;
case JANET_ASYNC_EVENT_HUP :
janet_cancel ( s - > fiber , janet_cstringv ( " stream hup " ) ) ;
return JANET_ASYNC_STATUS_DONE ;
2020-11-14 17:48:23 +00:00
case JANET_ASYNC_EVENT_WRITE : {
int32_t start , len ;
const uint8_t * bytes ;
start = state - > start ;
if ( state - > is_buffer ) {
JanetBuffer * buffer = state - > src . buf ;
bytes = buffer - > data ;
len = buffer - > count ;
} else {
bytes = state - > src . str ;
len = janet_string_length ( bytes ) ;
}
ssize_t nwrote = 0 ;
if ( start < len ) {
int32_t nbytes = len - start ;
void * dest_abst = state - > dest_abst ;
do {
# ifdef JANET_NET
if ( state - > mode = = JANET_ASYNC_WRITEMODE_SENDTO ) {
2020-11-14 20:28:10 +00:00
nwrote = sendto ( s - > stream - > handle , bytes + start , nbytes , state - > flags ,
2020-11-14 17:48:23 +00:00
( struct sockaddr * ) dest_abst , janet_abstract_size ( dest_abst ) ) ;
} else if ( state - > mode = = JANET_ASYNC_WRITEMODE_SEND ) {
2020-11-14 20:28:10 +00:00
nwrote = send ( s - > stream - > handle , bytes + start , nbytes , state - > flags ) ;
2020-11-14 17:48:23 +00:00
} else
# endif
{
2020-11-14 20:28:10 +00:00
nwrote = write ( s - > stream - > handle , bytes + start , nbytes ) ;
2020-11-14 17:48:23 +00:00
}
} while ( nwrote = = - 1 & & errno = = EINTR ) ;
/* Handle write errors */
if ( nwrote = = - 1 ) {
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK ) break ;
janet_cancel ( s - > fiber , janet_ev_lasterr ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
/* Unless using datagrams, empty message is a disconnect */
if ( nwrote = = 0 & & ! dest_abst ) {
janet_cancel ( s - > fiber , janet_cstringv ( " disconnect " ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
if ( nwrote > 0 ) {
start + = nwrote ;
} else {
start = len ;
}
}
state - > start = start ;
if ( start > = len ) {
janet_schedule ( s - > fiber , janet_wrap_nil ( ) ) ;
return JANET_ASYNC_STATUS_DONE ;
}
break ;
}
break ;
# endif
}
return JANET_ASYNC_STATUS_NOT_DONE ;
}
2020-11-14 20:28:10 +00:00
static void janet_ev_write_generic ( JanetStream * stream , void * buf , void * dest_abst , JanetWriteMode mode , int is_buffer , int flags ) {
2020-11-14 17:48:23 +00:00
StateWrite * state = ( StateWrite * ) janet_listen ( stream , ev_machine_write ,
JANET_ASYNC_LISTEN_WRITE , sizeof ( StateWrite ) , NULL ) ;
state - > is_buffer = is_buffer ;
state - > src . buf = buf ;
state - > dest_abst = dest_abst ;
state - > mode = mode ;
# ifdef JANET_WINDOWS
state - > flags = ( DWORD ) flags ;
2020-11-14 21:01:52 +00:00
ev_machine_write ( ( JanetListenerState * ) state , JANET_ASYNC_EVENT_USER ) ;
2020-11-14 17:48:23 +00:00
# else
state - > start = 0 ;
state - > flags = flags ;
# endif
}
2020-11-14 20:28:10 +00:00
void janet_ev_write_buffer ( JanetStream * stream , JanetBuffer * buf ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , buf , NULL , JANET_ASYNC_WRITEMODE_WRITE , 1 , 0 ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_write_string ( JanetStream * stream , JanetString str ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , ( void * ) str , NULL , JANET_ASYNC_WRITEMODE_WRITE , 0 , 0 ) ;
}
# ifdef JANET_NET
2020-11-14 20:28:10 +00:00
void janet_ev_send_buffer ( JanetStream * stream , JanetBuffer * buf , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , buf , NULL , JANET_ASYNC_WRITEMODE_SEND , 1 , flags ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_send_string ( JanetStream * stream , JanetString str , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , ( void * ) str , NULL , JANET_ASYNC_WRITEMODE_SEND , 0 , flags ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_sendto_buffer ( JanetStream * stream , JanetBuffer * buf , void * dest , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , buf , dest , JANET_ASYNC_WRITEMODE_SENDTO , 1 , flags ) ;
}
2020-11-14 20:28:10 +00:00
void janet_ev_sendto_string ( JanetStream * stream , JanetString str , void * dest , int flags ) {
2020-11-14 17:48:23 +00:00
janet_ev_write_generic ( stream , ( void * ) str , dest , JANET_ASYNC_WRITEMODE_SENDTO , 0 , flags ) ;
}
# endif
2020-11-18 16:53:36 +00:00
/* For a pipe ID */
# ifdef JANET_WINDOWS
static volatile long PipeSerialNumber ;
# endif
2021-01-16 21:11:07 +00:00
int janet_make_pipe ( JanetHandle handles [ 2 ] , int mode ) {
2020-11-18 16:53:36 +00:00
# ifdef JANET_WINDOWS
/*
* On windows , the built in CreatePipe function doesn ' t support overlapped IO
* so we lift from the windows source code and modify for our own version .
2021-01-16 21:11:07 +00:00
*
* mode = 0 : both sides non - blocking .
* mode = 1 : only read side non - blocking : write side sent to subprocess
* mode = 2 : only write side non - blocking : read side sent to subprocess
2020-11-18 16:53:36 +00:00
*/
2021-01-16 21:11:07 +00:00
JanetHandle shandle , chandle ;
2020-11-18 16:53:36 +00:00
UCHAR PipeNameBuffer [ MAX_PATH ] ;
2021-01-11 17:10:23 +00:00
SECURITY_ATTRIBUTES saAttr ;
memset ( & saAttr , 0 , sizeof ( saAttr ) ) ;
saAttr . nLength = sizeof ( saAttr ) ;
saAttr . bInheritHandle = TRUE ;
2020-11-18 16:53:36 +00:00
sprintf ( PipeNameBuffer ,
" \\ \\ . \\ Pipe \\ JanetPipeFile.%08x.%08x " ,
GetCurrentProcessId ( ) ,
InterlockedIncrement ( & PipeSerialNumber ) ) ;
2021-01-16 21:11:07 +00:00
/* server handle goes to subprocess */
shandle = CreateNamedPipeA (
2020-11-18 16:53:36 +00:00
PipeNameBuffer ,
2021-01-16 21:11:07 +00:00
( mode = = 2 ? PIPE_ACCESS_INBOUND : PIPE_ACCESS_OUTBOUND ) | FILE_FLAG_OVERLAPPED ,
PIPE_TYPE_BYTE | PIPE_WAIT ,
255 , /* Max number of pipes for duplication. */
2020-11-18 16:53:36 +00:00
4096 , /* Out buffer size */
4096 , /* In buffer size */
120 * 1000 , /* Timeout in ms */
2021-01-11 17:10:23 +00:00
& saAttr ) ;
2021-01-16 21:11:07 +00:00
if ( shandle = = INVALID_HANDLE_VALUE ) {
return - 1 ;
}
/* we keep client handle */
chandle = CreateFileA (
2020-11-18 16:53:36 +00:00
PipeNameBuffer ,
2021-01-16 21:11:07 +00:00
( mode = = 2 ? GENERIC_WRITE : GENERIC_READ ) ,
2020-11-18 16:53:36 +00:00
0 ,
2021-01-11 17:10:23 +00:00
& saAttr ,
2020-11-18 16:53:36 +00:00
OPEN_EXISTING ,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED ,
NULL ) ;
2021-01-16 21:11:07 +00:00
if ( chandle = = INVALID_HANDLE_VALUE ) {
CloseHandle ( shandle ) ;
2020-11-18 16:53:36 +00:00
return - 1 ;
}
2021-01-16 21:11:07 +00:00
if ( mode = = 2 ) {
handles [ 0 ] = shandle ;
handles [ 1 ] = chandle ;
} else {
handles [ 0 ] = chandle ;
handles [ 1 ] = shandle ;
}
2020-11-18 16:53:36 +00:00
return 0 ;
# else
2021-01-16 21:11:07 +00:00
( void ) mode ;
2020-11-18 16:53:36 +00:00
if ( pipe ( handles ) ) return - 1 ;
2021-01-03 17:04:21 +00:00
if ( fcntl ( handles [ 0 ] , F_SETFL , O_NONBLOCK ) ) goto error ;
if ( fcntl ( handles [ 1 ] , F_SETFL , O_NONBLOCK ) ) goto error ;
2020-11-18 16:53:36 +00:00
return 0 ;
2021-01-03 17:04:21 +00:00
error :
close ( handles [ 0 ] ) ;
close ( handles [ 1 ] ) ;
return - 1 ;
2020-11-18 16:53:36 +00:00
# endif
}
2020-05-28 15:39:40 +00:00
/* C functions */
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_ev_go ,
2021-07-30 02:29:08 +00:00
" (ev/go fiber &opt value supervisor) " ,
" Put a fiber on the event loop to be resumed later. Optionally pass "
" a value to resume with, otherwise resumes with nil. Returns the fiber. "
" An optional `core/channel` can be provided as well as a supervisor. When various "
" events occur in the newly scheduled fiber, an event will be pushed to the supervisor. "
" If not provided, the new fiber will inherit the current supervisor. " ) {
2021-01-13 03:35:28 +00:00
janet_arity ( argc , 1 , 3 ) ;
2020-05-28 15:39:40 +00:00
JanetFiber * fiber = janet_getfiber ( argv , 0 ) ;
2021-06-25 23:58:19 +00:00
Janet value = argc > = 2 ? argv [ 1 ] : janet_wrap_nil ( ) ;
2021-01-13 03:35:28 +00:00
JanetChannel * supervisor_channel = janet_optabstract ( argv , argc , 2 , & ChannelAT ,
2021-07-17 01:59:03 +00:00
janet_vm . root_fiber - > supervisor_channel ) ;
2021-01-18 22:44:22 +00:00
fiber - > supervisor_channel = supervisor_channel ;
janet_schedule ( fiber , value ) ;
2020-05-28 15:39:40 +00:00
return argv [ 0 ] ;
}
2021-01-22 18:52:45 +00:00
/* For ev/thread - Run an interpreter in the new thread. */
static JanetEVGenericMessage janet_go_thread_subr ( JanetEVGenericMessage args ) {
JanetBuffer * buffer = ( JanetBuffer * ) args . argp ;
const uint8_t * nextbytes = buffer - > data ;
const uint8_t * endbytes = nextbytes + buffer - > count ;
2021-07-30 02:29:08 +00:00
uint32_t flags = args . tag ;
args . tag = 0 ;
2021-01-22 18:52:45 +00:00
janet_init ( ) ;
JanetTryState tstate ;
JanetSignal signal = janet_try ( & tstate ) ;
if ( ! signal ) {
2021-07-30 02:29:08 +00:00
/* Set abstract registry */
if ( flags & 0x2 ) {
Janet aregv = janet_unmarshal ( nextbytes , endbytes - nextbytes ,
JANET_MARSHAL_UNSAFE , NULL , & nextbytes ) ;
if ( ! janet_checktype ( aregv , JANET_TABLE ) ) janet_panic ( " expected table for abstract registry " ) ;
janet_vm . abstract_registry = janet_unwrap_table ( aregv ) ;
}
/* Set cfunction registry */
if ( flags & 0x4 ) {
uint32_t count1 ;
memcpy ( & count1 , nextbytes , sizeof ( count1 ) ) ;
size_t count = ( size_t ) count1 ;
if ( count > ( endbytes - nextbytes ) * sizeof ( JanetCFunRegistry ) ) {
janet_panic ( " thread message invalid " ) ;
}
janet_vm . registry_count = count ;
janet_vm . registry_cap = count ;
janet_vm . registry = janet_malloc ( count * sizeof ( JanetCFunRegistry ) ) ;
if ( janet_vm . registry = = NULL ) {
JANET_OUT_OF_MEMORY ;
}
janet_vm . registry_dirty = 1 ;
memcpy ( janet_vm . registry , nextbytes , count * sizeof ( JanetCFunRegistry ) ) ;
}
2021-01-22 18:52:45 +00:00
Janet fiberv = janet_unmarshal ( nextbytes , endbytes - nextbytes ,
JANET_MARSHAL_UNSAFE , NULL , & nextbytes ) ;
Janet value = janet_unmarshal ( nextbytes , endbytes - nextbytes ,
JANET_MARSHAL_UNSAFE , NULL , & nextbytes ) ;
if ( ! janet_checktype ( fiberv , JANET_FIBER ) ) janet_panic ( " expected fiber " ) ;
JanetFiber * fiber = janet_unwrap_fiber ( fiberv ) ;
janet_schedule ( fiber , value ) ;
janet_loop ( ) ;
args . tag = JANET_EV_TCTAG_NIL ;
} else {
if ( janet_checktype ( tstate . payload , JANET_STRING ) ) {
args . tag = JANET_EV_TCTAG_ERR_STRINGF ;
args . argp = strdup ( ( const char * ) janet_unwrap_string ( tstate . payload ) ) ;
} else {
args . tag = JANET_EV_TCTAG_ERR_STRING ;
args . argp = " failed to start thread " ;
}
}
janet_buffer_deinit ( buffer ) ;
janet_restore ( & tstate ) ;
janet_deinit ( ) ;
return args ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( cfun_ev_thread ,
" (ev/thread fiber &opt value flags) " ,
" Resume a (copy of a) `fiber` in a new operating system thread, optionally passing `value` "
" to resume with. "
" Unlike `ev/go`, this function will suspend the current fiber until the thread is complete. "
" If you want to run the thread without waiting for a result, pass the `:n` flag to return nil immediately. "
" Otherwise, returns (a copy of) the final result from the fiber on the new thread. Available flags: \n \n "
" * `:n` - return immediately \n "
" * `:a` - don't copy abstract registry to new thread (performance optimization) \n "
" * `:c` - don't copy cfunction registry to new thread (performance optimization) " ) {
2021-01-22 18:52:45 +00:00
janet_arity ( argc , 1 , 3 ) ;
janet_getfiber ( argv , 0 ) ;
2021-07-17 20:13:28 +00:00
Janet value = argc > = 2 ? argv [ 1 ] : janet_wrap_nil ( ) ;
uint64_t flags = 0 ;
if ( argc > = 3 ) {
2021-07-30 02:29:08 +00:00
flags = janet_getflags ( argv , 2 , " nac " ) ;
2021-07-17 20:13:28 +00:00
}
2021-01-22 18:52:45 +00:00
/* Marshal arguments for the new thread. */
2021-03-23 10:00:48 +00:00
JanetBuffer * buffer = janet_malloc ( sizeof ( JanetBuffer ) ) ;
2021-01-22 18:52:45 +00:00
if ( NULL = = buffer ) {
JANET_OUT_OF_MEMORY ;
}
janet_buffer_init ( buffer , 0 ) ;
2021-07-30 02:29:08 +00:00
if ( flags & 0x2 ) janet_marshal ( buffer , janet_wrap_table ( janet_vm . abstract_registry ) , NULL , JANET_MARSHAL_UNSAFE ) ;
if ( flags & 0x4 ) {
janet_assert ( janet_vm . registry_count < = UINT32_MAX , " assert failed size check " ) ;
uint32_t temp = ( uint32_t ) janet_vm . registry_count ;
janet_buffer_push_bytes ( buffer , ( uint8_t * ) & temp , sizeof ( temp ) ) ;
janet_buffer_push_bytes ( buffer , ( uint8_t * ) janet_vm . registry , janet_vm . registry_count * sizeof ( JanetCFunRegistry ) ) ;
}
2021-01-22 18:52:45 +00:00
janet_marshal ( buffer , argv [ 0 ] , NULL , JANET_MARSHAL_UNSAFE ) ;
janet_marshal ( buffer , value , NULL , JANET_MARSHAL_UNSAFE ) ;
2021-07-17 20:13:28 +00:00
if ( flags & 0x1 ) {
/* Return immediately */
JanetEVGenericMessage arguments ;
2021-07-30 02:29:08 +00:00
arguments . tag = ( uint32_t ) flags ; ;
2021-07-17 20:13:28 +00:00
arguments . argi = argc ;
arguments . argp = buffer ;
arguments . fiber = NULL ;
janet_ev_threaded_call ( janet_go_thread_subr , arguments , janet_ev_default_threaded_callback ) ;
return janet_wrap_nil ( ) ;
} else {
janet_ev_threaded_await ( janet_go_thread_subr , 0 , argc , buffer ) ;
}
2021-01-22 18:52:45 +00:00
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( cfun_ev_give_supervisor ,
" (ev/give-supervisor tag & payload) " ,
" Send a message to the current supervior channel if there is one. The message will be a "
" tuple of all of the arguments combined into a single message, where the first element is tag. "
" By convention, tag should be a keyword indicating the type of message. Returns nil. " ) {
2021-01-16 13:18:07 +00:00
janet_arity ( argc , 1 , - 1 ) ;
2021-07-17 01:59:03 +00:00
JanetChannel * chan = janet_vm . root_fiber - > supervisor_channel ;
2021-01-16 13:18:07 +00:00
if ( NULL ! = chan ) {
if ( janet_channel_push ( chan , janet_wrap_tuple ( janet_tuple_n ( argv , argc ) ) , 0 ) ) {
janet_await ( ) ;
}
}
return janet_wrap_nil ( ) ;
}
2020-11-29 21:36:21 +00:00
JANET_NO_RETURN void janet_sleep_await ( double sec ) {
2020-05-28 21:51:11 +00:00
JanetTimeout to ;
to . when = ts_delta ( ts_now ( ) , sec ) ;
2021-07-17 01:59:03 +00:00
to . fiber = janet_vm . root_fiber ;
2020-08-16 22:29:57 +00:00
to . is_error = 0 ;
2020-10-11 14:07:11 +00:00
to . sched_id = to . fiber - > sched_id ;
2020-12-05 16:32:34 +00:00
to . curr_fiber = NULL ;
2020-05-28 21:51:11 +00:00
add_timeout ( to ) ;
janet_await ( ) ;
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_ev_sleep ,
2021-07-30 02:29:08 +00:00
" (ev/sleep sec) " ,
" Suspend the current fiber for sec seconds without blocking the event loop. " ) {
2020-11-29 21:36:21 +00:00
janet_fixarity ( argc , 1 ) ;
double sec = janet_getnumber ( argv , 0 ) ;
janet_sleep_await ( sec ) ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( cfun_ev_deadline ,
" (ev/deadline sec &opt tocancel tocheck) " ,
" Set a deadline for a fiber `tocheck`. If `tocheck` is not finished after `sec` seconds, "
" `tocancel` will be canceled as with `ev/cancel`. "
" If `tocancel` and `tocheck` are not given, they default to `(fiber/root)` and "
" `(fiber/current)` respectively. Returns `tocancel`. " ) {
2020-12-05 16:32:34 +00:00
janet_arity ( argc , 1 , 3 ) ;
double sec = janet_getnumber ( argv , 0 ) ;
2021-07-17 01:59:03 +00:00
JanetFiber * tocancel = janet_optfiber ( argv , argc , 1 , janet_vm . root_fiber ) ;
JanetFiber * tocheck = janet_optfiber ( argv , argc , 2 , janet_vm . fiber ) ;
2020-12-05 16:32:34 +00:00
JanetTimeout to ;
to . when = ts_delta ( ts_now ( ) , sec ) ;
to . fiber = tocancel ;
to . curr_fiber = tocheck ;
to . is_error = 0 ;
to . sched_id = to . fiber - > sched_id ;
add_timeout ( to ) ;
return janet_wrap_fiber ( tocancel ) ;
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( cfun_ev_cancel ,
2021-07-30 02:29:08 +00:00
" (ev/cancel fiber err) " ,
" Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately " ) {
2020-08-23 16:25:04 +00:00
janet_fixarity ( argc , 2 ) ;
JanetFiber * fiber = janet_getfiber ( argv , 0 ) ;
Janet err = argv [ 1 ] ;
janet_cancel ( fiber , err ) ;
return argv [ 0 ] ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( janet_cfun_stream_close ,
" (ev/close stream) " ,
" Close a stream. This should be the same as calling (:close stream) for all streams. " ) {
2020-11-14 20:28:10 +00:00
janet_fixarity ( argc , 1 ) ;
JanetStream * stream = janet_getabstract ( argv , 0 , & janet_stream_type ) ;
janet_stream_close ( stream ) ;
return argv [ 0 ] ;
}
2021-07-26 05:25:38 +00:00
JANET_CORE_FN ( janet_cfun_stream_read ,
2021-07-30 02:29:08 +00:00
" (ev/read stream n &opt buffer timeout) " ,
" Read up to n bytes into a buffer asynchronously from a stream. `n` can also be the keyword "
" `:all` to read into the buffer until end of stream. "
" Optionally provide a buffer to write into "
" as well as a timeout in seconds after which to cancel the operation and raise an error. "
" Returns the buffer if the read was successful or nil if end-of-stream reached. Will raise an "
" error if there are problems with the IO operation. " ) {
2020-11-14 20:28:10 +00:00
janet_arity ( argc , 2 , 4 ) ;
JanetStream * stream = janet_getabstract ( argv , 0 , & janet_stream_type ) ;
janet_stream_flags ( stream , JANET_STREAM_READABLE ) ;
JanetBuffer * buffer = janet_optbuffer ( argv , argc , 2 , 10 ) ;
double to = janet_optnumber ( argv , argc , 3 , INFINITY ) ;
2020-12-30 02:37:59 +00:00
if ( janet_keyeq ( argv [ 1 ] , " all " ) ) {
if ( to ! = INFINITY ) janet_addtimeout ( to ) ;
2021-01-11 17:10:23 +00:00
janet_ev_readchunk ( stream , buffer , INT32_MAX ) ;
2020-12-30 02:37:59 +00:00
} else {
int32_t n = janet_getnat ( argv , 1 ) ;
if ( to ! = INFINITY ) janet_addtimeout ( to ) ;
janet_ev_read ( stream , buffer , n ) ;
}
2020-11-14 20:28:10 +00:00
janet_await ( ) ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( janet_cfun_stream_chunk ,
" (ev/chunk stream n &opt buffer timeout) " ,
" Same as ev/read, but will not return early if less than n bytes are available. If an end of "
" stream is reached, will also return early with the collected bytes. " ) {
2020-11-14 20:28:10 +00:00
janet_arity ( argc , 2 , 4 ) ;
JanetStream * stream = janet_getabstract ( argv , 0 , & janet_stream_type ) ;
janet_stream_flags ( stream , JANET_STREAM_READABLE ) ;
int32_t n = janet_getnat ( argv , 1 ) ;
JanetBuffer * buffer = janet_optbuffer ( argv , argc , 2 , 10 ) ;
double to = janet_optnumber ( argv , argc , 3 , INFINITY ) ;
if ( to ! = INFINITY ) janet_addtimeout ( to ) ;
janet_ev_readchunk ( stream , buffer , n ) ;
janet_await ( ) ;
}
2021-07-30 02:29:08 +00:00
JANET_CORE_FN ( janet_cfun_stream_write ,
" (ev/write stream data &opt timeout) " ,
" Write data to a stream, suspending the current fiber until the write "
" completes. Takes an optional timeout in seconds, after which will return nil. "
" Returns nil, or raises an error if the write failed. " ) {
2020-11-14 20:28:10 +00:00
janet_arity ( argc , 2 , 3 ) ;
JanetStream * stream = janet_getabstract ( argv , 0 , & janet_stream_type ) ;
janet_stream_flags ( stream , JANET_STREAM_WRITABLE ) ;
double to = janet_optnumber ( argv , argc , 2 , INFINITY ) ;
if ( janet_checktype ( argv [ 1 ] , JANET_BUFFER ) ) {
if ( to ! = INFINITY ) janet_addtimeout ( to ) ;
janet_ev_write_buffer ( stream , janet_getbuffer ( argv , 1 ) ) ;
} else {
JanetByteView bytes = janet_getbytes ( argv , 1 ) ;
if ( to ! = INFINITY ) janet_addtimeout ( to ) ;
janet_ev_write_string ( stream , bytes . bytes ) ;
}
janet_await ( ) ;
}
2020-05-28 15:39:40 +00:00
void janet_lib_ev ( JanetTable * env ) {
2021-07-26 05:25:38 +00:00
JanetRegExt ev_cfuns_ext [ ] = {
JANET_CORE_REG ( " ev/give " , cfun_channel_push ) ,
JANET_CORE_REG ( " ev/take " , cfun_channel_pop ) ,
JANET_CORE_REG ( " ev/full " , cfun_channel_full ) ,
JANET_CORE_REG ( " ev/capacity " , cfun_channel_capacity ) ,
JANET_CORE_REG ( " ev/count " , cfun_channel_count ) ,
JANET_CORE_REG ( " ev/select " , cfun_channel_choice ) ,
2021-07-27 06:48:28 +00:00
JANET_CORE_REG ( " ev/rselect " , cfun_channel_rchoice ) ,
2021-07-26 05:25:38 +00:00
JANET_CORE_REG ( " ev/chan " , cfun_channel_new ) ,
JANET_CORE_REG ( " ev/go " , cfun_ev_go ) ,
JANET_CORE_REG ( " ev/thread " , cfun_ev_thread ) ,
JANET_CORE_REG ( " ev/give-supervisor " , cfun_ev_give_supervisor ) ,
JANET_CORE_REG ( " ev/sleep " , cfun_ev_sleep ) ,
JANET_CORE_REG ( " ev/deadline " , cfun_ev_deadline ) ,
JANET_CORE_REG ( " ev/cancel " , cfun_ev_cancel ) ,
JANET_CORE_REG ( " ev/close " , janet_cfun_stream_close ) ,
JANET_CORE_REG ( " ev/read " , janet_cfun_stream_read ) ,
JANET_CORE_REG ( " ev/chunk " , janet_cfun_stream_chunk ) ,
JANET_CORE_REG ( " ev/write " , janet_cfun_stream_write ) ,
2021-07-31 00:26:42 +00:00
JANET_CORE_REG ( " ev/chan-close " , cfun_channel_close ) ,
2021-07-26 05:25:38 +00:00
JANET_REG_END
} ;
janet_core_cfuns_ext ( env , NULL , ev_cfuns_ext ) ;
2021-01-24 22:48:46 +00:00
janet_register_abstract_type ( & janet_stream_type ) ;
2020-05-28 15:39:40 +00:00
}
# endif