mirror of
https://github.com/janet-lang/janet
synced 2025-11-09 20:13:02 +00:00
Compare commits
41 Commits
windows-ev
...
ev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f10264f76 | ||
|
|
c192caa349 | ||
|
|
6f1d5d3b73 | ||
|
|
099a912992 | ||
|
|
56b1ea3726 | ||
|
|
d6391f2d70 | ||
|
|
07910272e2 | ||
|
|
1092013c2b | ||
|
|
0db83bd787 | ||
|
|
f55316eabc | ||
|
|
840f59934e | ||
|
|
75a9c59ad8 | ||
|
|
adfccd33ae | ||
|
|
9d41243c15 | ||
|
|
e33e182eb0 | ||
|
|
4dffd662f0 | ||
|
|
5064d579d4 | ||
|
|
540425a41b | ||
|
|
4d21b582c7 | ||
|
|
f288bc1790 | ||
|
|
8942e348bd | ||
|
|
9f27336827 | ||
|
|
f517cccf7b | ||
|
|
3a937ace51 | ||
|
|
b8661f8bff | ||
|
|
51828ab5f8 | ||
|
|
84fe5d7f34 | ||
|
|
2891d2b260 | ||
|
|
edfb861a5f | ||
|
|
88c1cf3ee7 | ||
|
|
813e3fdcfd | ||
|
|
bbe10e4938 | ||
|
|
cb4903fa86 | ||
|
|
ea45165db8 | ||
|
|
1fba699ed4 | ||
|
|
ce3d574c41 | ||
|
|
7a601a7eb2 | ||
|
|
9ec66ab826 | ||
|
|
ebfa07f8ce | ||
|
|
5c05dec65a | ||
|
|
bf6ebc4a68 |
2
Makefile
2
Makefile
@@ -262,7 +262,7 @@ build/janet.pc: $(JANET_TARGET)
|
||||
echo 'Libs: -L$${libdir} -ljanet' >> $@
|
||||
echo 'Libs.private: $(CLIBS)' >> $@
|
||||
|
||||
install: $(JANET_TARGET) build/janet.pc build/jpm
|
||||
install: $(JANET_TARGET) $(JANET_LIBRARY) $(JANET_STATIC_LIBRARY) build/janet.pc build/jpm
|
||||
mkdir -p '$(DESTDIR)$(BINDIR)'
|
||||
cp $(JANET_TARGET) '$(DESTDIR)$(BINDIR)/janet'
|
||||
mkdir -p '$(DESTDIR)$(INCLUDEDIR)/janet'
|
||||
|
||||
48
README.md
48
README.md
@@ -14,9 +14,9 @@ lisp-like language, but lists are replaced
|
||||
by other data structures (arrays, tables (hash table), struct (immutable hash table), tuples).
|
||||
The language also supports bridging to native code written in C, meta-programming with macros, and bytecode assembly.
|
||||
|
||||
There is a repl for trying out the language, as well as the ability
|
||||
There is a REPL for trying out the language, as well as the ability
|
||||
to run script files. This client program is separate from the core runtime, so
|
||||
Janet can be embedded into other programs. Try Janet in your browser at
|
||||
Janet can be embedded in other programs. Try Janet in your browser at
|
||||
[https://janet-lang.org](https://janet-lang.org).
|
||||
|
||||
<br>
|
||||
@@ -30,23 +30,23 @@ Lua, but smaller than GNU Guile or Python.
|
||||
## Features
|
||||
|
||||
* Minimal setup - one binary and you are good to go!
|
||||
* First class closures
|
||||
* First-class closures
|
||||
* Garbage collection
|
||||
* First class green threads (continuations)
|
||||
* Python style generators (implemented as a plain macro)
|
||||
* First-class green threads (continuations)
|
||||
* Python-style generators (implemented as a plain macro)
|
||||
* Mutable and immutable arrays (array/tuple)
|
||||
* Mutable and immutable hashtables (table/struct)
|
||||
* Mutable and immutable strings (buffer/string)
|
||||
* Macros
|
||||
* Byte code interpreter with an assembly interface, as well as bytecode verification
|
||||
* Tailcall Optimization
|
||||
* Tail call Optimization
|
||||
* Direct interop with C via abstract types and C functions
|
||||
* Dynamically load C libraries
|
||||
* Functional and imperative standard library
|
||||
* Lexical scoping
|
||||
* Imperative programming as well as functional
|
||||
* REPL
|
||||
* Parsing Expression Grammars built in to the core library
|
||||
* Parsing Expression Grammars built into the core library
|
||||
* 400+ functions and macros in the core library
|
||||
* Embedding Janet in other programs
|
||||
* Interactive environment with detailed stack traces
|
||||
@@ -56,7 +56,7 @@ Lua, but smaller than GNU Guile or Python.
|
||||
* For a quick tutorial, see [the introduction](https://janet-lang.org/docs/index.html) for more details.
|
||||
* For the full API for all functions in the core library, see [the core API doc](https://janet-lang.org/api/index.html)
|
||||
|
||||
Documentation is also available locally in the repl.
|
||||
Documentation is also available locally in the REPL.
|
||||
Use the `(doc symbol-name)` macro to get API
|
||||
documentation for symbols in the core library. For example,
|
||||
```
|
||||
@@ -66,7 +66,7 @@ Shows documentation for the doc macro.
|
||||
|
||||
To get a list of all bindings in the default
|
||||
environment, use the `(all-bindings)` function. You
|
||||
can also use the `(doc)` macro with no arguments if you are in the repl
|
||||
can also use the `(doc)` macro with no arguments if you are in the REPL
|
||||
to show bound symbols.
|
||||
|
||||
## Source
|
||||
@@ -92,7 +92,7 @@ Find out more about the available make targets by running `make help`.
|
||||
|
||||
### 32-bit Haiku
|
||||
|
||||
32-bit Haiku build instructions are the same as the unix-like build instructions,
|
||||
32-bit Haiku build instructions are the same as the UNIX-like build instructions,
|
||||
but you need to specify an alternative compiler, such as `gcc-x86`.
|
||||
|
||||
```
|
||||
@@ -104,7 +104,7 @@ make repl
|
||||
|
||||
### FreeBSD
|
||||
|
||||
FreeBSD build instructions are the same as the unix-like build instuctions,
|
||||
FreeBSD build instructions are the same as the UNIX-like build instructions,
|
||||
but you need `gmake` to compile. Alternatively, install directly from
|
||||
packages, using `pkg install lang/janet`.
|
||||
|
||||
@@ -117,7 +117,7 @@ gmake repl
|
||||
|
||||
### NetBSD
|
||||
|
||||
NetBSD build instructions are the same as the FreeBSD build instuctions.
|
||||
NetBSD build instructions are the same as the FreeBSD build instructions.
|
||||
Alternatively, install directly from packages, using `pkgin install janet`.
|
||||
|
||||
### Windows
|
||||
@@ -136,11 +136,11 @@ Now you should have an `.msi`. You can run `build_win install` to install the `.
|
||||
|
||||
### Meson
|
||||
|
||||
Janet also has a build file for [Meson](https://mesonbuild.com/), a cross platform build
|
||||
system. Although Meson has a python dependency, Meson is a very complete build system that
|
||||
Janet also has a build file for [Meson](https://mesonbuild.com/), a cross-platform build
|
||||
system. Although Meson has a Python dependency, Meson is a very complete build system that
|
||||
is maybe more convenient and flexible for integrating into existing pipelines.
|
||||
Meson also provides much better IDE integration than Make or batch files, as well as support
|
||||
for cross compilation.
|
||||
for cross-compilation.
|
||||
|
||||
For the impatient, building with Meson is as follows. The options provided to
|
||||
`meson setup` below emulate Janet's Makefile.
|
||||
@@ -177,11 +177,11 @@ to try out the language, you don't need to install anything. You can also move t
|
||||
|
||||
## Usage
|
||||
|
||||
A repl is launched when the binary is invoked with no arguments. Pass the -h flag
|
||||
A REPL is launched when the binary is invoked with no arguments. Pass the -h flag
|
||||
to display the usage information. Individual scripts can be run with `./janet myscript.janet`
|
||||
|
||||
If you are looking to explore, you can print a list of all available macros, functions, and constants
|
||||
by entering the command `(all-bindings)` into the repl.
|
||||
by entering the command `(all-bindings)` into the REPL.
|
||||
|
||||
```
|
||||
$ janet
|
||||
@@ -199,13 +199,13 @@ Options are:
|
||||
-v : Print the version string
|
||||
-s : Use raw stdin instead of getline like functionality
|
||||
-e code : Execute a string of janet
|
||||
-r : Enter the repl after running all scripts
|
||||
-p : Keep on executing if there is a top level error (persistent)
|
||||
-q : Hide prompt, logo, and repl output (quiet)
|
||||
-r : Enter the REPL after running all scripts
|
||||
-p : Keep on executing if there is a top-level error (persistent)
|
||||
-q : Hide prompt, logo, and REPL output (quiet)
|
||||
-k : Compile scripts but do not execute (flycheck)
|
||||
-m syspath : Set system path for loading global modules
|
||||
-c source output : Compile janet source code into an image
|
||||
-n : Disable ANSI color output in the repl
|
||||
-n : Disable ANSI color output in the REPL
|
||||
-l path : Execute code in a file before running the main script
|
||||
-- : Stop handling options
|
||||
```
|
||||
@@ -232,16 +232,16 @@ See the examples directory for some example janet code.
|
||||
|
||||
## Discussion
|
||||
|
||||
Feel free to ask questions and join discussion on the [Janet Gitter Channel](https://gitter.im/janet-language/community).
|
||||
Feel free to ask questions and join the discussion on the [Janet Gitter Channel](https://gitter.im/janet-language/community).
|
||||
Alternatively, check out [the #janet channel on Freenode](https://webchat.freenode.net/)
|
||||
|
||||
## FAQ
|
||||
|
||||
### Why is my terminal spitting out junk when I run the repl?
|
||||
### Why is my terminal spitting out junk when I run the REPL?
|
||||
|
||||
Make sure your terminal supports ANSI escape codes. Most modern terminals will
|
||||
support these, but some older terminals, Windows consoles, or embedded terminals
|
||||
will not. If your terminal does not support ANSI escape codes, run the repl with
|
||||
will not. If your terminal does not support ANSI escape codes, run the REPL with
|
||||
the `-n` flag, which disables color output. You can also try the `-s` if further issues
|
||||
ensue.
|
||||
|
||||
|
||||
@@ -1,23 +1,22 @@
|
||||
# Example of dst bytecode assembly
|
||||
|
||||
# Fibonacci sequence, implemented with naive recursion.
|
||||
(def fibasm (asm '{
|
||||
arity 1
|
||||
bytecode [
|
||||
(ltim 1 0 0x2) # $1 = $0 < 2
|
||||
(jmpif 1 :done) # if ($1) goto :done
|
||||
(lds 1) # $1 = self
|
||||
(addim 0 0 -0x1) # $0 = $0 - 1
|
||||
(push 0) # push($0), push argument for next function call
|
||||
(call 2 1) # $2 = call($1)
|
||||
(addim 0 0 -0x1) # $0 = $0 - 1
|
||||
(push 0) # push($0)
|
||||
(call 0 1) # $0 = call($1)
|
||||
(add 0 0 2) # $0 = $0 + $2 (integers)
|
||||
:done
|
||||
(ret 0) # return $0
|
||||
]
|
||||
}))
|
||||
(def fibasm
|
||||
(asm
|
||||
'{:arity 1
|
||||
:bytecode @[(ltim 1 0 0x2) # $1 = $0 < 2
|
||||
(jmpif 1 :done) # if ($1) goto :done
|
||||
(lds 1) # $1 = self
|
||||
(addim 0 0 -0x1) # $0 = $0 - 1
|
||||
(push 0) # push($0), push argument for next function call
|
||||
(call 2 1) # $2 = call($1)
|
||||
(addim 0 0 -0x1) # $0 = $0 - 1
|
||||
(push 0) # push($0)
|
||||
(call 0 1) # $0 = call($1)
|
||||
(add 0 0 2) # $0 = $0 + $2 (integers)
|
||||
:done
|
||||
(ret 0) # return $0
|
||||
]}))
|
||||
|
||||
# Test it
|
||||
|
||||
|
||||
5
examples/echoclient.janet
Normal file
5
examples/echoclient.janet
Normal file
@@ -0,0 +1,5 @@
|
||||
(with [conn (net/connect "127.0.0.1" 8000)]
|
||||
(print "writing abcdefg...")
|
||||
(:write conn "abcdefg")
|
||||
(print "reading...")
|
||||
(printf "got: %v" (:read conn 1024)))
|
||||
15
examples/echoserve.janet
Normal file
15
examples/echoserve.janet
Normal file
@@ -0,0 +1,15 @@
|
||||
(defn handler
|
||||
"Simple handler for connections."
|
||||
[stream]
|
||||
(defer (:close stream)
|
||||
(def id (gensym))
|
||||
(def b @"")
|
||||
(print "Connection " id "!")
|
||||
(while (:read stream 1024 b)
|
||||
(printf " %v -> %v" id b)
|
||||
(:write stream b)
|
||||
(buffer/clear b))
|
||||
(printf "Done %v!" id)
|
||||
(ev/sleep 0.5)))
|
||||
|
||||
(net/server "127.0.0.1" "8000" handler)
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
(defn writer [c]
|
||||
(for i 0 3
|
||||
(def item (string i ":" (hash c)))
|
||||
(def item (string i ":" (mod (hash c) 999)))
|
||||
(ev/sleep 0.1)
|
||||
(print "writer giving item " item " to " c "...")
|
||||
(ev/give c item))
|
||||
@@ -11,8 +11,8 @@
|
||||
|
||||
(defn reader [name]
|
||||
(forever
|
||||
(def c (ev/select ;channels))
|
||||
(print "reader " name " got " (ev/take c) " from " c)))
|
||||
(def [_ c x] (ev/rselect ;channels))
|
||||
(print "reader " name " got " x " from " c)))
|
||||
|
||||
# Readers
|
||||
(each letter [:a :b :c :d :e :f :g]
|
||||
|
||||
37
examples/select2.janet
Normal file
37
examples/select2.janet
Normal file
@@ -0,0 +1,37 @@
|
||||
###
|
||||
### examples/select2.janet
|
||||
###
|
||||
### Mix reads and writes in select.
|
||||
###
|
||||
|
||||
(def c1 (ev/chan 40))
|
||||
(def c2 (ev/chan 40))
|
||||
(def c3 (ev/chan 40))
|
||||
(def c4 (ev/chan 40))
|
||||
|
||||
(def c5 (ev/chan 4))
|
||||
|
||||
(defn worker
|
||||
[c n x]
|
||||
(forever
|
||||
(ev/sleep n)
|
||||
(ev/give c x)))
|
||||
|
||||
(defn writer-worker
|
||||
[c]
|
||||
(forever
|
||||
(ev/sleep 0.2)
|
||||
(print "writing " (ev/take c))))
|
||||
|
||||
(ev/call worker c1 1 :item1)
|
||||
(ev/sleep 0.2)
|
||||
(ev/call worker c2 1 :item2)
|
||||
(ev/sleep 0.1)
|
||||
(ev/call worker c3 1 :item3)
|
||||
(ev/sleep 0.2)
|
||||
(ev/call worker c4 1 :item4)
|
||||
(ev/sleep 0.1)
|
||||
(ev/call worker c4 1 :item5)
|
||||
(ev/call writer-worker c5)
|
||||
|
||||
(forever (pp (ev/rselect c1 c2 c3 c4 [c5 :thing])))
|
||||
@@ -6,7 +6,7 @@
|
||||
(def b @"")
|
||||
(print "Connection " id "!")
|
||||
(while (:read stream 1024 b)
|
||||
(repeat 10 (print "work for " id " ...") (ev/sleep 1))
|
||||
(repeat 10 (print "work for " id " ...") (ev/sleep 0.1))
|
||||
(:write stream b)
|
||||
(buffer/clear b))
|
||||
(printf "Done %v!" id)))
|
||||
|
||||
@@ -1392,7 +1392,7 @@
|
||||
arr)
|
||||
|
||||
(defn pairs
|
||||
"Get the values of an associative data structure."
|
||||
"Get the key-value pairs of an associative data structure."
|
||||
[x]
|
||||
(def arr (array/new (length x)))
|
||||
(var k (next x nil))
|
||||
@@ -2488,7 +2488,7 @@
|
||||
(tuple import* (string path) ;argm))
|
||||
|
||||
(defmacro use
|
||||
"Similar to import, but imported bindings are not prefixed with a namespace
|
||||
"Similar to import, but imported bindings are not prefixed with a module
|
||||
identifier. Can also import multiple modules in one shot."
|
||||
[& modules]
|
||||
~(do ,;(map |~(,import* ,(string $) :prefix "") modules)))
|
||||
@@ -2722,6 +2722,28 @@
|
||||
:on-status (or onsignal (make-onsignal env 1))
|
||||
:source "repl"}))
|
||||
|
||||
###
|
||||
###
|
||||
### Extras
|
||||
###
|
||||
###
|
||||
|
||||
(defmacro- guarddef
|
||||
[sym form]
|
||||
(if (dyn sym)
|
||||
form))
|
||||
|
||||
(guarddef net/listen
|
||||
(defn net/server
|
||||
"Start a server asynchornously with net/listen and net/accept-loop. Returns the new server stream."
|
||||
[host port &opt handler type]
|
||||
(def s (net/listen host port type))
|
||||
(if handler
|
||||
(ev/go (fn [] (net/accept-loop s handler))))
|
||||
s))
|
||||
|
||||
(undef guarddef)
|
||||
|
||||
###
|
||||
###
|
||||
### CLI Tool Main
|
||||
@@ -2786,14 +2808,14 @@
|
||||
-v : Print the version string
|
||||
-s : Use raw stdin instead of getline like functionality
|
||||
-e code : Execute a string of janet
|
||||
-d : Set the debug flag in the repl
|
||||
-r : Enter the repl after running all scripts
|
||||
-p : Keep on executing if there is a top level error (persistent)
|
||||
-d : Set the debug flag in the REPL
|
||||
-r : Enter the REPL after running all scripts
|
||||
-p : Keep on executing if there is a top-level error (persistent)
|
||||
-q : Hide logo (quiet)
|
||||
-k : Compile scripts but do not execute (flycheck)
|
||||
-m syspath : Set system path for loading global modules
|
||||
-c source output : Compile janet source code into an image
|
||||
-n : Disable ANSI color output in the repl
|
||||
-n : Disable ANSI color output in the REPL
|
||||
-l lib : Import a module before processing more arguments
|
||||
-- : Stop handling options`)
|
||||
(os/exit 0)
|
||||
|
||||
@@ -988,7 +988,7 @@ static const JanetReg asm_cfuns[] = {
|
||||
{
|
||||
"disasm", cfun_disasm,
|
||||
JDOC("(disasm func &opt field)\n\n"
|
||||
"Returns assembly that could be used be compile the given function.\n"
|
||||
"Returns assembly that could be used to compile the given function.\n"
|
||||
"func must be a function, not a c function. Will throw on error on a badly\n"
|
||||
"typed argument. If given a field name, will only return that part of the function assembly.\n"
|
||||
"Possible fields are:\n\n"
|
||||
|
||||
358
src/core/ev.c
358
src/core/ev.c
@@ -36,6 +36,7 @@
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
#include <windows.h>
|
||||
#include <math.h>
|
||||
|
||||
#else
|
||||
|
||||
@@ -137,6 +138,7 @@ typedef struct JanetTimeout JanetTimeout;
|
||||
struct JanetTimeout {
|
||||
JanetTimestamp when;
|
||||
JanetFiber *fiber;
|
||||
uint32_t sched_id;
|
||||
int is_error;
|
||||
};
|
||||
|
||||
@@ -171,9 +173,7 @@ static int peek_timeout(JanetTimeout *out) {
|
||||
/* Remove the next timeout from the priority queue */
|
||||
static void pop_timeout(size_t index) {
|
||||
if (janet_vm_tq_count <= index) return;
|
||||
janet_vm_tq[index].fiber->timeout_index = -1;
|
||||
janet_vm_tq[index] = janet_vm_tq[--janet_vm_tq_count];
|
||||
janet_vm_tq[index].fiber->timeout_index = (int32_t) index;
|
||||
for (;;) {
|
||||
size_t left = (index << 1) + 1;
|
||||
size_t right = left + 1;
|
||||
@@ -188,8 +188,6 @@ static void pop_timeout(size_t index) {
|
||||
JanetTimeout temp = janet_vm_tq[index];
|
||||
janet_vm_tq[index] = janet_vm_tq[smallest];
|
||||
janet_vm_tq[smallest] = temp;
|
||||
janet_vm_tq[index].fiber->timeout_index = (int32_t) index;
|
||||
janet_vm_tq[smallest].fiber->timeout_index = (int32_t) smallest;
|
||||
index = smallest;
|
||||
}
|
||||
}
|
||||
@@ -212,10 +210,6 @@ static void add_timeout(JanetTimeout to) {
|
||||
janet_vm_tq[oldcount] = to;
|
||||
/* Heapify */
|
||||
size_t index = oldcount;
|
||||
if (to.fiber->timeout_index >= 0) {
|
||||
pop_timeout(to.fiber->timeout_index);
|
||||
}
|
||||
to.fiber->timeout_index = (int32_t) index;
|
||||
while (index > 0) {
|
||||
size_t parent = (index - 1) >> 1;
|
||||
if (janet_vm_tq[parent].when <= janet_vm_tq[index].when) break;
|
||||
@@ -339,10 +333,6 @@ void janet_schedule(JanetFiber *fiber, Janet value) {
|
||||
void janet_fiber_did_resume(JanetFiber *fiber) {
|
||||
/* Cancel any pending fibers */
|
||||
if (fiber->waiting) janet_unlisten(fiber->waiting);
|
||||
if (fiber->timeout_index >= 0) {
|
||||
pop_timeout(fiber->timeout_index);
|
||||
fiber->timeout_index = -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Mark all pending tasks */
|
||||
@@ -404,6 +394,7 @@ void janet_addtimeout(double sec) {
|
||||
JanetTimeout to;
|
||||
to.when = ts_delta(ts_now(), sec);
|
||||
to.fiber = fiber;
|
||||
to.sched_id = fiber->sched_id;
|
||||
to.is_error = 1;
|
||||
add_timeout(to);
|
||||
}
|
||||
@@ -414,9 +405,9 @@ typedef struct {
|
||||
JanetFiber *fiber;
|
||||
uint32_t sched_id;
|
||||
enum {
|
||||
JANET_CP_MODE_NONE,
|
||||
JANET_CP_MODE_ITEM,
|
||||
JANET_CP_MODE_SELECT
|
||||
JANET_CP_MODE_CHOICE_READ,
|
||||
JANET_CP_MODE_CHOICE_WRITE
|
||||
} mode;
|
||||
} JanetChannelPending;
|
||||
|
||||
@@ -497,11 +488,24 @@ static int janet_chanat_mark(void *p, size_t s) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Channel Methods */
|
||||
static Janet make_write_result(JanetChannel *channel) {
|
||||
Janet *tup = janet_tuple_begin(2);
|
||||
tup[0] = janet_ckeywordv("give");
|
||||
tup[1] = janet_wrap_abstract(channel);
|
||||
return janet_wrap_tuple(janet_tuple_end(tup));
|
||||
}
|
||||
|
||||
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||
static Janet make_read_result(JanetChannel *channel, Janet x) {
|
||||
Janet *tup = janet_tuple_begin(3);
|
||||
tup[0] = janet_ckeywordv("take");
|
||||
tup[1] = janet_wrap_abstract(channel);
|
||||
tup[2] = x;
|
||||
return janet_wrap_tuple(janet_tuple_end(tup));
|
||||
}
|
||||
|
||||
/* Push a value to a channel, and return 1 if channel should block, zero otherwise.
|
||||
* If the push would block, will add to the write_pending queue in the channel. */
|
||||
static int janet_channel_push(JanetChannel *channel, Janet x, int is_choice) {
|
||||
JanetChannelPending reader;
|
||||
int is_empty;
|
||||
do {
|
||||
@@ -509,66 +513,113 @@ static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
||||
} while (!is_empty && (reader.sched_id != reader.fiber->sched_id));
|
||||
if (is_empty) {
|
||||
/* No pending reader */
|
||||
if (janet_q_push(&channel->items, argv + 1, sizeof(Janet))) {
|
||||
janet_panicf("channel overflow: %v", argv[1]);
|
||||
if (janet_q_push(&channel->items, &x, sizeof(Janet))) {
|
||||
janet_panicf("channel overflow: %v", x);
|
||||
} else if (janet_q_count(&channel->items) > channel->limit) {
|
||||
/* Pushed successfully, but should block. */
|
||||
JanetChannelPending pending;
|
||||
pending.fiber = janet_vm_root_fiber,
|
||||
pending.sched_id = janet_vm_root_fiber->sched_id,
|
||||
pending.mode = JANET_CP_MODE_ITEM;
|
||||
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_WRITE : JANET_CP_MODE_ITEM;
|
||||
janet_q_push(&channel->write_pending, &pending, sizeof(pending));
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
/* Pending reader */
|
||||
if (reader.mode == JANET_CP_MODE_SELECT) {
|
||||
janet_q_push(&channel->items, argv + 1, sizeof(Janet));
|
||||
janet_schedule(reader.fiber, argv[0]);
|
||||
if (reader.mode == JANET_CP_MODE_CHOICE_READ) {
|
||||
janet_schedule(reader.fiber, make_read_result(channel, x));
|
||||
} else {
|
||||
janet_schedule(reader.fiber, argv[1]);
|
||||
janet_schedule(reader.fiber, x);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Pop from a channel - returns 1 if item was obtain, 0 otherwise. The item
|
||||
* is returned by reference. If the pop would block, will add to the read_pending
|
||||
* queue in the channel. */
|
||||
static int janet_channel_pop(JanetChannel *channel, Janet *item, int is_choice) {
|
||||
JanetChannelPending writer;
|
||||
if (janet_q_pop(&channel->items, item, sizeof(Janet))) {
|
||||
/* Queue empty */
|
||||
JanetChannelPending pending;
|
||||
pending.fiber = janet_vm_root_fiber,
|
||||
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||
pending.mode = is_choice ? JANET_CP_MODE_CHOICE_READ : JANET_CP_MODE_ITEM;
|
||||
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
|
||||
return 0;
|
||||
}
|
||||
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
||||
/* pending writer */
|
||||
if (writer.mode == JANET_CP_MODE_CHOICE_WRITE) {
|
||||
janet_schedule(writer.fiber, make_write_result(channel));
|
||||
} else {
|
||||
janet_schedule(writer.fiber, janet_wrap_abstract(channel));
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Channel Methods */
|
||||
|
||||
static Janet cfun_channel_push(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||
if (janet_channel_push(channel, argv[1], 0)) {
|
||||
janet_await();
|
||||
}
|
||||
return argv[0];
|
||||
}
|
||||
|
||||
static Janet cfun_channel_pop(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetChannel *channel = janet_getabstract(argv, 0, &ChannelAT);
|
||||
Janet item = janet_wrap_nil();
|
||||
JanetChannelPending writer;
|
||||
if (janet_q_pop(&channel->items, &item, sizeof(item))) {
|
||||
/* Queue empty */
|
||||
JanetChannelPending pending;
|
||||
pending.fiber = janet_vm_root_fiber,
|
||||
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||
pending.mode = JANET_CP_MODE_ITEM;
|
||||
janet_q_push(&channel->read_pending, &pending, sizeof(pending));
|
||||
janet_await();
|
||||
}
|
||||
janet_schedule(janet_vm_root_fiber, item);
|
||||
if (!janet_q_pop(&channel->write_pending, &writer, sizeof(writer))) {
|
||||
/* Got item, and there are pending writers. This means we should
|
||||
* schedule one. */
|
||||
janet_schedule(writer.fiber, argv[0]);
|
||||
Janet item;
|
||||
if (janet_channel_pop(channel, &item, 0)) {
|
||||
janet_schedule(janet_vm_root_fiber, item);
|
||||
}
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static Janet cfun_channel_select(int32_t argc, Janet *argv) {
|
||||
static Janet cfun_channel_choice(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 1, -1);
|
||||
int32_t len;
|
||||
const Janet *data;
|
||||
|
||||
/* Check channels for immediate reads and writes */
|
||||
for (int32_t i = 0; i < argc; i++) {
|
||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||
if (chan->items.head != chan->items.tail) return argv[i];
|
||||
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
|
||||
/* Write */
|
||||
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
|
||||
if (janet_q_count(&chan->items) < chan->limit) {
|
||||
janet_channel_push(chan, data[1], 1);
|
||||
return make_write_result(chan);
|
||||
}
|
||||
} else {
|
||||
/* Read */
|
||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||
if (chan->items.head != chan->items.tail) {
|
||||
Janet item;
|
||||
janet_channel_pop(chan, &item, 1);
|
||||
return make_read_result(chan, item);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* None of the channels have data, so we wait on all of them. */
|
||||
|
||||
/* Wait for all readers or writers */
|
||||
for (int32_t i = 0; i < argc; i++) {
|
||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||
JanetChannelPending pending;
|
||||
pending.fiber = janet_vm_root_fiber,
|
||||
pending.sched_id = janet_vm_root_fiber->sched_id;
|
||||
pending.mode = JANET_CP_MODE_SELECT;
|
||||
janet_q_push(&chan->read_pending, &pending, sizeof(pending));
|
||||
if (janet_indexed_view(argv[i], &data, &len) && len == 2) {
|
||||
/* Write */
|
||||
JanetChannel *chan = janet_getabstract(data, 0, &ChannelAT);
|
||||
janet_channel_push(chan, data[1], 1);
|
||||
} else {
|
||||
/* Read */
|
||||
Janet item;
|
||||
JanetChannel *chan = janet_getabstract(argv, i, &ChannelAT);
|
||||
janet_channel_pop(chan, &item, 1);
|
||||
}
|
||||
}
|
||||
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -590,15 +641,19 @@ static Janet cfun_channel_count(int32_t argc, Janet *argv) {
|
||||
return janet_wrap_integer(janet_q_count(&channel->items));
|
||||
}
|
||||
|
||||
static Janet cfun_channel_rselect(int32_t argc, Janet *argv) {
|
||||
/* Fisher yates shuffle of arguments to get fairness */
|
||||
/* Fisher yates shuffle of arguments to get fairness */
|
||||
static void fisher_yates_args(int32_t argc, Janet *argv) {
|
||||
for (int32_t i = argc; i > 1; i--) {
|
||||
int32_t swap_index = janet_rng_u32(&janet_vm_ev_rng) % i;
|
||||
Janet temp = argv[swap_index];
|
||||
argv[swap_index] = argv[i - 1];
|
||||
argv[i - 1] = temp;
|
||||
}
|
||||
return cfun_channel_select(argc, argv);
|
||||
}
|
||||
|
||||
static Janet cfun_channel_rchoice(int32_t argc, Janet *argv) {
|
||||
fisher_yates_args(argc, argv);
|
||||
return cfun_channel_choice(argc, argv);
|
||||
}
|
||||
|
||||
static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
||||
@@ -611,26 +666,45 @@ static Janet cfun_channel_new(int32_t argc, Janet *argv) {
|
||||
|
||||
/* Main event loop */
|
||||
|
||||
void janet_loop1_impl(void);
|
||||
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout);
|
||||
|
||||
void janet_loop1(void) {
|
||||
/* Schedule expired timers */
|
||||
JanetTimeout to;
|
||||
JanetTimestamp now = ts_now();
|
||||
while (peek_timeout(&to) && to.when <= now) {
|
||||
pop_timeout(0);
|
||||
if (to.fiber->sched_id == to.sched_id) {
|
||||
if (to.is_error) {
|
||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
||||
} else {
|
||||
janet_schedule(to.fiber, janet_wrap_nil());
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Run scheduled fibers */
|
||||
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
|
||||
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
|
||||
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
|
||||
run_one(task.fiber, task.value, task.sig);
|
||||
}
|
||||
/* Poll for events */
|
||||
if (janet_vm_active_listeners || janet_vm_tq_count) {
|
||||
JanetTimeout to;
|
||||
memset(&to, 0, sizeof(to));
|
||||
int has_timeout;
|
||||
/* Drop timeouts that are no longer needed */
|
||||
while ((has_timeout = peek_timeout(&to)) && to.fiber->sched_id != to.sched_id) {
|
||||
pop_timeout(0);
|
||||
}
|
||||
/* Run polling implementation */
|
||||
janet_loop1_impl(has_timeout, to.when);
|
||||
}
|
||||
}
|
||||
|
||||
void janet_loop(void) {
|
||||
while (janet_vm_active_listeners || (janet_vm_spawn.head != janet_vm_spawn.tail) || janet_vm_tq_count) {
|
||||
/* Run expired timers */
|
||||
JanetTimeout to;
|
||||
while (peek_timeout(&to) && to.when <= ts_now()) {
|
||||
pop_timeout(0);
|
||||
janet_schedule(to.fiber, janet_wrap_nil());
|
||||
}
|
||||
/* Run scheduled fibers */
|
||||
while (janet_vm_spawn.head != janet_vm_spawn.tail) {
|
||||
JanetTask task = {NULL, janet_wrap_nil(), JANET_SIGNAL_OK};
|
||||
janet_q_pop(&janet_vm_spawn, &task, sizeof(task));
|
||||
run_one(task.fiber, task.value, task.sig);
|
||||
}
|
||||
/* Poll for events */
|
||||
if (janet_vm_active_listeners || janet_vm_tq_count) {
|
||||
janet_loop1_impl();
|
||||
}
|
||||
janet_loop1();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -655,26 +729,23 @@ void janet_ev_deinit(void) {
|
||||
}
|
||||
|
||||
JanetListenerState *janet_listen(JanetPollable *pollable, JanetListener behavior, int mask, size_t size, void *user) {
|
||||
/* Add the handle to the io completion port if not already added */
|
||||
JanetListenerState *state = janet_listen_impl(pollable, behavior, mask, size, user);
|
||||
/* TODO - associate IO operation with listener state somehow
|
||||
* maybe we could require encoding the operation in a mask. */
|
||||
/* on windows, janet_listen does not actually start any listening behavior. */
|
||||
if (!(pollable->flags & JANET_POLL_FLAG_IOCP)) {
|
||||
if (NULL == CreateIoCompletionPort(pollable->handle, janet_vm_iocp, (ULONG_PTR) pollable, 0)) {
|
||||
janet_panic("failed to listen for events");
|
||||
}
|
||||
pollable->flags |= JANET_POLL_FLAG_IOCP;
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
|
||||
static void janet_unlisten(JanetListenerState *state) {
|
||||
/* We don't necessarily want to cancel all io on this pollable */
|
||||
janet_unlisten_impl(state);
|
||||
}
|
||||
|
||||
|
||||
void janet_loop1_impl(void) {
|
||||
/* Check for timeout */
|
||||
JanetTimeout to;
|
||||
memset(&to, 0, sizeof(to));
|
||||
int has_timeout = peek_timeout(&to);
|
||||
|
||||
void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
|
||||
ULONG_PTR completionKey = 0;
|
||||
DWORD num_bytes_transfered = 0;
|
||||
LPOVERLAPPED overlapped;
|
||||
@@ -683,42 +754,40 @@ void janet_loop1_impl(void) {
|
||||
uint64_t waittime;
|
||||
if (has_timeout) {
|
||||
JanetTimestamp now = ts_now();
|
||||
if (now > to.when) {
|
||||
if (now > to) {
|
||||
waittime = 0;
|
||||
} else {
|
||||
waittime = (uint64_t) (to.when - now);
|
||||
waittime = (uint64_t)(to - now);
|
||||
}
|
||||
} else {
|
||||
waittime = INFINITE;
|
||||
}
|
||||
BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
|
||||
BOOL result = GetQueuedCompletionStatus(janet_vm_iocp, &num_bytes_transfered, &completionKey, &overlapped, (DWORD) waittime);
|
||||
|
||||
if (!result) {
|
||||
/* timeout ? */
|
||||
if (has_timeout) {
|
||||
/* Timer event */
|
||||
pop_timeout(0);
|
||||
/* Cancel waiters for this fiber */
|
||||
if (to.is_error) {
|
||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
||||
} else {
|
||||
janet_schedule(to.fiber, janet_wrap_nil());
|
||||
}
|
||||
} else {
|
||||
JANET_EXIT("failed to get iocp GetQueuedCompletionStatus");
|
||||
if (!result) {
|
||||
if (!has_timeout) {
|
||||
/* queue emptied */
|
||||
}
|
||||
} else {
|
||||
JANET_EXIT("Unexpected event");
|
||||
} else {
|
||||
/* Normal event */
|
||||
JanetListenerState *state = (JanetListenerState *) completionKey;
|
||||
state->event = overlapped;
|
||||
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
|
||||
if (status == JANET_ASYNC_STATUS_DONE)
|
||||
janet_unlisten(state);
|
||||
JanetPollable *pollable = (JanetPollable *) completionKey;
|
||||
JanetListenerState *state = pollable->state;
|
||||
while (state != NULL) {
|
||||
if (state->tag == overlapped) {
|
||||
state->event = overlapped;
|
||||
state->bytes = num_bytes_transfered;
|
||||
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
|
||||
if (status == JANET_ASYNC_STATUS_DONE) {
|
||||
janet_unlisten(state);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
state = state->_next;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#elif defined(JANET_EV_POLL)
|
||||
|
||||
/*
|
||||
@@ -786,17 +855,12 @@ static void janet_unlisten(JanetListenerState *state) {
|
||||
}
|
||||
|
||||
#define JANET_EPOLL_MAX_EVENTS 64
|
||||
void janet_loop1_impl(void) {
|
||||
/* Set timer */
|
||||
JanetTimeout to;
|
||||
struct itimerspec its;
|
||||
memset(&to, 0, sizeof(to));
|
||||
int has_timeout = peek_timeout(&to);
|
||||
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
if (janet_vm_timer_enabled || has_timeout) {
|
||||
memset(&its, 0, sizeof(its));
|
||||
if (has_timeout) {
|
||||
its.it_value.tv_sec = to.when / 1000;
|
||||
its.it_value.tv_nsec = (to.when % 1000) * 1000000;
|
||||
its.it_value.tv_sec = timeout / 1000;
|
||||
its.it_value.tv_nsec = (timeout % 1000) * 1000000;
|
||||
}
|
||||
timerfd_settime(janet_vm_timerfd, TFD_TIMER_ABSTIME, &its, NULL);
|
||||
}
|
||||
@@ -815,17 +879,7 @@ void janet_loop1_impl(void) {
|
||||
/* Step state machines */
|
||||
for (int i = 0; i < ready; i++) {
|
||||
JanetPollable *pollable = events[i].data.ptr;
|
||||
if (NULL == pollable) {
|
||||
/* Timer event */
|
||||
pop_timeout(0);
|
||||
/* Cancel waiters for this fiber */
|
||||
if (to.is_error) {
|
||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
||||
} else {
|
||||
janet_schedule(to.fiber, janet_wrap_nil());
|
||||
}
|
||||
} else {
|
||||
/* Normal event */
|
||||
if (NULL != pollable) { /* If NULL, is a timeout */
|
||||
int mask = events[i].events;
|
||||
JanetListenerState *state = pollable->state;
|
||||
state->event = events + i;
|
||||
@@ -940,18 +994,13 @@ static void janet_unlisten(JanetListenerState *state) {
|
||||
janet_unlisten_impl(state);
|
||||
}
|
||||
|
||||
void janet_loop1_impl(void) {
|
||||
/* Set timer */
|
||||
JanetTimeout to;
|
||||
memset(&to, 0, sizeof(to));
|
||||
int has_timeout = peek_timeout(&to);
|
||||
|
||||
void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
|
||||
/* Poll for events */
|
||||
int ready;
|
||||
do {
|
||||
if (has_timeout) {
|
||||
int64_t diff = to.when - ts_now();
|
||||
ready = poll(janet_vm_fds, janet_vm_fdcount, diff < 0 ? 0 : (int) diff);
|
||||
JanetTimestamp now = ts_now();
|
||||
ready = poll(janet_vm_fds, janet_vm_fdcount, now > timeout ? 0 : (int)(timeout - now));
|
||||
} else {
|
||||
ready = poll(janet_vm_fds, janet_vm_fdcount, -1);
|
||||
}
|
||||
@@ -961,10 +1010,8 @@ void janet_loop1_impl(void) {
|
||||
}
|
||||
|
||||
/* Step state machines */
|
||||
int did_handle_something = 0;
|
||||
for (size_t i = 0; i < janet_vm_fdcount; i++) {
|
||||
struct pollfd *pfd = janet_vm_fds + i;
|
||||
did_handle_something |= pfd->revents;
|
||||
/* Skip fds where nothing interesting happened */
|
||||
if (!(pfd->revents & (pfd->events | POLLHUP | POLLERR | POLLNVAL))) continue;
|
||||
JanetListenerState *state = janet_vm_listener_map[i];
|
||||
@@ -980,19 +1027,6 @@ void janet_loop1_impl(void) {
|
||||
if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE)
|
||||
janet_unlisten(state);
|
||||
}
|
||||
|
||||
/* If nothing was handled and poll returned, then we know that it timedout and we should trigger
|
||||
* one of our timers. */
|
||||
if (!did_handle_something) {
|
||||
/* Timer event */
|
||||
pop_timeout(0);
|
||||
/* Cancel waiters for this fiber */
|
||||
if (to.is_error) {
|
||||
janet_cancel(to.fiber, janet_cstringv("timeout"));
|
||||
} else {
|
||||
janet_schedule(to.fiber, janet_wrap_nil());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void janet_ev_init(void) {
|
||||
@@ -1041,6 +1075,7 @@ static Janet cfun_ev_sleep(int32_t argc, Janet *argv) {
|
||||
to.when = ts_delta(ts_now(), sec);
|
||||
to.fiber = janet_vm_root_fiber;
|
||||
to.is_error = 0;
|
||||
to.sched_id = to.fiber->sched_id;
|
||||
add_timeout(to);
|
||||
janet_await();
|
||||
}
|
||||
@@ -1102,25 +1137,24 @@ static const JanetReg ev_cfuns[] = {
|
||||
JDOC("(ev/count channel)\n\n"
|
||||
"Get the number of items currently waiting in a channel.")
|
||||
},
|
||||
{
|
||||
"ev/select", cfun_channel_select,
|
||||
JDOC("(ev/select & channels)\n\n"
|
||||
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
||||
"is not empty. Will prefer channels in the order they are passed as arguments (ordered choice). "
|
||||
"Returns a non-empty channel.")
|
||||
},
|
||||
{
|
||||
"ev/rselect", cfun_channel_rselect,
|
||||
JDOC("(ev/rselect & channels)\n\n"
|
||||
"Get a channel that is not empty, suspending the current fiber until at least one channel "
|
||||
"is not empty. Will prefer channels in a random order (random choice). "
|
||||
"Returns a non-empty channel.")
|
||||
},
|
||||
{
|
||||
"ev/cancel", cfun_ev_cancel,
|
||||
JDOC("(ev/cancel fiber err)\n\n"
|
||||
"Cancel a suspended fiber in the event loop. Differs from cancel in that it returns the canceled fiber immediately")
|
||||
},
|
||||
{
|
||||
"ev/select", cfun_channel_choice,
|
||||
JDOC("(ev/select & clauses)\n\n"
|
||||
"Block until the first of several channel operations occur. Returns a tuple of the form [:give chan] or [:take chan x], where "
|
||||
"a :give tuple is the result of a write and :take tuple is the result of a write. Each clause must be either a channel (for "
|
||||
"a channel take operation) or a tuple [channel x] for a channel give operation. Operations are tried in order, such that the first "
|
||||
"clauses will take precedence over later clauses.")
|
||||
},
|
||||
{
|
||||
"ev/rselect", cfun_channel_rchoice,
|
||||
JDOC("(ev/rselect & clauses)\n\n"
|
||||
"Similar to ev/choice, but will try clauses in a random order for fairness.")
|
||||
},
|
||||
{NULL, NULL, NULL}
|
||||
};
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ static void fiber_reset(JanetFiber *fiber) {
|
||||
fiber->env = NULL;
|
||||
#ifdef JANET_EV
|
||||
fiber->waiting = NULL;
|
||||
fiber->timeout_index = -1;
|
||||
fiber->sched_id = 0;
|
||||
#endif
|
||||
janet_fiber_set_status(fiber, JANET_STATUS_NEW);
|
||||
@@ -109,12 +108,15 @@ static void janet_fiber_refresh_memory(JanetFiber *fiber) {
|
||||
|
||||
/* Ensure that the fiber has enough extra capacity */
|
||||
void janet_fiber_setcapacity(JanetFiber *fiber, int32_t n) {
|
||||
int32_t old_size = fiber->capacity;
|
||||
int32_t diff = n - old_size;
|
||||
Janet *newData = realloc(fiber->data, sizeof(Janet) * n);
|
||||
if (NULL == newData) {
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
fiber->data = newData;
|
||||
fiber->capacity = n;
|
||||
janet_vm_next_collection += sizeof(Janet) * diff;
|
||||
}
|
||||
|
||||
/* Grow fiber if needed */
|
||||
|
||||
@@ -777,7 +777,7 @@ static const JanetReg io_cfuns[] = {
|
||||
#ifndef JANET_NO_PROCESSES
|
||||
{
|
||||
"file/popen", cfun_io_popen,
|
||||
JDOC("(file/popen path &opt mode)\n\n"
|
||||
JDOC("(file/popen command &opt mode)\n\n"
|
||||
"Open a file that is backed by a process. The file must be opened in either "
|
||||
"the :r (read) or the :w (write) mode. In :r mode, the stdout of the "
|
||||
"process can be read from the file. In :w mode, the stdin of the process "
|
||||
|
||||
@@ -934,8 +934,10 @@ static const uint8_t *unmarshal_one_fiber(
|
||||
fiber->data = NULL;
|
||||
fiber->child = NULL;
|
||||
fiber->env = NULL;
|
||||
#ifdef JANET_EV
|
||||
fiber->waiting = NULL;
|
||||
fiber->timeout_index = -1;
|
||||
fiber->sched_id = 0;
|
||||
#endif
|
||||
|
||||
/* Push fiber to seen stack */
|
||||
janet_v_push(st->lookup, janet_wrap_fiber(fiber));
|
||||
|
||||
475
src/core/net.c
475
src/core/net.c
@@ -32,6 +32,7 @@
|
||||
#include <winsock2.h>
|
||||
#include <windows.h>
|
||||
#include <ws2tcpip.h>
|
||||
#include <mswsock.h>
|
||||
#pragma comment (lib, "Ws2_32.lib")
|
||||
#pragma comment (lib, "Mswsock.lib")
|
||||
#pragma comment (lib, "Advapi32.lib")
|
||||
@@ -42,9 +43,11 @@
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/un.h>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <netdb.h>
|
||||
#include <fcntl.h>
|
||||
#include <math.h>
|
||||
#endif
|
||||
|
||||
/*
|
||||
@@ -75,6 +78,7 @@ static const JanetAbstractType AddressAT = {
|
||||
};
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
#define JANET_NET_CHUNKSIZE 4096
|
||||
#define JSOCKCLOSE(x) closesocket((SOCKET) x)
|
||||
#define JSOCKDEFAULT INVALID_SOCKET
|
||||
#define JLASTERR WSAGetLastError()
|
||||
@@ -94,6 +98,28 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
int code = WSAGetLastError();
|
||||
char msgbuf[256];
|
||||
msgbuf[0] = '\0';
|
||||
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
|
||||
NULL,
|
||||
code,
|
||||
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
|
||||
msgbuf,
|
||||
sizeof(msgbuf),
|
||||
NULL);
|
||||
if (!*msgbuf) sprintf(msgbuf, "%d", code);
|
||||
char *c = msgbuf;
|
||||
while (*c) {
|
||||
if (*c == '\n' || *c == '\r') {
|
||||
*c = '\0';
|
||||
break;
|
||||
}
|
||||
c++;
|
||||
}
|
||||
return janet_cstringv(msgbuf);
|
||||
}
|
||||
#else
|
||||
#define JSOCKCLOSE(x) close(x)
|
||||
#define JSOCKDEFAULT 0
|
||||
@@ -122,6 +148,9 @@ static JanetStream *make_stream(int fd, uint32_t flags) {
|
||||
stream->flags = flags;
|
||||
return stream;
|
||||
}
|
||||
static Janet net_lasterr(void) {
|
||||
return janet_cstringv(strerror(errno));
|
||||
}
|
||||
#endif
|
||||
|
||||
/* We pass this flag to all send calls to prevent sigpipe */
|
||||
@@ -169,7 +198,12 @@ typedef struct {
|
||||
int is_recv_from;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
uint8_t chunk_buf[2048];
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
int32_t chunk_size;
|
||||
struct sockaddr from;
|
||||
int fromlen;
|
||||
uint8_t chunk_buf[JANET_NET_CHUNKSIZE];
|
||||
#endif
|
||||
} NetStateRead;
|
||||
|
||||
@@ -187,6 +221,47 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when read finished */
|
||||
if (s->bytes == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
|
||||
state->bytes_left -= s->bytes;
|
||||
|
||||
if (state->bytes_left <= 0 || !state->is_chunk) {
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, state->fromlen);
|
||||
memcpy(abst, &state->from, state->fromlen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
resume_val = janet_wrap_buffer(state->buf);
|
||||
}
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
/* fallthrough */
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
state->flags = 0;
|
||||
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
|
||||
state->wbuf.len = (ULONG) chunk_size;
|
||||
state->wbuf.buf = state->chunk_buf;
|
||||
state->chunk_size = chunk_size;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
int status;
|
||||
if (state->is_recv_from) {
|
||||
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
@@ -207,8 +282,18 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
|
||||
nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0);
|
||||
}
|
||||
} while (nread == -1 && JLASTERR == JEINTR);
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) {
|
||||
break;
|
||||
|
||||
/* Check for errors - special case errors that can just be waited on to fix */
|
||||
if (nread == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
|
||||
if (nread == 0 && !state->is_recv_from) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Increment buffer counts */
|
||||
@@ -222,23 +307,15 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
|
||||
|
||||
/* Resume if done */
|
||||
if (!state->is_chunk || bytes_left == 0) {
|
||||
JanetSignal sig = JANET_SIGNAL_OK;
|
||||
Janet resume_val;
|
||||
if (state->is_recv_from) {
|
||||
void *abst = janet_abstract(&AddressAT, socklen);
|
||||
memcpy(abst, &saddr, socklen);
|
||||
resume_val = janet_wrap_abstract(abst);
|
||||
} else {
|
||||
if (nread > 0) {
|
||||
resume_val = janet_wrap_buffer(buffer);
|
||||
} else {
|
||||
sig = JANET_SIGNAL_ERROR;
|
||||
resume_val = (nread == -1)
|
||||
? janet_cstringv(strerror(JLASTERR))
|
||||
: janet_cstringv("could not read");
|
||||
}
|
||||
resume_val = janet_wrap_buffer(buffer);
|
||||
}
|
||||
janet_schedule_signal(s->fiber, resume_val, sig);
|
||||
janet_schedule(s->fiber, resume_val);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
@@ -255,10 +332,7 @@ JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *b
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSARecv((SOCKET) stream->handle,
|
||||
|
||||
#endif
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -269,6 +343,7 @@ JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 0;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -279,6 +354,7 @@ JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuff
|
||||
state->buf = buf;
|
||||
state->bytes_left = nbytes;
|
||||
state->is_recv_from = 1;
|
||||
net_machine_read((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -295,6 +371,11 @@ typedef struct {
|
||||
int32_t start;
|
||||
int is_buffer;
|
||||
void *dest_abst;
|
||||
#ifdef JANET_WINDOWS
|
||||
WSAOVERLAPPED overlapped;
|
||||
WSABUF wbuf;
|
||||
DWORD flags;
|
||||
#endif
|
||||
} NetStateWrite;
|
||||
|
||||
JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
@@ -314,13 +395,55 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_INIT: {
|
||||
/* Begin write */
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when write finished */
|
||||
if (s->bytes == 0 && !state->dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
case JANET_ASYNC_EVENT_USER: {
|
||||
/* Begin write */
|
||||
int32_t start, len;
|
||||
const uint8_t *bytes;
|
||||
start = state->start;
|
||||
if (state->is_buffer) {
|
||||
/* If buffer, convert to string. */
|
||||
/* TODO - be more efficient about this */
|
||||
JanetBuffer *buffer = state->src.buf;
|
||||
JanetString str = janet_string(buffer->data, buffer->count);
|
||||
bytes = str;
|
||||
len = buffer->count;
|
||||
state->is_buffer = 0;
|
||||
state->src.str = str;
|
||||
} else {
|
||||
bytes = state->src.str;
|
||||
len = janet_string_length(bytes);
|
||||
}
|
||||
state->wbuf.buf = (char *) bytes;
|
||||
state->wbuf.len = len;
|
||||
state->flags = 0;
|
||||
s->tag = &state->overlapped;
|
||||
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
|
||||
|
||||
/* Called when write finished */
|
||||
int status;
|
||||
SOCKET sock = (SOCKET) s->pollable->handle;
|
||||
if (state->dest_abst) {
|
||||
const struct sockaddr *to = state->dest_abst;
|
||||
int tolen = (int) janet_abstract_size((void *) to);
|
||||
status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
|
||||
} else {
|
||||
status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
|
||||
}
|
||||
|
||||
if (status && WSA_IO_PENDING != WSAGetLastError()) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
#else
|
||||
@@ -339,8 +462,8 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
||||
JReadInt nwrote = 0;
|
||||
if (start < len) {
|
||||
int32_t nbytes = len - start;
|
||||
void *dest_abst = state->dest_abst;
|
||||
do {
|
||||
void *dest_abst = state->dest_abst;
|
||||
if (dest_abst) {
|
||||
nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0,
|
||||
(struct sockaddr *) dest_abst, janet_abstract_size(dest_abst));
|
||||
@@ -348,6 +471,20 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
||||
nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL);
|
||||
}
|
||||
} while (nwrote == -1 && JLASTERR == JEINTR);
|
||||
|
||||
/* Handle write errors */
|
||||
if (nwrote == -1) {
|
||||
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
|
||||
janet_cancel(s->fiber, net_lasterr());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
/* Unless using datagrams, empty message is a disconnect */
|
||||
if (nwrote == 0 && !dest_abst) {
|
||||
janet_cancel(s->fiber, janet_cstringv("disconnect"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
if (nwrote > 0) {
|
||||
start += nwrote;
|
||||
} else {
|
||||
@@ -356,13 +493,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
||||
}
|
||||
state->start = start;
|
||||
if (start >= len) {
|
||||
if (nwrote > 0) {
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
} else if (nwrote == 0) {
|
||||
janet_cancel(s->fiber, janet_cstringv("could not write"));
|
||||
} else {
|
||||
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR)));
|
||||
}
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
@@ -380,6 +511,7 @@ JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetB
|
||||
state->start = 0;
|
||||
state->src.buf = buf;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -391,6 +523,7 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
|
||||
state->start = 0;
|
||||
state->src.str = str;
|
||||
state->dest_abst = dest_abst;
|
||||
net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
@@ -398,96 +531,150 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
|
||||
* State machine for simple server
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
JanetFunction *function;
|
||||
} NetStateSimpleServer;
|
||||
|
||||
JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateSimpleServer *state = (NetStateSimpleServer *) s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_INIT:
|
||||
/* We know the pollable will be a stream */
|
||||
janet_gcroot(janet_wrap_abstract(s->pollable));
|
||||
#ifdef JANET_WINDOWS
|
||||
/* requires some more setup code */
|
||||
#endif
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK:
|
||||
janet_mark(janet_wrap_function(state->function));
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_gcunroot(janet_wrap_abstract(s->pollable));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
/* Called when ever we get an IOCP event */
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_READ: {
|
||||
JSock connfd = accept(s->pollable->handle, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
/* Made a new connection socket */
|
||||
nosigpipe(connfd);
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
Janet streamv = janet_wrap_abstract(stream);
|
||||
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
|
||||
janet_schedule(fiber, janet_wrap_nil());
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
/* State machine for accepting connections. */
|
||||
|
||||
#ifdef JANET_WINDOWS
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
WSAOVERLAPPED overlapped;
|
||||
JanetFunction *function;
|
||||
JanetStream *lstream;
|
||||
JanetStream *astream;
|
||||
char buf[1024];
|
||||
} NetStateAccept;
|
||||
|
||||
static int net_sched_accept_impl(NetStateAccept *state, Janet *err);
|
||||
|
||||
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateAccept *state = (NetStateAccept *)s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK: {
|
||||
if (state->lstream) janet_mark(janet_wrap_abstract(state->lstream));
|
||||
if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
|
||||
if (state->function) janet_mark(janet_wrap_abstract(state->function));
|
||||
break;
|
||||
}
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_cancel(s->fiber, janet_cstringv("stream closed"));
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
#ifdef JANET_WINDOWS
|
||||
case JANET_ASYNC_EVENT_INIT: {
|
||||
|
||||
}
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_COMPLETE: {
|
||||
int seconds;
|
||||
int bytes = sizeof(seconds);
|
||||
if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME,
|
||||
(char *)&seconds, &bytes)) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
|
||||
(char *) & (state->lstream->handle), sizeof(SOCKET))) {
|
||||
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
#else
|
||||
case JANET_ASYNC_EVENT_READ: {
|
||||
JSock connfd = accept(s->pollable->handle, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
nosigpipe(connfd);
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
Janet streamv = janet_wrap_abstract(stream);
|
||||
Janet streamv = janet_wrap_abstract(state->astream);
|
||||
if (state->function) {
|
||||
/* Schedule worker */
|
||||
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
|
||||
janet_schedule(fiber, janet_wrap_nil());
|
||||
/* Now listen again for next connection */
|
||||
Janet err;
|
||||
if (net_sched_accept_impl(state, &err)) {
|
||||
janet_cancel(s->fiber, err);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
} else {
|
||||
janet_schedule(s->fiber, streamv);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) {
|
||||
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
|
||||
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
|
||||
Janet err;
|
||||
SOCKET lsock = (SOCKET) stream->handle;
|
||||
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
|
||||
NetStateAccept *state = (NetStateAccept *)s;
|
||||
memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
|
||||
memset(&state->buf, 0, 1024);
|
||||
state->function = fun;
|
||||
state->lstream = stream;
|
||||
s->tag = &state->overlapped;
|
||||
if (net_sched_accept_impl(state, &err)) janet_panicv(err);
|
||||
janet_await();
|
||||
}
|
||||
|
||||
static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
|
||||
SOCKET lsock = (SOCKET) state->lstream->handle;
|
||||
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
|
||||
if (asock == INVALID_SOCKET) {
|
||||
*err = net_lasterr();
|
||||
return 1;
|
||||
}
|
||||
JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
state->astream = astream;
|
||||
int socksize = sizeof(SOCKADDR_STORAGE) + 16;
|
||||
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
|
||||
int code = WSAGetLastError();
|
||||
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
|
||||
*err = net_lasterr();
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
typedef struct {
|
||||
JanetListenerState head;
|
||||
JanetFunction *function;
|
||||
} NetStateAccept;
|
||||
|
||||
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
|
||||
NetStateAccept *state = (NetStateAccept *)s;
|
||||
switch (event) {
|
||||
default:
|
||||
break;
|
||||
case JANET_ASYNC_EVENT_MARK: {
|
||||
if (state->function) janet_mark(janet_wrap_function(state->function));
|
||||
break;
|
||||
}
|
||||
case JANET_ASYNC_EVENT_CLOSE:
|
||||
janet_schedule(s->fiber, janet_wrap_nil());
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
case JANET_ASYNC_EVENT_READ: {
|
||||
JSock connfd = accept(s->pollable->handle, NULL, NULL);
|
||||
if (JSOCKVALID(connfd)) {
|
||||
nosigpipe(connfd);
|
||||
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||
Janet streamv = janet_wrap_abstract(stream);
|
||||
if (state->function) {
|
||||
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
|
||||
janet_schedule(fiber, janet_wrap_nil());
|
||||
} else {
|
||||
janet_schedule(s->fiber, streamv);
|
||||
return JANET_ASYNC_STATUS_DONE;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return JANET_ASYNC_STATUS_NOT_DONE;
|
||||
}
|
||||
|
||||
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
|
||||
NetStateAccept *state = (NetStateAccept *) janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
|
||||
state->function = fun;
|
||||
janet_await();
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
/* Adress info */
|
||||
|
||||
static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) {
|
||||
@@ -513,7 +700,16 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int sock
|
||||
JANET_OUT_OF_MEMORY;
|
||||
}
|
||||
saddr->sun_family = AF_UNIX;
|
||||
snprintf(saddr->sun_path, 108, "%s", path);
|
||||
memset(&saddr->sun_path, 0, 108);
|
||||
#ifdef JANET_LINUX
|
||||
if (path[0] == '@') {
|
||||
saddr->sun_path[0] = '\0';
|
||||
snprintf(saddr->sun_path + 1, 107, "%s", path + 1);
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
snprintf(saddr->sun_path, 108, "%s", path);
|
||||
}
|
||||
*is_unix = 1;
|
||||
return (struct addrinfo *) saddr;
|
||||
}
|
||||
@@ -608,7 +804,11 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
|
||||
{
|
||||
struct addrinfo *rp = NULL;
|
||||
for (rp = ai; rp != NULL; rp = rp->ai_next) {
|
||||
#ifdef JANET_WINDOWS
|
||||
sock = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
|
||||
#else
|
||||
sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
|
||||
#endif
|
||||
if (JSOCKVALID(sock)) {
|
||||
addr = rp->ai_addr;
|
||||
addrlen = (socklen_t) rp->ai_addrlen;
|
||||
@@ -655,13 +855,11 @@ static const char *serverify_socket(JSock sfd) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 4);
|
||||
static Janet cfun_net_listen(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 3);
|
||||
|
||||
/* Get host, port, and handler*/
|
||||
JanetFunction *fun = janet_optfunction(argv, argc, 2, NULL);
|
||||
|
||||
int socktype = janet_get_sockettype(argv, argc, 3);
|
||||
int socktype = janet_get_sockettype(argv, argc, 2);
|
||||
int is_unix = 0;
|
||||
struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1, &is_unix);
|
||||
|
||||
@@ -686,7 +884,11 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||
/* Check all addrinfos in a loop for the first that we can bind to. */
|
||||
struct addrinfo *rp = NULL;
|
||||
for (rp = ai; rp != NULL; rp = rp->ai_next) {
|
||||
#ifdef JANET_WINDOWS
|
||||
sfd = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
|
||||
#else
|
||||
sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
|
||||
#endif
|
||||
if (!JSOCKVALID(sfd)) continue;
|
||||
const char *err = serverify_socket(sfd);
|
||||
if (NULL != err) {
|
||||
@@ -707,15 +909,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||
|
||||
if (socktype == SOCK_DGRAM) {
|
||||
/* Datagram server (UDP) */
|
||||
|
||||
if (NULL == fun) {
|
||||
/* Server no handler */
|
||||
JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE);
|
||||
return janet_wrap_abstract(stream);
|
||||
} else {
|
||||
/* Server with handler */
|
||||
janet_panic("handler must be nil for datagram server");
|
||||
}
|
||||
JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE);
|
||||
return janet_wrap_abstract(stream);
|
||||
} else {
|
||||
/* Stream server (TCP) */
|
||||
|
||||
@@ -727,17 +922,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
|
||||
}
|
||||
|
||||
/* Put sfd on our loop */
|
||||
if (NULL == fun) {
|
||||
JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
|
||||
return janet_wrap_abstract(stream);
|
||||
} else {
|
||||
/* Server with handler */
|
||||
JanetStream *stream = make_stream(sfd, 0);
|
||||
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server,
|
||||
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL);
|
||||
ss->function = fun;
|
||||
return janet_wrap_abstract(stream);
|
||||
}
|
||||
JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
|
||||
return janet_wrap_abstract(stream);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -752,13 +938,21 @@ static void check_stream_flag(JanetStream *stream, int flag) {
|
||||
}
|
||||
}
|
||||
|
||||
static Janet cfun_stream_accept_loop(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
|
||||
JanetFunction *fun = janet_getfunction(argv, 1);
|
||||
janet_sched_accept(stream, fun);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 1, 2);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
|
||||
double to = janet_optnumber(argv, argc, 1, INFINITY);
|
||||
if (to != INFINITY) janet_addtimeout(to);
|
||||
janet_sched_accept(stream);
|
||||
janet_sched_accept(stream, NULL);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_read(int32_t argc, Janet *argv) {
|
||||
@@ -801,6 +995,12 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
|
||||
return janet_wrap_nil();
|
||||
}
|
||||
|
||||
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
|
||||
janet_fixarity(argc, 1);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
|
||||
}
|
||||
|
||||
static Janet cfun_stream_write(int32_t argc, Janet *argv) {
|
||||
janet_arity(argc, 2, 3);
|
||||
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
|
||||
@@ -847,10 +1047,12 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
|
||||
static const JanetMethod stream_methods[] = {
|
||||
{"chunk", cfun_stream_chunk},
|
||||
{"close", cfun_stream_close},
|
||||
{"closed?", cfun_stream_closed},
|
||||
{"read", cfun_stream_read},
|
||||
{"write", cfun_stream_write},
|
||||
{"flush", cfun_stream_flush},
|
||||
{"accept", cfun_stream_accept},
|
||||
{"accept-loop", cfun_stream_accept_loop},
|
||||
{"send-to", cfun_stream_send_to},
|
||||
{"recv-from", cfun_stream_recv_from},
|
||||
{NULL, NULL}
|
||||
@@ -867,17 +1069,19 @@ static const JanetReg net_cfuns[] = {
|
||||
"net/address", cfun_net_sockaddr,
|
||||
JDOC("(net/address host port &opt type)\n\n"
|
||||
"Look up the connection information for a given hostname, port, and connection type. Returns "
|
||||
"a handle that can be used to send datagrams over network without establishing a connection.")
|
||||
"a handle that can be used to send datagrams over network without establishing a connection. "
|
||||
"On Posix platforms, you can use :unix for host to connect to a unix domain socket, where the name is "
|
||||
"given in the port argument. On Linux, abstract "
|
||||
"unix domain sockets are specified with a leading '@' character in port.")
|
||||
},
|
||||
{
|
||||
"net/server", cfun_net_server,
|
||||
JDOC("(net/server host port &opt handler type)\n\n"
|
||||
"Start a TCP server. handler is a function that will be called with a stream "
|
||||
"on each connection to the server. Returns a new stream that is neither readable nor "
|
||||
"writeable. If handler is nil or not provided, net/accept must be used to get the next connection "
|
||||
"to the server. The type parameter specifies the type of network connection, either "
|
||||
"a stream (usually tcp), or datagram (usually udp). If not specified, the default is "
|
||||
"stream.")
|
||||
"net/listen", cfun_net_listen,
|
||||
JDOC("(net/listen host port &opt type)\n\n"
|
||||
"Creates a server. Returns a new stream that is neither readable nor "
|
||||
"writeable. Use net/accept or net/accept-loop be to handle connections and start the server."
|
||||
"The type parameter specifies the type of network connection, either "
|
||||
"a :stream (usually tcp), or :datagram (usually udp). If not specified, the default is "
|
||||
":stream. The host and port arguments are the same as in net/address.")
|
||||
},
|
||||
{
|
||||
"net/accept", cfun_stream_accept,
|
||||
@@ -886,6 +1090,12 @@ static const JanetReg net_cfuns[] = {
|
||||
"Takes an optional timeout in seconds, after which will return nil. "
|
||||
"Returns a new duplex stream which represents a connection to the client.")
|
||||
},
|
||||
{
|
||||
"net/accept-loop", cfun_stream_accept_loop,
|
||||
JDOC("(net/accept-loop stream handler)\n\n"
|
||||
"Shorthand for running a server stream that will continuously accept new connections."
|
||||
"Blocks the current fiber until the stream is closed, and will return the stream.")
|
||||
},
|
||||
{
|
||||
"net/read", cfun_stream_read,
|
||||
JDOC("(net/read stream nbytes &opt buf timeout)\n\n"
|
||||
@@ -931,6 +1141,11 @@ static const JanetReg net_cfuns[] = {
|
||||
JDOC("(net/close stream)\n\n"
|
||||
"Close a stream so that no further communication can occur.")
|
||||
},
|
||||
{
|
||||
"net/closed?", cfun_stream_closed,
|
||||
JDOC("(net/closed? stream)\n\n"
|
||||
"Check if a stream is closed.")
|
||||
},
|
||||
{
|
||||
"net/connect", cfun_net_connect,
|
||||
JDOC("(net/connect host porti &opt type)\n\n"
|
||||
|
||||
@@ -1743,8 +1743,8 @@ static const JanetReg os_cfuns[] = {
|
||||
},
|
||||
{
|
||||
"os/sleep", os_sleep,
|
||||
JDOC("(os/sleep nsec)\n\n"
|
||||
"Suspend the program for nsec seconds. 'nsec' can be a real number. Returns "
|
||||
JDOC("(os/sleep n)\n\n"
|
||||
"Suspend the program for n seconds. 'nsec' can be a real number. Returns "
|
||||
"nil.")
|
||||
},
|
||||
{
|
||||
|
||||
@@ -256,7 +256,7 @@ static void janet_table_mergekv(JanetTable *table, const JanetKV *kvs, int32_t c
|
||||
}
|
||||
}
|
||||
|
||||
/* Merge a table other into another table */
|
||||
/* Merge a table into another table */
|
||||
void janet_table_merge_table(JanetTable *table, JanetTable *other) {
|
||||
janet_table_mergekv(table, other->data, other->capacity);
|
||||
}
|
||||
|
||||
@@ -449,7 +449,8 @@ void janet_cfuns(JanetTable *env, const char *regprefix, const JanetReg *cfuns)
|
||||
|
||||
void janet_register_abstract_type(const JanetAbstractType *at) {
|
||||
Janet sym = janet_csymbolv(at->name);
|
||||
if (!(janet_checktype(janet_table_get(janet_vm_abstract_registry, sym), JANET_NIL))) {
|
||||
Janet check = janet_table_get(janet_vm_abstract_registry, sym);
|
||||
if (!janet_checktype(check, JANET_NIL) && at != janet_unwrap_pointer(check)) {
|
||||
janet_panicf("cannot register abstract type %s, "
|
||||
"a type with the same name exists", at->name);
|
||||
}
|
||||
|
||||
@@ -501,6 +501,7 @@ typedef void *JanetAbstract;
|
||||
#ifdef JANET_EV
|
||||
#define JANET_POLL_FLAG_CLOSED 0x1
|
||||
#define JANET_POLL_FLAG_SOCKET 0x2
|
||||
#define JANET_POLL_FLAG_IOCP 0x4
|
||||
|
||||
typedef enum {
|
||||
JANET_ASYNC_EVENT_INIT,
|
||||
@@ -510,7 +511,8 @@ typedef enum {
|
||||
JANET_ASYNC_EVENT_READ,
|
||||
JANET_ASYNC_EVENT_WRITE,
|
||||
JANET_ASYNC_EVENT_TIMEOUT,
|
||||
JANET_ASYNC_EVENT_COMPLETE /* Used on windows for IOCP */
|
||||
JANET_ASYNC_EVENT_COMPLETE, /* Used on windows for IOCP */
|
||||
JANET_ASYNC_EVENT_USER
|
||||
} JanetAsyncEvent;
|
||||
|
||||
#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ)
|
||||
@@ -546,6 +548,10 @@ struct JanetListenerState {
|
||||
JanetPollable *pollable;
|
||||
void *event; /* Used to pass data from asynchronous IO event. Contents depend on both
|
||||
implementation of the event loop and the particular event. */
|
||||
#ifdef JANET_WINDOWS
|
||||
void *tag; /* Used to associate listeners with an overlapped structure */
|
||||
int bytes; /* Used to track how many bytes were transfered. */
|
||||
#endif
|
||||
/* internal */
|
||||
int _index; /* not used in all implementations */
|
||||
int _mask;
|
||||
@@ -827,7 +833,6 @@ struct JanetFiber {
|
||||
JanetFiber *child; /* Keep linked list of fibers for restarting pending fibers */
|
||||
#ifdef JANET_EV
|
||||
JanetListenerState *waiting;
|
||||
int32_t timeout_index;
|
||||
uint32_t sched_id; /* Increment everytime fiber is scheduled by event loop */
|
||||
#endif
|
||||
};
|
||||
|
||||
@@ -33,9 +33,16 @@
|
||||
(:write stream b)
|
||||
(buffer/clear b)))
|
||||
|
||||
(def s (net/server "127.0.0.1" "8000" handler))
|
||||
(def s (net/server "127.0.0.1" "8000"))
|
||||
(assert s "made server 1")
|
||||
|
||||
(ev/go
|
||||
(coro
|
||||
(while (not (net/closed? s))
|
||||
(def conn (net/accept s))
|
||||
(unless conn (break))
|
||||
(ev/call handler conn))))
|
||||
|
||||
(defn test-echo [msg]
|
||||
(with [conn (net/connect "127.0.0.1" "8000")]
|
||||
(:write conn msg)
|
||||
|
||||
Reference in New Issue
Block a user