1
0
mirror of https://github.com/janet-lang/janet synced 2025-11-09 20:13:02 +00:00

Compare commits

..

41 Commits

Author SHA1 Message Date
Calvin Rose
4f10264f76 Fix typo. 2020-11-11 15:34:41 -06:00
Calvin Rose
c192caa349 Format source. 2020-11-11 09:40:59 -06:00
Calvin Rose
6f1d5d3b73 Add net/listen and net/accept-loop.
These are the elements that make up net/server, which has been moved
into pure Janet instead.
2020-11-09 11:18:09 -06:00
Calvin Rose
099a912992 Fix posix regression. 2020-11-08 19:48:44 -06:00
Calvin Rose
56b1ea3726 Include math.h to fix some bugs on 32 bit windows. 2020-11-08 19:06:35 -06:00
Calvin Rose
d6391f2d70 Get windows IOCP working for accept.
This also changes the api of servers slightly -
in light of having support for ev tasks, it is probably better
to remove the "simple" server code and replace it with some Janet
or remove it all together. While convenient, it has issues with error
handling and rigidity.
2020-11-08 18:56:13 -06:00
Calvin Rose
07910272e2 Get socket reads and writes working with IOCP. 2020-11-08 10:38:28 -06:00
Calvin Rose
1092013c2b Merge branch 'master' into ev 2020-11-07 14:36:25 -06:00
Calvin Rose
0db83bd787 Merge pull request #498 from pepe/fix-asm-example
Fix asm example
2020-11-07 14:35:39 -06:00
Calvin Rose
f55316eabc Merge pull request #494 from harryvederci/patch-1
Fix typo
2020-11-07 14:34:43 -06:00
Calvin Rose
840f59934e Merge pull request #497 from ahungry/bugfix/Adjust-popen-doc
Fix function doc to match that of C POPEN
2020-11-07 14:34:11 -06:00
Calvin Rose
75a9c59ad8 Merge pull request #499 from sogaiu/disasm-doc-typo
Tweak disasm doc
2020-11-07 14:33:53 -06:00
sogaiu
adfccd33ae Tweak disasm doc 2020-11-05 13:56:42 +09:00
Josef Pospíšil
9d41243c15 Fix comments formating 2020-11-04 10:18:31 +01:00
Josef Pospíšil
e33e182eb0 Fix assembly example 2020-11-04 10:16:02 +01:00
Matthew Carter
4dffd662f0 Fix function doc to match that of C POPEN
It may be misleading to some user to believe it is taking a path to a
file, when it is executing an arbitrary shell command for the first argument.
2020-11-03 20:52:02 -05:00
Calvin Rose
5064d579d4 Add net header instead of ifdef check. 2020-11-03 17:49:09 -06:00
Calvin Rose
540425a41b Make docstring less confusing - Fix #493. 2020-11-02 09:09:22 -06:00
Calvin Rose
4d21b582c7 Improve reading and writing from streams.
Handle errors earlier, and allow 0 length packets from UDP
but not from TCP. This should more closely follow the exact specs
of send and recv calls.
2020-11-02 09:06:31 -06:00
Calvin Rose
f288bc1790 Merge pull request #492 from drkameleon/master
Copy editing & fixing typos
2020-11-02 08:29:49 -06:00
Calvin Rose
8942e348bd Small change to windows ev. 2020-11-02 08:27:45 -06:00
Calvin Rose
9f27336827 Update windows ev code. 2020-11-02 08:19:53 -06:00
Calvin Rose
f517cccf7b Fix unix domain sockets.
Also allow abstract unix domain sockets on Linux
(prefixed with '@' as is customary).
2020-11-02 08:16:28 -06:00
Harry Prins
3a937ace51 Fix typo 2020-10-29 08:45:04 +00:00
Yanis Zafirópulos
b8661f8bff Update README.md
Co-authored-by: Michael Camilleri <mike@inqk.net>
2020-10-26 16:21:06 +01:00
Yanis Zafirópulos
51828ab5f8 Copy editing :) 2020-10-26 14:46:31 +01:00
Yanis Zafirópulos
84fe5d7f34 Copy editing :) 2020-10-26 14:44:17 +01:00
Calvin Rose
2891d2b260 Address #489
Add gc pressure when resizing fiber stack.
2020-10-18 18:41:18 -05:00
Calvin Rose
edfb861a5f Disable epoll by default for evloop. 2020-10-11 15:05:27 -05:00
Calvin Rose
88c1cf3ee7 Reformat files. 2020-10-11 09:33:07 -05:00
Calvin Rose
813e3fdcfd Merge branch 'windows-ev' into ev 2020-10-11 09:32:17 -05:00
Calvin Rose
bbe10e4938 Add another select example. 2020-10-11 09:14:31 -05:00
Calvin Rose
cb4903fa86 Overhaul of poll loop, redo ev/select. 2020-10-11 09:14:14 -05:00
Calvin Rose
ea45165db8 Merge branch 'master' into ev 2020-10-10 09:04:05 -05:00
Calvin Rose
1fba699ed4 Merge branch 'master' of github.com:janet-lang/janet into master 2020-10-08 12:35:16 -05:00
Calvin Rose
ce3d574c41 Update docstring. 2020-10-08 12:34:08 -05:00
Calvin Rose
7a601a7eb2 Merge pull request #481 from sogaiu/remove-namespace-reference
Use "module" instead of "namespace" in docstring
2020-10-07 17:43:56 -05:00
sogaiu
9ec66ab826 Use "module" instead of "namespace" in docstring 2020-10-07 18:18:28 +09:00
Calvin Rose
ebfa07f8ce Registering an abstract type is idemptotent. 2020-10-06 19:11:29 -05:00
Calvin Rose
5c05dec65a Merge pull request #479 from andrewchambers/mf
Add missing Makefile dependencies for install.
2020-10-06 18:26:16 -05:00
Andrew Chambers
bf6ebc4a68 Add missing Makefile dependencies for install. 2020-10-07 10:19:49 +13:00
20 changed files with 699 additions and 355 deletions

View File

@@ -262,7 +262,7 @@ build/janet.pc: $(JANET_TARGET)
echo 'Libs: -L$${libdir} -ljanet' >> $@
echo 'Libs.private: $(CLIBS)' >> $@
install: $(JANET_TARGET) build/janet.pc build/jpm
install: $(JANET_TARGET) $(JANET_LIBRARY) $(JANET_STATIC_LIBRARY) build/janet.pc build/jpm
mkdir -p '$(DESTDIR)$(BINDIR)'
cp $(JANET_TARGET) '$(DESTDIR)$(BINDIR)/janet'
mkdir -p '$(DESTDIR)$(INCLUDEDIR)/janet'

View File

@@ -14,9 +14,9 @@ lisp-like language, but lists are replaced
by other data structures (arrays, tables (hash table), struct (immutable hash table), tuples).
The language also supports bridging to native code written in C, meta-programming with macros, and bytecode assembly.
There is a repl for trying out the language, as well as the ability
There is a REPL for trying out the language, as well as the ability
to run script files. This client program is separate from the core runtime, so
Janet can be embedded into other programs. Try Janet in your browser at
Janet can be embedded in other programs. Try Janet in your browser at
[https://janet-lang.org](https://janet-lang.org).
<br>
@@ -30,23 +30,23 @@ Lua, but smaller than GNU Guile or Python.
## Features
* Minimal setup - one binary and you are good to go!
* First class closures
* First-class closures
* Garbage collection
* First class green threads (continuations)
* Python style generators (implemented as a plain macro)
* First-class green threads (continuations)
* Python-style generators (implemented as a plain macro)
* Mutable and immutable arrays (array/tuple)
* Mutable and immutable hashtables (table/struct)
* Mutable and immutable strings (buffer/string)
* Macros
* Byte code interpreter with an assembly interface, as well as bytecode verification
* Tailcall Optimization
* Tail call Optimization
* Direct interop with C via abstract types and C functions
* Dynamically load C libraries
* Functional and imperative standard library
* Lexical scoping
* Imperative programming as well as functional
* REPL
* Parsing Expression Grammars built in to the core library
* Parsing Expression Grammars built into the core library
* 400+ functions and macros in the core library
* Embedding Janet in other programs
* Interactive environment with detailed stack traces
@@ -56,7 +56,7 @@ Lua, but smaller than GNU Guile or Python.
* For a quick tutorial, see [the introduction](https://janet-lang.org/docs/index.html) for more details.
* For the full API for all functions in the core library, see [the core API doc](https://janet-lang.org/api/index.html)
Documentation is also available locally in the repl.
Documentation is also available locally in the REPL.
Use the `(doc symbol-name)` macro to get API
documentation for symbols in the core library. For example,
```
@@ -66,7 +66,7 @@ Shows documentation for the doc macro.
To get a list of all bindings in the default
environment, use the `(all-bindings)` function. You
can also use the `(doc)` macro with no arguments if you are in the repl
can also use the `(doc)` macro with no arguments if you are in the REPL
to show bound symbols.
## Source
@@ -92,7 +92,7 @@ Find out more about the available make targets by running `make help`.
### 32-bit Haiku
32-bit Haiku build instructions are the same as the unix-like build instructions,
32-bit Haiku build instructions are the same as the UNIX-like build instructions,
but you need to specify an alternative compiler, such as `gcc-x86`.
```
@@ -104,7 +104,7 @@ make repl
### FreeBSD
FreeBSD build instructions are the same as the unix-like build instuctions,
FreeBSD build instructions are the same as the UNIX-like build instructions,
but you need `gmake` to compile. Alternatively, install directly from
packages, using `pkg install lang/janet`.
@@ -117,7 +117,7 @@ gmake repl
### NetBSD
NetBSD build instructions are the same as the FreeBSD build instuctions.
NetBSD build instructions are the same as the FreeBSD build instructions.
Alternatively, install directly from packages, using `pkgin install janet`.
### Windows
@@ -136,11 +136,11 @@ Now you should have an `.msi`. You can run `build_win install` to install the `.
### Meson
Janet also has a build file for [Meson](https://mesonbuild.com/), a cross platform build
system. Although Meson has a python dependency, Meson is a very complete build system that
Janet also has a build file for [Meson](https://mesonbuild.com/), a cross-platform build
system. Although Meson has a Python dependency, Meson is a very complete build system that
is maybe more convenient and flexible for integrating into existing pipelines.
Meson also provides much better IDE integration than Make or batch files, as well as support
for cross compilation.
for cross-compilation.
For the impatient, building with Meson is as follows. The options provided to
`meson setup` below emulate Janet's Makefile.
@@ -177,11 +177,11 @@ to try out the language, you don't need to install anything. You can also move t
## Usage
A repl is launched when the binary is invoked with no arguments. Pass the -h flag
A REPL is launched when the binary is invoked with no arguments. Pass the -h flag
to display the usage information. Individual scripts can be run with `./janet myscript.janet`
If you are looking to explore, you can print a list of all available macros, functions, and constants
by entering the command `(all-bindings)` into the repl.
by entering the command `(all-bindings)` into the REPL.
```
$ janet
@@ -199,13 +199,13 @@ Options are:
-v : Print the version string
-s : Use raw stdin instead of getline like functionality
-e code : Execute a string of janet
-r : Enter the repl after running all scripts
-p : Keep on executing if there is a top level error (persistent)
-q : Hide prompt, logo, and repl output (quiet)
-r : Enter the REPL after running all scripts
-p : Keep on executing if there is a top-level error (persistent)
-q : Hide prompt, logo, and REPL output (quiet)
-k : Compile scripts but do not execute (flycheck)
-m syspath : Set system path for loading global modules
-c source output : Compile janet source code into an image
-n : Disable ANSI color output in the repl
-n : Disable ANSI color output in the REPL
-l path : Execute code in a file before running the main script
-- : Stop handling options
```
@@ -232,16 +232,16 @@ See the examples directory for some example janet code.
## Discussion
Feel free to ask questions and join discussion on the [Janet Gitter Channel](https://gitter.im/janet-language/community).
Feel free to ask questions and join the discussion on the [Janet Gitter Channel](https://gitter.im/janet-language/community).
Alternatively, check out [the #janet channel on Freenode](https://webchat.freenode.net/)
## FAQ
### Why is my terminal spitting out junk when I run the repl?
### Why is my terminal spitting out junk when I run the REPL?
Make sure your terminal supports ANSI escape codes. Most modern terminals will
support these, but some older terminals, Windows consoles, or embedded terminals
will not. If your terminal does not support ANSI escape codes, run the repl with
will not. If your terminal does not support ANSI escape codes, run the REPL with
the `-n` flag, which disables color output. You can also try the `-s` if further issues
ensue.

View File

@@ -1,23 +1,22 @@
# Example of dst bytecode assembly
# Fibonacci sequence, implemented with naive recursion.
(def fibasm (asm '{
arity 1
bytecode [
(ltim 1 0 0x2) # $1 = $0 < 2
(jmpif 1 :done) # if ($1) goto :done
(lds 1) # $1 = self
(addim 0 0 -0x1) # $0 = $0 - 1
(push 0) # push($0), push argument for next function call
(call 2 1) # $2 = call($1)
(addim 0 0 -0x1) # $0 = $0 - 1
(push 0) # push($0)
(call 0 1) # $0 = call($1)
(add 0 0 2) # $0 = $0 + $2 (integers)
:done
(ret 0) # return $0
]
}))
(def fibasm
(asm
'{:arity 1
:bytecode @[(ltim 1 0 0x2) # $1 = $0 < 2
(jmpif 1 :done) # if ($1) goto :done
(lds 1) # $1 = self
(addim 0 0 -0x1) # $0 = $0 - 1
(push 0) # push($0), push argument for next function call
(call 2 1) # $2 = call($1)
(addim 0 0 -0x1) # $0 = $0 - 1
(push 0) # push($0)
(call 0 1) # $0 = call($1)
(add 0 0 2) # $0 = $0 + $2 (integers)
:done
(ret 0) # return $0
]}))
# Test it

View File

@@ -0,0 +1,5 @@
(with [conn (net/connect "127.0.0.1" 8000)]
(print "writing abcdefg...")
(:write conn "abcdefg")
(print "reading...")
(printf "got: %v" (:read conn 1024)))

15
examples/echoserve.janet Normal file
View File

@@ -0,0 +1,15 @@
(defn handler
"Simple handler for connections."
[stream]
(defer (:close stream)
(def id (gensym))
(def b @"")
(print "Connection " id "!")
(while (:read stream 1024 b)
(printf " %v -> %v" id b)
(:write stream b)
(buffer/clear b))
(printf "Done %v!" id)
(ev/sleep 0.5)))
(net/server "127.0.0.1" "8000" handler)

View File

@@ -3,7 +3,7 @@
(defn writer [c]
(for i 0 3
(def item (string i ":" (hash c)))
(def item (string i ":" (mod (hash c) 999)))
(ev/sleep 0.1)
(print "writer giving item " item " to " c "...")
(ev/give c item))
@@ -11,8 +11,8 @@
(defn reader [name]
(forever
(def c (ev/select ;channels))
(print "reader " name " got " (ev/take c) " from " c)))
(def [_ c x] (ev/rselect ;channels))
(print "reader " name " got " x " from " c)))
# Readers
(each letter [:a :b :c :d :e :f :g]

37
examples/select2.janet Normal file
View File

@@ -0,0 +1,37 @@
###
### examples/select2.janet
###
### Mix reads and writes in select.
###
(def c1 (ev/chan 40))
(def c2 (ev/chan 40))
(def c3 (ev/chan 40))
(def c4 (ev/chan 40))
(def c5 (ev/chan 4))
(defn worker
[c n x]
(forever
(ev/sleep n)
(ev/give c x)))
(defn writer-worker
[c]
(forever
(ev/sleep 0.2)
(print "writing " (ev/take c))))
(ev/call worker c1 1 :item1)
(ev/sleep 0.2)
(ev/call worker c2 1 :item2)
(ev/sleep 0.1)
(ev/call worker c3 1 :item3)
(ev/sleep 0.2)
(ev/call worker c4 1 :item4)
(ev/sleep 0.1)
(ev/call worker c4 1 :item5)
(ev/call writer-worker c5)
(forever (pp (ev/rselect c1 c2 c3 c4 [c5 :thing])))

View File

@@ -6,7 +6,7 @@
(def b @"")
(print "Connection " id "!")
(while (:read stream 1024 b)
(repeat 10 (print "work for " id " ...") (ev/sleep 1))
(repeat 10 (print "work for " id " ...") (ev/sleep 0.1))
(:write stream b)
(buffer/clear b))
(printf "Done %v!" id)))

View File

@@ -1392,7 +1392,7 @@
arr)
(defn pairs
"Get the values of an associative data structure."
"Get the key-value pairs of an associative data structure."
[x]
(def arr (array/new (length x)))
(var k (next x nil))
@@ -2488,7 +2488,7 @@
(tuple import* (string path) ;argm))
(defmacro use
"Similar to import, but imported bindings are not prefixed with a namespace
"Similar to import, but imported bindings are not prefixed with a module
identifier. Can also import multiple modules in one shot."
[& modules]
~(do ,;(map |~(,import* ,(string $) :prefix "") modules)))
@@ -2722,6 +2722,28 @@
:on-status (or onsignal (make-onsignal env 1))
:source "repl"}))
###
###
### Extras
###
###
(defmacro- guarddef
[sym form]
(if (dyn sym)
form))
(guarddef net/listen
(defn net/server
"Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream."
[host port &opt handler type]
(def s (net/listen host port type))
(if handler
(ev/go (fn [] (net/accept-loop s handler))))
s))
(undef guarddef)
###
###
### CLI Tool Main
@@ -2786,14 +2808,14 @@
-v : Print the version string
-s : Use raw stdin instead of getline like functionality
-e code : Execute a string of janet
-d : Set the debug flag in the repl
-r : Enter the repl after running all scripts
-p : Keep on executing if there is a top level error (persistent)
-d : Set the debug flag in the REPL
-r : Enter the REPL after running all scripts
-p : Keep on executing if there is a top-level error (persistent)
-q : Hide logo (quiet)
-k : Compile scripts but do not execute (flycheck)
-m syspath : Set system path for loading global modules
-c source output : Compile janet source code into an image
-n : Disable ANSI color output in the repl
-n : Disable ANSI color output in the REPL
-l lib : Import a module before processing more arguments
-- : Stop handling options`)
(os/exit 0)

View File

@@ -988,7 +988,7 @@ static const JanetReg asm_cfuns[] = {
{
"disasm", cfun_disasm,
JDOC("(disasm func &opt field)\n\n"
"Returns assembly that could be used be compile the given function.\n"
"Returns assembly that could be used to compile the given function.\n"
"func must be a function, not a c function. Will throw on error on a badly\n"
"typed argument. If given a field name, will only return that part of the function assembly.\n"
"Possible fields are:\n\n"

View File

@@ -36,6 +36,7 @@
#ifdef JANET_WINDOWS
#include <windows.h>
#include <math.h>
#else
@@ -137,6 +138,7 @@ typedef struct JanetTimeout JanetTimeout;
struct JanetTimeout {
JanetTimestamp when;
JanetFiber *fiber;
uint32_t sched_id;
int is_error;
};
@@ -171,9 +173,7 @@ static int peek_timeout(JanetTimeout *out) {
/* Remove the next timeout from the priority queue */
static void pop_timeout(size_t index) {
if (janet_vm_tq_count <= index) return;
janet_vm_tq[index].fiber->timeout_index = -1;
janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count];
janet_vm_tq[index].fiber->timeout_index = (int32_t) index;
for (;;) {
size_t left = (index << 1) + 1;
size_t right = left + 1;
@@ -188,8 +188,6 @@ static void pop_timeout(size_t index) {
JanetTimeout temp = janet_vm_tq[index];
janet_vm_tq[index] = janet_vm_tq[smallest];
janet_vm_tq[smallest] = temp;
janet_vm_tq[index].fiber->timeout_index = (int32_t) index;
janet_vm_tq[smallest].fiber->timeout_index = (int32_t) smallest;
index = smallest;
}
}
@@ -212,10 +210,6 @@ static void add_timeout(JanetTimeout to) {
janet_vm_tq[oldcount] = to;
/* Heapify */
size_t index = oldcount;
if (to.fiber->timeout_index >= 0) {
pop_timeout(to.fiber->timeout_index);
}
to.fiber->timeout_index = (int32_t) index;
while (index > 0) {
size_t parent = (index - 1) >> 1;
if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
@@ -339,10 +333,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
void janet_fiber_did_resume(JanetFiber *fiber) {
/* Cancel any pending fibers */
if (fiber->waiting) janet_unlisten(fiber->waiting);
if (fiber->timeout_index >= 0) {
pop_timeout(fiber->timeout_index);
fiber->timeout_index = -1;
}
}
/* Mark all pending tasks */
@@ -404,6 +394,7 @@ void janet_addtimeout(double sec) {
JanetTimeout to;
to.when = ts_delta(ts_now(), sec);
to.fiber = fiber;
to.sched_id = fiber->sched_id;
to.is_error = 1;
add_timeout(to);
}
@@ -414,9 +405,9 @@ typedef struct {
JanetFiber *fiber;
uint32_t sched_id;
enum {
JANET_CP_MODE_NONE,
JANET_CP_MODE_ITEM,
JANET_CP_MODE_SELECT
JANET_CP_MODE_CHOICE_READ,
JANET_CP_MODE_CHOICE_WRITE
} mode;
} JanetChannelPending;
@@ -497,11 +488,24 @@ static int janet_chanat_mark(void *p, size_t s) {
return 0;
}
/* Channel Methods */
static Janet make_write_result(JanetChannel *channel) {
Janet *tup = janet_tuple_begin(2);
tup[0] = janet_ckeywordv("give");
tup[1] = janet_wrap_abstract(channel);
return janet_wrap_tuple(janet_tuple_end(tup));
}
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
static Janet make_read_result(JanetChannel *channel, Janet x) {
Janet *tup = janet_tuple_begin(3);
tup[0] = janet_ckeywordv("take");
tup[1] = janet_wrap_abstract(channel);
tup[2] = x;
return janet_wrap_tuple(janet_tuple_end(tup));
}
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
* If the push would block, will add to the write_pending queue in the channel. */
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
JanetChannelPending reader;
int is_empty;
do {
@@ -509,66 +513,113 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) {
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
if (is_empty) {
/* No pending reader */
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
janet_panicf("channel overflow: %v", argv[1]);
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
janet_panicf("channel overflow: %v", x);
} else if (janet_q_count(&channel->items) > channel->limit) {
/* Pushed successfully, but should block. */
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
pending.sched_id = janet_vm_root_fiber->sched_id,
pending.mode = JANET_CP_MODE_ITEM;
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM;
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
return 1;
}
} else {
/* Pending reader */
if (reader.mode == JANET_CP_MODE_SELECT) {
janet_q_push(&channel->items, argv + 1, sizeof(Janet));
janet_schedule(reader.fiber, argv[0]);
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
janet_schedule(reader.fiber, make_read_result(channel, x));
} else {
janet_schedule(reader.fiber, argv[1]);
janet_schedule(reader.fiber, x);
}
}
return 0;
}
/* Pop from a channel - returns 1 if item was obtain, 0 otherwise. The item
* is returned by reference. If the pop would block, will add to the read_pending
* queue in the channel. */
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) {
JanetChannelPending writer;
if (janet_q_pop(&channel->items, item, sizeof(Janet))) {
/* Queue empty */
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
pending.sched_id = janet_vm_root_fiber->sched_id;
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM;
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
return 0;
}
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
/* pending writer */
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
janet_schedule(writer.fiber, make_write_result(channel));
} else {
janet_schedule(writer.fiber, janet_wrap_abstract(channel));
}
}
return 1;
}
/* Channel Methods */
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
if (janet_channel_push(channel, argv[1], 0)) {
janet_await();
}
return argv[0];
}
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
Janet item = janet_wrap_nil();
JanetChannelPending writer;
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
/* Queue empty */
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
pending.sched_id = janet_vm_root_fiber->sched_id;
pending.mode = JANET_CP_MODE_ITEM;
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
janet_await();
}
janet_schedule(janet_vm_root_fiber, item);
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
/* Got item, and there are pending writers. This means we should
* schedule one. */
janet_schedule(writer.fiber, argv[0]);
Janet item;
if (janet_channel_pop(channel, &item, 0)) {
janet_schedule(janet_vm_root_fiber, item);
}
janet_await();
}
static Janet cfun_channel_select(int32_t argc, Janet *argv) {
static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
janet_arity(argc, 1, -1);
int32_t len;
const Janet *data;
/* Check channels for immediate reads and writes */
for (int32_t i = 0; i < argc; i++) {
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
if (chan->items.head != chan->items.tail) return argv[i];
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
/* Write */
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
if (janet_q_count(&chan->items) < chan->limit) {
janet_channel_push(chan, data[1], 1);
return make_write_result(chan);
}
} else {
/* Read */
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
if (chan->items.head != chan->items.tail) {
Janet item;
janet_channel_pop(chan, &item, 1);
return make_read_result(chan, item);
}
}
}
/* None of the channels have data, so we wait on all of them. */
/* Wait for all readers or writers */
for (int32_t i = 0; i < argc; i++) {
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
JanetChannelPending pending;
pending.fiber = janet_vm_root_fiber,
pending.sched_id = janet_vm_root_fiber->sched_id;
pending.mode = JANET_CP_MODE_SELECT;
janet_q_push(&chan->read_pending, &pending, sizeof(pending));
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
/* Write */
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
janet_channel_push(chan, data[1], 1);
} else {
/* Read */
Janet item;
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
janet_channel_pop(chan, &item, 1);
}
}
janet_await();
}
@@ -590,15 +641,19 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) {
return janet_wrap_integer(janet_q_count(&channel->items));
}
static Janet cfun_channel_rselect(int32_t argc, Janet *argv) {
/* Fisher yates shuffle of arguments to get fairness */
/* Fisher yates shuffle of arguments to get fairness */
static void fisher_yates_args(int32_t argc, Janet *argv) {
for (int32_t i = argc; i > 1; i--) {
int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i;
Janet temp = argv[swap_index];
argv[swap_index] = argv[i - 1];
argv[i - 1] = temp;
}
return cfun_channel_select(argc, argv);
}
static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) {
fisher_yates_args(argc, argv);
return cfun_channel_choice(argc, argv);
}
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
@@ -611,26 +666,45 @@ static Janet cfun_channel_new(int32_t argc, Janet *argv) {
/* Main event loop */
void janet_loop1_impl(void);
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
void janet_loop1(void) {
/* Schedule expired timers */
JanetTimeout to;
JanetTimestamp now = ts_now();
while (peek_timeout(&to) && to.when <= now) {
pop_timeout(0);
if (to.fiber->sched_id == to.sched_id) {
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
}
}
/* Run scheduled fibers */
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
run_one(task.fiber, task.value, task.sig);
}
/* Poll for events */
if (janet_vm_active_listeners || janet_vm_tq_count) {
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout;
/* Drop timeouts that are no longer needed */
while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) {
pop_timeout(0);
}
/* Run polling implementation */
janet_loop1_impl(has_timeout, to.when);
}
}
void janet_loop(void) {
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
/* Run expired timers */
JanetTimeout to;
while (peek_timeout(&to) && to.when <= ts_now()) {
pop_timeout(0);
janet_schedule(to.fiber, janet_wrap_nil());
}
/* Run scheduled fibers */
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
run_one(task.fiber, task.value, task.sig);
}
/* Poll for events */
if (janet_vm_active_listeners || janet_vm_tq_count) {
janet_loop1_impl();
}
janet_loop1();
}
}
@@ -655,26 +729,23 @@ void janet_ev_deinit(void) {
}
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
/* Add the handle to the io completion port if not already added */
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
/* TODO - associate IO operation with listener state somehow
* maybe we could require encoding the operation in a mask. */
/* on windows, janet_listen does not actually start any listening behavior. */
if (!(pollable->flags & JANET_POLL_FLAG_IOCP)) {
if (NULL == CreateIoCompletionPort(pollable->handle, janet_vm_iocp, (ULONG_PTR) pollable, 0)) {
janet_panic("failed to listen for events");
}
pollable->flags |= JANET_POLL_FLAG_IOCP;
}
return state;
}
static void janet_unlisten(JanetListenerState *state) {
/* We don't necessarily want to cancel all io on this pollable */
janet_unlisten_impl(state);
}
void janet_loop1_impl(void) {
/* Check for timeout */
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout = peek_timeout(&to);
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
ULONG_PTR completionKey = 0;
DWORD num_bytes_transfered = 0;
LPOVERLAPPED overlapped;
@@ -683,42 +754,40 @@ void janet_loop1_impl(void) {
uint64_t waittime;
if (has_timeout) {
JanetTimestamp now = ts_now();
if (now > to.when) {
if (now > to) {
waittime = 0;
} else {
waittime = (uint64_t) (to.when - now);
waittime = (uint64_t)(to - now);
}
} else {
waittime = INFINITE;
}
BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
if (!result) {
/* timeout ? */
if (has_timeout) {
/* Timer event */
pop_timeout(0);
/* Cancel waiters for this fiber */
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
} else {
JANET_EXIT("failed to get iocp GetQueuedCompletionStatus");
if (!result) {
if (!has_timeout) {
/* queue emptied */
}
} else {
JANET_EXIT("Unexpected event");
} else {
/* Normal event */
JanetListenerState *state = (JanetListenerState *) completionKey;
state->event = overlapped;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state);
JanetPollable *pollable = (JanetPollable *) completionKey;
JanetListenerState *state = pollable->state;
while (state != NULL) {
if (state->tag == overlapped) {
state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state);
}
break;
} else {
state = state->_next;
}
}
}
}
#elif defined(JANET_EV_POLL)
/*
@@ -786,17 +855,12 @@ static void janet_unlisten(JanetListenerState *state) {
}
#define JANET_EPOLL_MAX_EVENTS 64
void janet_loop1_impl(void) {
/* Set timer */
JanetTimeout to;
struct itimerspec its;
memset(&to, 0, sizeof(to));
int has_timeout = peek_timeout(&to);
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
if (janet_vm_timer_enabled || has_timeout) {
memset(&its, 0, sizeof(its));
if (has_timeout) {
its.it_value.tv_sec = to.when / 1000;
its.it_value.tv_nsec = (to.when % 1000) * 1000000;
its.it_value.tv_sec = timeout / 1000;
its.it_value.tv_nsec = (timeout % 1000) * 1000000;
}
timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL);
}
@@ -815,17 +879,7 @@ void janet_loop1_impl(void) {
/* Step state machines */
for (int i = 0; i < ready; i++) {
JanetPollable *pollable = events[i].data.ptr;
if (NULL == pollable) {
/* Timer event */
pop_timeout(0);
/* Cancel waiters for this fiber */
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
} else {
/* Normal event */
if (NULL != pollable) { /* If NULL, is a timeout */
int mask = events[i].events;
JanetListenerState *state = pollable->state;
state->event = events + i;
@@ -940,18 +994,13 @@ static void janet_unlisten(JanetListenerState *state) {
janet_unlisten_impl(state);
}
void janet_loop1_impl(void) {
/* Set timer */
JanetTimeout to;
memset(&to, 0, sizeof(to));
int has_timeout = peek_timeout(&to);
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
/* Poll for events */
int ready;
do {
if (has_timeout) {
int64_t diff = to.when - ts_now();
ready = poll(janet_vm_fds, janet_vm_fdcount, diff < 0 ? 0 : (int) diff);
JanetTimestamp now = ts_now();
ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int)(timeout - now));
} else {
ready = poll(janet_vm_fds, janet_vm_fdcount, -1);
}
@@ -961,10 +1010,8 @@ void janet_loop1_impl(void) {
}
/* Step state machines */
int did_handle_something = 0;
for (size_t i = 0; i < janet_vm_fdcount; i++) {
struct pollfd *pfd = janet_vm_fds + i;
did_handle_something |= pfd->revents;
/* Skip fds where nothing interesting happened */
if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue;
JanetListenerState *state = janet_vm_listener_map[i];
@@ -980,19 +1027,6 @@ void janet_loop1_impl(void) {
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state);
}
/* If nothing was handled and poll returned, then we know that it timedout and we should trigger
* one of our timers. */
if (!did_handle_something) {
/* Timer event */
pop_timeout(0);
/* Cancel waiters for this fiber */
if (to.is_error) {
janet_cancel(to.fiber, janet_cstringv("timeout"));
} else {
janet_schedule(to.fiber, janet_wrap_nil());
}
}
}
void janet_ev_init(void) {
@@ -1041,6 +1075,7 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
to.when = ts_delta(ts_now(), sec);
to.fiber = janet_vm_root_fiber;
to.is_error = 0;
to.sched_id = to.fiber->sched_id;
add_timeout(to);
janet_await();
}
@@ -1102,25 +1137,24 @@ static const JanetReg ev_cfuns[] = {
JDOC("(ev/count channel)\n\n"
"Get the number of items currently waiting in a channel.")
},
{
"ev/select", cfun_channel_select,
JDOC("(ev/select & channels)\n\n"
"Get a channel that is not empty, suspending the current fiber until at least one channel "
"is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). "
"Returns a non-empty channel.")
},
{
"ev/rselect", cfun_channel_rselect,
JDOC("(ev/rselect & channels)\n\n"
"Get a channel that is not empty, suspending the current fiber until at least one channel "
"is not empty. Will prefer channels in a random order (random choice). "
"Returns a non-empty channel.")
},
{
"ev/cancel", cfun_ev_cancel,
JDOC("(ev/cancel fiber err)\n\n"
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately")
},
{
"ev/select", cfun_channel_choice,
JDOC("(ev/select & clauses)\n\n"
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
"clauses will take precedence over later clauses.")
},
{
"ev/rselect", cfun_channel_rchoice,
JDOC("(ev/rselect & clauses)\n\n"
"Similar to ev/choice, but will try clauses in a random order for fairness.")
},
{NULL, NULL, NULL}
};

View File

@@ -39,7 +39,6 @@ static void fiber_reset(JanetFiber *fiber) {
fiber->env = NULL;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->timeout_index = -1;
fiber->sched_id = 0;
#endif
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
@@ -109,12 +108,15 @@ static void janet_fiber_refresh_memory(JanetFiber *fiber) {
/* Ensure that the fiber has enough extra capacity */
void janet_fiber_setcapacity(JanetFiber *fiber, int32_t n) {
int32_t old_size = fiber->capacity;
int32_t diff = n - old_size;
Janet *newData = realloc(fiber->data, sizeof(Janet) * n);
if (NULL == newData) {
JANET_OUT_OF_MEMORY;
}
fiber->data = newData;
fiber->capacity = n;
janet_vm_next_collection += sizeof(Janet) * diff;
}
/* Grow fiber if needed */

View File

@@ -777,7 +777,7 @@ static const JanetReg io_cfuns[] = {
#ifndef JANET_NO_PROCESSES
{
"file/popen", cfun_io_popen,
JDOC("(file/popen path &opt mode)\n\n"
JDOC("(file/popen command &opt mode)\n\n"
"Open a file that is backed by a process. The file must be opened in either "
"the :r (read) or the :w (write) mode. In :r mode, the stdout of the "
"process can be read from the file. In :w mode, the stdin of the process "

View File

@@ -934,8 +934,10 @@ static const uint8_t *unmarshal_one_fiber(
fiber->data = NULL;
fiber->child = NULL;
fiber->env = NULL;
#ifdef JANET_EV
fiber->waiting = NULL;
fiber->timeout_index = -1;
fiber->sched_id = 0;
#endif
/* Push fiber to seen stack */
janet_v_push(st->lookup, janet_wrap_fiber(fiber));

View File

@@ -32,6 +32,7 @@
#include <winsock2.h>
#include <windows.h>
#include <ws2tcpip.h>
#include <mswsock.h>
#pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "Advapi32.lib")
@@ -42,9 +43,11 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <math.h>
#endif
/*
@@ -75,6 +78,7 @@ static const JanetAbstractType AddressAT = {
};
#ifdef JANET_WINDOWS
#define JANET_NET_CHUNKSIZE 4096
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
#define JSOCKDEFAULT INVALID_SOCKET
#define JLASTERR WSAGetLastError()
@@ -94,6 +98,28 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
stream->flags = flags;
return stream;
}
static Janet net_lasterr(void) {
int code = WSAGetLastError();
char msgbuf[256];
msgbuf[0] = '\0';
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
code,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
msgbuf,
sizeof(msgbuf),
NULL);
if (!*msgbuf) sprintf(msgbuf, "%d", code);
char *c = msgbuf;
while (*c) {
if (*c == '\n' || *c == '\r') {
*c = '\0';
break;
}
c++;
}
return janet_cstringv(msgbuf);
}
#else
#define JSOCKCLOSE(x) close(x)
#define JSOCKDEFAULT 0
@@ -122,6 +148,9 @@ static JanetStream *make_stream(int fd, uint32_t flags) {
stream->flags = flags;
return stream;
}
static Janet net_lasterr(void) {
return janet_cstringv(strerror(errno));
}
#endif
/* We pass this flag to all send calls to prevent sigpipe */
@@ -169,7 +198,12 @@ typedef struct {
int is_recv_from;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
uint8_t chunk_buf[2048];
WSABUF wbuf;
DWORD flags;
int32_t chunk_size;
struct sockaddr from;
int fromlen;
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
#endif
} NetStateRead;
@@ -187,6 +221,47 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */
if (s->bytes == 0 && !state->is_recv_from) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= s->bytes;
if (state->bytes_left <= 0 || !state->is_chunk) {
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, state->fromlen);
memcpy(abst, &state->from, state->fromlen);
resume_val = janet_wrap_abstract(abst);
} else {
resume_val = janet_wrap_buffer(state->buf);
}
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
/* fallthrough */
case JANET_ASYNC_EVENT_USER: {
state->flags = 0;
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf;
state->chunk_size = chunk_size;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status;
if (state->is_recv_from) {
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else {
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
}
if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
@@ -207,8 +282,18 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
}
} while (nread == -1 && JLASTERR == JEINTR);
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
break;
/* Check for errors - special case errors that can just be waited on to fix */
if (nread == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
if (nread == 0 && !state->is_recv_from) {
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
/* Increment buffer counts */
@@ -222,23 +307,15 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
/* Resume if done */
if (!state->is_chunk || bytes_left == 0) {
JanetSignal sig = JANET_SIGNAL_OK;
Janet resume_val;
if (state->is_recv_from) {
void *abst = janet_abstract(&AddressAT, socklen);
memcpy(abst, &saddr, socklen);
resume_val = janet_wrap_abstract(abst);
} else {
if (nread > 0) {
resume_val = janet_wrap_buffer(buffer);
} else {
sig = JANET_SIGNAL_ERROR;
resume_val = (nread == -1)
? janet_cstringv(strerror(JLASTERR))
: janet_cstringv("could not read");
}
resume_val = janet_wrap_buffer(buffer);
}
janet_schedule_signal(s->fiber, resume_val, sig);
janet_schedule(s->fiber, resume_val);
return JANET_ASYNC_STATUS_DONE;
}
}
@@ -255,10 +332,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
#ifdef JANET_WINDOWS
WSARecv((SOCKET) stream->handle,
#endif
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@@ -269,6 +343,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 0;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@@ -279,6 +354,7 @@ JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuff
state->buf = buf;
state->bytes_left = nbytes;
state->is_recv_from = 1;
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@@ -295,6 +371,11 @@ typedef struct {
int32_t start;
int is_buffer;
void *dest_abst;
#ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped;
WSABUF wbuf;
DWORD flags;
#endif
} NetStateWrite;
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
@@ -314,13 +395,55 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream closed"));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
/* Begin write */
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */
if (s->bytes == 0 && !state->dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
case JANET_ASYNC_EVENT_USER: {
/* Begin write */
int32_t start, len;
const uint8_t *bytes;
start = state->start;
if (state->is_buffer) {
/* If buffer, convert to string. */
/* TODO - be more efficient about this */
JanetBuffer *buffer = state->src.buf;
JanetString str = janet_string(buffer->data, buffer->count);
bytes = str;
len = buffer->count;
state->is_buffer = 0;
state->src.str = str;
} else {
bytes = state->src.str;
len = janet_string_length(bytes);
}
state->wbuf.buf = (char *) bytes;
state->wbuf.len = len;
state->flags = 0;
s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
/* Called when write finished */
int status;
SOCKET sock = (SOCKET) s->pollable->handle;
if (state->dest_abst) {
const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
} else {
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
}
if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
@@ -339,8 +462,8 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
JReadInt nwrote = 0;
if (start < len) {
int32_t nbytes = len - start;
void *dest_abst = state->dest_abst;
do {
void *dest_abst = state->dest_abst;
if (dest_abst) {
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
@@ -348,6 +471,20 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
}
} while (nwrote == -1 && JLASTERR == JEINTR);
/* Handle write errors */
if (nwrote == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE;
}
/* Unless using datagrams, empty message is a disconnect */
if (nwrote == 0 && !dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE;
}
if (nwrote > 0) {
start += nwrote;
} else {
@@ -356,13 +493,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
}
state->start = start;
if (start >= len) {
if (nwrote > 0) {
janet_schedule(s->fiber, janet_wrap_nil());
} else if (nwrote == 0) {
janet_cancel(s->fiber, janet_cstringv("could not write"));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR)));
}
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
}
break;
@@ -380,6 +511,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
state->start = 0;
state->src.buf = buf;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@@ -391,6 +523,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
state->start = 0;
state->src.str = str;
state->dest_abst = dest_abst;
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
janet_await();
}
@@ -398,96 +531,150 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
* State machine for simple server
*/
typedef struct {
JanetListenerState head;
JanetFunction *function;
} NetStateSimpleServer;
JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEvent event) {
NetStateSimpleServer *state = (NetStateSimpleServer *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_INIT:
/* We know the pollable will be a stream */
janet_gcroot(janet_wrap_abstract(s->pollable));
#ifdef JANET_WINDOWS
/* requires some more setup code */
#endif
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_function(state->function));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_gcunroot(janet_wrap_abstract(s->pollable));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when ever we get an IOCP event */
}
break;
#else
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
/* Made a new connection socket */
nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
}
break;
}
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
/* State machine for accepting connections. */
#ifdef JANET_WINDOWS
typedef struct {
JanetListenerState head;
WSAOVERLAPPED overlapped;
JanetFunction *function;
JanetStream *lstream;
JanetStream *astream;
char buf[1024];
} NetStateAccept;
static int net_sched_accept_impl(NetStateAccept *state, Janet *err);
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK: {
if (state->lstream) janet_mark(janet_wrap_abstract(state->lstream));
if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
if (state->function) janet_mark(janet_wrap_abstract(state->function));
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed"));
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
int seconds;
int bytes = sizeof(seconds);
if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME,
(char *)&seconds, &bytes)) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *) & (state->lstream->handle), sizeof(SOCKET))) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
}
break;
#else
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
Janet streamv = janet_wrap_abstract(state->astream);
if (state->function) {
/* Schedule worker */
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
/* Now listen again for next connection */
Janet err;
if (net_sched_accept_impl(state, &err)) {
janet_cancel(s->fiber, err);
return JANET_ASYNC_STATUS_DONE;
}
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
break;
}
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) {
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
Janet err;
SOCKET lsock = (SOCKET) stream->handle;
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
NetStateAccept *state = (NetStateAccept *)s;
memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
memset(&state->buf, 0, 1024);
state->function = fun;
state->lstream = stream;
s->tag = &state->overlapped;
if (net_sched_accept_impl(state, &err)) janet_panicv(err);
janet_await();
}
static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
SOCKET lsock = (SOCKET) state->lstream->handle;
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (asock == INVALID_SOCKET) {
*err = net_lasterr();
return 1;
}
JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
state->astream = astream;
int socksize = sizeof(SOCKADDR_STORAGE) + 16;
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
int code = WSAGetLastError();
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
*err = net_lasterr();
return 1;
}
return 0;
}
#else
typedef struct {
JanetListenerState head;
JanetFunction *function;
} NetStateAccept;
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK: {
if (state->function) janet_mark(janet_wrap_function(state->function));
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
if (state->function) {
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
}
break;
}
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
NetStateAccept *state = (NetStateAccept *) janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
state->function = fun;
janet_await();
}
#endif
/* Adress info */
static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) {
@@ -513,7 +700,16 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int sock
JANET_OUT_OF_MEMORY;
}
saddr->sun_family = AF_UNIX;
snprintf(saddr->sun_path, 108, "%s", path);
memset(&saddr->sun_path, 0, 108);
#ifdef JANET_LINUX
if (path[0] == '@') {
saddr->sun_path[0] = '\0';
snprintf(saddr->sun_path + 1, 107, "%s", path + 1);
} else
#endif
{
snprintf(saddr->sun_path, 108, "%s", path);
}
*is_unix = 1;
return (struct addrinfo *) saddr;
}
@@ -608,7 +804,11 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
{
struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) {
#ifdef JANET_WINDOWS
sock = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
#else
sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
#endif
if (JSOCKVALID(sock)) {
addr = rp->ai_addr;
addrlen = (socklen_t) rp->ai_addrlen;
@@ -655,13 +855,11 @@ static const char *serverify_socket(JSock sfd) {
return NULL;
}
static Janet cfun_net_server(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 4);
static Janet cfun_net_listen(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
/* Get host, port, and handler*/
JanetFunction *fun = janet_optfunction(argv, argc, 2, NULL);
int socktype = janet_get_sockettype(argv, argc, 3);
int socktype = janet_get_sockettype(argv, argc, 2);
int is_unix = 0;
struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix);
@@ -686,7 +884,11 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Check all addrinfos in a loop for the first that we can bind to. */
struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) {
#ifdef JANET_WINDOWS
sfd = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
#else
sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
#endif
if (!JSOCKVALID(sfd)) continue;
const char *err = serverify_socket(sfd);
if (NULL != err) {
@@ -707,15 +909,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
if (socktype == SOCK_DGRAM) {
/* Datagram server (UDP) */
if (NULL == fun) {
/* Server no handler */
JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE);
return janet_wrap_abstract(stream);
} else {
/* Server with handler */
janet_panic("handler must be nil for datagram server");
}
JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE);
return janet_wrap_abstract(stream);
} else {
/* Stream server (TCP) */
@@ -727,17 +922,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
}
/* Put sfd on our loop */
if (NULL == fun) {
JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
return janet_wrap_abstract(stream);
} else {
/* Server with handler */
JanetStream *stream = make_stream(sfd, 0);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL);
ss->function = fun;
return janet_wrap_abstract(stream);
}
JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
return janet_wrap_abstract(stream);
}
}
@@ -752,13 +938,21 @@ static void check_stream_flag(JanetStream *stream, int flag) {
}
}
static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) {
janet_fixarity(argc, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
JanetFunction *fun = janet_getfunction(argv, 1);
janet_sched_accept(stream, fun);
}
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
janet_arity(argc, 1, 2);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream);
janet_sched_accept(stream, NULL);
}
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
@@ -801,6 +995,12 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
return janet_wrap_nil();
}
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
}
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
@@ -847,10 +1047,12 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
static const JanetMethod stream_methods[] = {
{"chunk", cfun_stream_chunk},
{"close", cfun_stream_close},
{"closed?", cfun_stream_closed},
{"read", cfun_stream_read},
{"write", cfun_stream_write},
{"flush", cfun_stream_flush},
{"accept", cfun_stream_accept},
{"accept-loop", cfun_stream_accept_loop},
{"send-to", cfun_stream_send_to},
{"recv-from", cfun_stream_recv_from},
{NULL, NULL}
@@ -867,17 +1069,19 @@ static const JanetReg net_cfuns[] = {
"net/address", cfun_net_sockaddr,
JDOC("(net/address host port &opt type)\n\n"
"Look up the connection information for a given hostname, port, and connection type. Returns "
"a handle that can be used to send datagrams over network without establishing a connection.")
"a handle that can be used to send datagrams over network without establishing a connection. "
"On Posix platforms, you can use :unix for host to connect to a unix domain socket, where the name is "
"given in the port argument. On Linux, abstract "
"unix domain sockets are specified with a leading '@' character in port.")
},
{
"net/server", cfun_net_server,
JDOC("(net/server host port &opt handler type)\n\n"
"Start a TCP server. handler is a function that will be called with a stream "
"on each connection to the server. Returns a new stream that is neither readable nor "
"writeable. If handler is nil or not provided, net/accept must be used to get the next connection "
"to the server. The type parameter specifies the type of network connection, either "
"a stream (usually tcp), or datagram (usually udp). If not specified, the default is "
"stream.")
"net/listen", cfun_net_listen,
JDOC("(net/listen host port &opt type)\n\n"
"Creates a server. Returns a new stream that is neither readable nor "
"writeable. Use net/accept or net/accept-loop be to handle connections and start the server."
"The type parameter specifies the type of network connection, either "
"a :stream (usually tcp), or :datagram (usually udp). If not specified, the default is "
":stream. The host and port arguments are the same as in net/address.")
},
{
"net/accept", cfun_stream_accept,
@@ -886,6 +1090,12 @@ static const JanetReg net_cfuns[] = {
"Takes an optional timeout in seconds, after which will return nil. "
"Returns a new duplex stream which represents a connection to the client.")
},
{
"net/accept-loop", cfun_stream_accept_loop,
JDOC("(net/accept-loop stream handler)\n\n"
"Shorthand for running a server stream that will continuously accept new connections."
"Blocks the current fiber until the stream is closed, and will return the stream.")
},
{
"net/read", cfun_stream_read,
JDOC("(net/read stream nbytes &opt buf timeout)\n\n"
@@ -931,6 +1141,11 @@ static const JanetReg net_cfuns[] = {
JDOC("(net/close stream)\n\n"
"Close a stream so that no further communication can occur.")
},
{
"net/closed?", cfun_stream_closed,
JDOC("(net/closed? stream)\n\n"
"Check if a stream is closed.")
},
{
"net/connect", cfun_net_connect,
JDOC("(net/connect host porti &opt type)\n\n"

View File

@@ -1743,8 +1743,8 @@ static const JanetReg os_cfuns[] = {
},
{
"os/sleep", os_sleep,
JDOC("(os/sleep nsec)\n\n"
"Suspend the program for nsec seconds. 'nsec' can be a real number. Returns "
JDOC("(os/sleep n)\n\n"
"Suspend the program for n seconds. 'nsec' can be a real number. Returns "
"nil.")
},
{

View File

@@ -256,7 +256,7 @@ static void janet_table_mergekv(JanetTable *table, const JanetKV *kvs, int32_t c
}
}
/* Merge a table other into another table */
/* Merge a table into another table */
void janet_table_merge_table(JanetTable *table, JanetTable *other) {
janet_table_mergekv(table, other->data, other->capacity);
}

View File

@@ -449,7 +449,8 @@ void janet_cfuns(JanetTable *env, const char *regprefix, const JanetReg *cfuns)
void janet_register_abstract_type(const JanetAbstractType *at) {
Janet sym = janet_csymbolv(at->name);
if (!(janet_checktype(janet_table_get(janet_vm_abstract_registry, sym), JANET_NIL))) {
Janet check = janet_table_get(janet_vm_abstract_registry, sym);
if (!janet_checktype(check, JANET_NIL) && at != janet_unwrap_pointer(check)) {
janet_panicf("cannot register abstract type %s, "
"a type with the same name exists", at->name);
}

View File

@@ -501,6 +501,7 @@ typedef void *JanetAbstract;
#ifdef JANET_EV
#define JANET_POLL_FLAG_CLOSED 0x1
#define JANET_POLL_FLAG_SOCKET 0x2
#define JANET_POLL_FLAG_IOCP 0x4
typedef enum {
JANET_ASYNC_EVENT_INIT,
@@ -510,7 +511,8 @@ typedef enum {
JANET_ASYNC_EVENT_READ,
JANET_ASYNC_EVENT_WRITE,
JANET_ASYNC_EVENT_TIMEOUT,
JANET_ASYNC_EVENT_COMPLETE /* Used on windows for IOCP */
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
JANET_ASYNC_EVENT_USER
} JanetAsyncEvent;
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
@@ -546,6 +548,10 @@ struct JanetListenerState {
JanetPollable *pollable;
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
implementation of the event loop and the particular event. */
#ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */
#endif
/* internal */
int _index; /* not used in all implementations */
int _mask;
@@ -827,7 +833,6 @@ struct JanetFiber {
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
#ifdef JANET_EV
JanetListenerState *waiting;
int32_t timeout_index;
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
#endif
};

View File

@@ -33,9 +33,16 @@
(:write stream b)
(buffer/clear b)))
(def s (net/server "127.0.0.1" "8000" handler))
(def s (net/server "127.0.0.1" "8000"))
(assert s "made server 1")
(ev/go
(coro
(while (not (net/closed? s))
(def conn (net/accept s))
(unless conn (break))
(ev/call handler conn))))
(defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg)