diff --git a/examples/udpclient.janet b/examples/udpclient.janet new file mode 100644 index 00000000..bd07c75c --- /dev/null +++ b/examples/udpclient.janet @@ -0,0 +1,5 @@ +(def conn (net/connect "127.0.0.1" "8009" :datagram)) +(:write conn (string/format "%q" (os/cryptorand 16))) +(def x (:read conn 1024)) +(pp x) + diff --git a/examples/udpserver.janet b/examples/udpserver.janet new file mode 100644 index 00000000..f00c8189 --- /dev/null +++ b/examples/udpserver.janet @@ -0,0 +1,6 @@ +(def server (net/server "127.0.0.1" "8009" nil :datagram)) +(while true + (def buf @"") + (def who (:recv-from server 1024 buf)) + (printf "got %q from %v, echoing!" buf who) + (:send-to server who buf)) diff --git a/src/core/ev.c b/src/core/ev.c index 93de9f02..82a7e0b8 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -139,6 +139,9 @@ static void pop_timeout(void) { /* Create a new event listener */ static JanetListenerState *janet_listen_impl(JanetPollable *pollable, JanetListener behavior, int mask, size_t size) { + if (pollable->_mask & mask) { + janet_panic("cannot listen for duplicate event on pollable"); + } if (size < sizeof(JanetListenerState)) size = sizeof(JanetListenerState); JanetListenerState *state = malloc(size); @@ -315,10 +318,10 @@ JANET_THREAD_LOCAL int janet_vm_timerfd = 0; JANET_THREAD_LOCAL int janet_vm_timer_enabled = 0; static int make_epoll_events(int mask) { - int events = 0; - if (mask & JANET_ASYNC_EVENT_READ) + int events = EPOLLET; + if (mask & JANET_ASYNC_LISTEN_READ) events |= EPOLLIN; - if (mask & JANET_ASYNC_EVENT_WRITE) + if (mask & JANET_ASYNC_LISTEN_WRITE) events |= EPOLLOUT; return events; } @@ -348,7 +351,7 @@ static void janet_unlisten(JanetListenerState *state) { int is_last = (state->_next == NULL && pollable->state == state); int op = is_last ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; struct epoll_event ev; - ev.events = make_epoll_events(pollable->_mask); + ev.events = make_epoll_events(pollable->_mask & ~state->_mask); ev.data.ptr = pollable; int status; do { @@ -402,12 +405,13 @@ void janet_loop1_impl(void) { JanetListenerState *state = pollable->state; while (NULL != state) { JanetListenerState *next_state = state->_next; - JanetAsyncStatus status = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status1 = JANET_ASYNC_STATUS_NOT_DONE; + JanetAsyncStatus status2 = JANET_ASYNC_STATUS_NOT_DONE; if (mask & EPOLLOUT) - status = state->machine(state, JANET_ASYNC_EVENT_WRITE); - if (status == JANET_ASYNC_STATUS_NOT_DONE && (mask & EPOLLIN)) - status = state->machine(state, JANET_ASYNC_EVENT_READ); - if (status == JANET_ASYNC_STATUS_DONE) + status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); + if (mask & EPOLLIN) + status2 = state->machine(state, JANET_ASYNC_EVENT_READ); + if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE) janet_unlisten(state); state = next_state; } diff --git a/src/core/net.c b/src/core/net.c index 1019cdcd..23a92396 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -53,6 +53,7 @@ #define JANET_STREAM_READABLE 0x200 #define JANET_STREAM_WRITABLE 0x400 #define JANET_STREAM_ACCEPTABLE 0x800 +#define JANET_STREAM_UDPSERVER 0x1000 static int janet_stream_close(void *p, size_t s); static int janet_stream_mark(void *p, size_t s); @@ -67,6 +68,11 @@ static const JanetAbstractType StreamAT = { typedef JanetPollable JanetStream; +static const JanetAbstractType AddressAT = { + "core/socket-address", + JANET_ATEND_NAME +}; + #ifdef JANET_WINDOWS #define JSOCKCLOSE(x) closesocket(x) #define JSOCKDEFAULT INVALID_SOCKET @@ -147,6 +153,7 @@ typedef struct { int32_t bytes_left; JanetBuffer *buf; int is_chunk; + int is_recv_from; } NetStateRead; JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) { @@ -168,8 +175,15 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) int32_t bytes_left = state->bytes_left; janet_buffer_extra(buffer, bytes_left); JReadInt nread; + struct sockaddr_in saddr; + socklen_t socklen = sizeof(saddr); do { - nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); + if (state->is_recv_from) { + nread = recvfrom(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0, + (struct sockaddr *)&saddr, &socklen); + } else { + nread = recv(s->pollable->handle, buffer->data + buffer->count, bytes_left, 0); + } } while (nread == -1 && JLASTERR == JEINTR); if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) { break; @@ -186,7 +200,14 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) /* Resume if done */ if (!state->is_chunk || bytes_left == 0) { - Janet resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil(); + Janet resume_val; + if (state->is_recv_from) { + void *abst = janet_abstract(&AddressAT, socklen); + memcpy(abst, &saddr, socklen); + resume_val = janet_wrap_abstract(abst); + } else { + resume_val = nread > 0 ? janet_wrap_buffer(buffer) : janet_wrap_nil(); + } janet_schedule(s->fiber, resume_val); return JANET_ASYNC_STATUS_DONE; } @@ -198,24 +219,36 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event) JANET_NO_RETURN static void janet_sched_read(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); state->is_chunk = 0; state->buf = buf; state->bytes_left = nbytes; + state->is_recv_from = 0; janet_await(); } JANET_NO_RETURN static void janet_sched_chunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, - JANET_ASYNC_EVENT_READ, sizeof(NetStateRead)); + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); state->is_chunk = 1; state->buf = buf; state->bytes_left = nbytes; + state->is_recv_from = 0; + janet_await(); +} + +JANET_NO_RETURN static void janet_sched_recv_from(JanetStream *stream, JanetBuffer *buf, int32_t nbytes) { + NetStateRead *state = (NetStateRead *) janet_listen(stream, net_machine_read, + JANET_ASYNC_LISTEN_READ, sizeof(NetStateRead)); + state->is_chunk = 0; + state->buf = buf; + state->bytes_left = nbytes; + state->is_recv_from = 1; janet_await(); } /* - * State machine for write + * State machine for write/send-to */ typedef struct { @@ -226,6 +259,7 @@ typedef struct { } src; int32_t start; int is_buffer; + void *dest_abst; } NetStateWrite; JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) { @@ -237,6 +271,9 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) janet_mark(state->is_buffer ? janet_wrap_buffer(state->src.buf) : janet_wrap_string(state->src.str)); + if (state->dest_abst != NULL) { + janet_mark(janet_wrap_abstract(state->dest_abst)); + } break; case JANET_ASYNC_EVENT_CLOSE: janet_schedule(s->fiber, janet_wrap_nil()); @@ -257,7 +294,13 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) int32_t nbytes = len - start; JReadInt nwrote; do { - nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); + void *dest_abst = state->dest_abst; + if (dest_abst) { + nwrote = sendto(s->pollable->handle, bytes + start, nbytes, 0, + (struct sockaddr *) dest_abst, janet_abstract_size(dest_abst)); + } else { + nwrote = send(s->pollable->handle, bytes + start, nbytes, MSG_NOSIGNAL); + } } while (nwrote == -1 && JLASTERR == JEINTR); if (nwrote > 0) { start += nwrote; @@ -277,22 +320,24 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event) return JANET_ASYNC_STATUS_NOT_DONE; } -JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf) { +JANET_NO_RETURN static void janet_sched_write_buffer(JanetStream *stream, JanetBuffer *buf, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); state->is_buffer = 1; state->start = 0; state->src.buf = buf; + state->dest_abst = dest_abst; janet_await(); } -JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str) { +JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, const uint8_t *str, void *dest_abst) { NetStateWrite *state = (NetStateWrite *) janet_listen(stream, net_machine_write, - JANET_ASYNC_EVENT_WRITE, sizeof(NetStateWrite)); + JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateWrite)); state->is_buffer = 0; state->start = 0; state->src.str = str; + state->dest_abst = dest_abst; janet_await(); } @@ -364,14 +409,25 @@ JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event } JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { - janet_listen(stream, net_machine_accept, JANET_ASYNC_EVENT_READ, sizeof(NetStateAccept)); + janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept)); janet_await(); } /* Adress info */ +static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) { + JanetKeyword stype = janet_optkeyword(argv, argc, n, NULL); + int socktype = SOCK_DGRAM; + if ((NULL == stype) || !janet_cstrcmp(stype, "stream")) { + socktype = SOCK_STREAM; + } else if (janet_cstrcmp(stype, "datagram")) { + janet_panicf("expected socket type as :stream or :datagram, got %v", argv[n]); + } + return socktype; +} + /* Needs argc >= offset + 2 */ -static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { +static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset, int socktype, int passive) { /* Get host and port */ const char *host = janet_getcstring(argv, offset); const char *port; @@ -385,9 +441,8 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = 0; - hints.ai_flags = AI_PASSIVE; + hints.ai_socktype = socktype; + hints.ai_flags = passive ? AI_PASSIVE : 0; int status = getaddrinfo(host, port, &hints, &ai); if (status) { janet_panicf("could not get address info: %s", gai_strerror(status)); @@ -399,13 +454,47 @@ static struct addrinfo *janet_get_addrinfo(Janet *argv, int32_t offset) { * C Funs */ -static Janet cfun_net_connect(int32_t argc, Janet *argv) { - janet_fixarity(argc, 2); +static Janet cfun_net_sockaddr(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 4); + int socktype = janet_get_sockettype(argv, argc, 2); + struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0); + if (argc >= 3 && janet_truthy(argv[3])) { + /* Select all */ + JanetArray *arr = janet_array(10); + struct addrinfo *iter = ai; + while (NULL != iter) { + void *abst = janet_abstract(&AddressAT, iter->ai_addrlen); + memcpy(abst, iter->ai_addr, iter->ai_addrlen); + janet_array_push(arr, janet_wrap_abstract(abst)); + iter = iter->ai_next; + } + freeaddrinfo(ai); + return janet_wrap_array(arr); + } else { + /* Select first */ + if (NULL == ai) { + janet_panic("no data for given address"); + } + void *abst = janet_abstract(&AddressAT, ai->ai_addrlen); + memcpy(abst, ai->ai_addr, ai->ai_addrlen); + freeaddrinfo(ai); + return janet_wrap_abstract(abst); + } +} - struct addrinfo *ai = janet_get_addrinfo(argv, 0); +static Janet cfun_net_connect(int32_t argc, Janet *argv) { + janet_arity(argc, 2, 3); + + int socktype = janet_get_sockettype(argv, argc, 2); + struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 0); /* Create socket */ - JSock sock = socket(ai->ai_family, ai->ai_socktype | JSOCKFLAGS, ai->ai_protocol); + JSock sock = JSOCKDEFAULT; + struct addrinfo *rp = NULL; + for (rp = ai; rp != NULL; rp = rp->ai_next) { + sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); + if (JSOCKVALID(sock)) break; + } if (!JSOCKVALID(sock)) { freeaddrinfo(ai); janet_panic("could not create socket"); @@ -425,12 +514,13 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) { } static Janet cfun_net_server(int32_t argc, Janet *argv) { - janet_arity(argc, 2, 3); + janet_arity(argc, 2, 4); /* Get host, port, and handler*/ JanetFunction *fun = janet_optfunction(argv, argc, 2, NULL); - struct addrinfo *ai = janet_get_addrinfo(argv, 0); + int socktype = janet_get_sockettype(argv, argc, 3); + struct addrinfo *ai = janet_get_addrinfo(argv, 0, socktype, 1); /* Check all addrinfos in a loop for the first that we can bind to. */ JSock sfd = JSOCKDEFAULT; @@ -460,48 +550,71 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) { if (bind(sfd, rp->ai_addr, (int) rp->ai_addrlen) == 0) break; JSOCKCLOSE(sfd); } + + freeaddrinfo(ai); if (NULL == rp) { - freeaddrinfo(ai); janet_panic("could not bind to any sockets"); } - /* listen */ - int status = listen(sfd, 1024); - freeaddrinfo(ai); - if (status) { - JSOCKCLOSE(sfd); - janet_panic("could not listen on file descriptor"); - } + if (socktype == SOCK_DGRAM) { + /* Datagram server (UDP) */ - /* Put sfd on our loop */ - if (NULL == fun) { - JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE); - return janet_wrap_abstract(stream); + if (NULL == fun) { + /* Server no handler */ + JanetStream *stream = make_stream(sfd, JANET_STREAM_UDPSERVER | JANET_STREAM_READABLE); + return janet_wrap_abstract(stream); + } else { + /* Server with handler */ + /* TODO - state machine */ + janet_panic("nyi"); + } } else { - /* Server with handler */ - JanetStream *stream = make_stream(sfd, 0); - NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, - JANET_ASYNC_EVENT_READ, sizeof(NetStateSimpleServer)); - ss->function = fun; - return janet_wrap_abstract(stream); + /* Stream server (TCP) */ + + /* listen */ + int status = listen(sfd, 1024); + if (status) { + JSOCKCLOSE(sfd); + janet_panic("could not listen on file descriptor"); + } + + /* Put sfd on our loop */ + if (NULL == fun) { + JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE); + return janet_wrap_abstract(stream); + } else { + /* Server with handler */ + JanetStream *stream = make_stream(sfd, 0); + NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, + JANET_ASYNC_LISTEN_READ, sizeof(NetStateSimpleServer)); + ss->function = fun; + return janet_wrap_abstract(stream); + } + } +} + +static void check_stream_flag(JanetStream *stream, int flag) { + if (!(stream->flags & flag) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { + const char *msg = ""; + if (flag == JANET_STREAM_READABLE) msg = "readable"; + if (flag == JANET_STREAM_WRITABLE) msg = "writable"; + if (flag == JANET_STREAM_ACCEPTABLE) msg = "server"; + if (flag == JANET_STREAM_UDPSERVER) msg = "datagram server"; + janet_panicf("bad stream - expected %s stream", msg); } } static Janet cfun_stream_accept(int32_t argc, Janet *argv) { janet_fixarity(argc, 1); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - if (!(stream->flags & JANET_STREAM_ACCEPTABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - janet_panic("got non acceptable stream"); - } + check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); janet_sched_accept(stream); } static Janet cfun_stream_read(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - janet_panic("got non readable stream"); - } + check_stream_flag(stream, JANET_STREAM_READABLE); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); janet_sched_read(stream, buffer, n); @@ -510,14 +623,21 @@ static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_chunk(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - if (!(stream->flags & JANET_STREAM_READABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - janet_panic("got non readable stream"); - } + check_stream_flag(stream, JANET_STREAM_READABLE); int32_t n = janet_getnat(argv, 1); JanetBuffer *buffer = janet_optbuffer(argv, argc, 2, 10); janet_sched_chunk(stream, buffer, n); } +static Janet cfun_stream_recv_from(int32_t argc, Janet *argv) { + janet_fixarity(argc, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + check_stream_flag(stream, JANET_STREAM_UDPSERVER); + int32_t n = janet_getnat(argv, 1); + JanetBuffer *buffer = janet_getbuffer(argv, 2); + janet_sched_recv_from(stream, buffer, n); +} + static Janet cfun_stream_close(int32_t argc, Janet *argv) { janet_fixarity(argc, 1); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); @@ -528,23 +648,32 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) { static Janet cfun_stream_write(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - if (!(stream->flags & JANET_STREAM_WRITABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - janet_panic("got non writeable stream"); - } + check_stream_flag(stream, JANET_STREAM_WRITABLE); if (janet_checktype(argv[1], JANET_BUFFER)) { - janet_sched_write_buffer(stream, janet_getbuffer(argv, 1)); + janet_sched_write_buffer(stream, janet_getbuffer(argv, 1), NULL); } else { JanetByteView bytes = janet_getbytes(argv, 1); - janet_sched_write_stringlike(stream, bytes.bytes); + janet_sched_write_stringlike(stream, bytes.bytes, NULL); + } +} + +static Janet cfun_stream_send_to(int32_t argc, Janet *argv) { + janet_fixarity(argc, 3); + JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); + check_stream_flag(stream, JANET_STREAM_UDPSERVER); + void *dest = janet_getabstract(argv, 1, &AddressAT); + if (janet_checktype(argv[2], JANET_BUFFER)) { + janet_sched_write_buffer(stream, janet_getbuffer(argv, 2), dest); + } else { + JanetByteView bytes = janet_getbytes(argv, 2); + janet_sched_write_stringlike(stream, bytes.bytes, dest); } } static Janet cfun_stream_flush(int32_t argc, Janet *argv) { janet_fixarity(argc, 2); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); - if (!(stream->flags & JANET_STREAM_WRITABLE) || (stream->flags & JANET_POLL_FLAG_CLOSED)) { - janet_panic("got non writeable stream"); - } + check_stream_flag(stream, JANET_STREAM_WRITABLE); /* Toggle no delay flag */ int flag = 1; setsockopt(stream->handle, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)); @@ -560,6 +689,8 @@ static const JanetMethod stream_methods[] = { {"write", cfun_stream_write}, {"flush", cfun_stream_flush}, {"accept", cfun_stream_accept}, + {"send-to", cfun_stream_send_to}, + {"recv-from", cfun_stream_recv_from}, {NULL, NULL} }; @@ -570,13 +701,21 @@ static int janet_stream_getter(void *p, Janet key, Janet *out) { } static const JanetReg net_cfuns[] = { + { + "net/address", cfun_net_sockaddr, + JDOC("(net/address host port &opt type)\n\n" + "Look up the connection information for a given hostname, port, and connection type. Returns " + "a handle that can be used to send datagrams over network without establishing a connection.") + }, { "net/server", cfun_net_server, - JDOC("(net/server host port &opt handler)\n\n" + JDOC("(net/server host port &opt handler type)\n\n" "Start a TCP server. handler is a function that will be called with a stream " "on each connection to the server. Returns a new stream that is neither readable nor " - "writeable. If handler is not provided, net/accept must be used to get the next connection " - "to the server.") + "writeable. If handler is nil or not provided, net/accept must be used to get the next connection " + "to the server. The type parameter specifies the type of network connection, either " + "a stream (usually tcp), or datagram (usually udp). If not specified, the default is " + "stream.") }, { "net/accept", cfun_stream_accept, @@ -602,6 +741,18 @@ static const JanetReg net_cfuns[] = { "Write data to a stream, suspending the current fiber until the write " "completes. Returns stream.") }, + { + "net/send-to", cfun_stream_send_to, + JDOC("(net/send-to stream dest data)\n\n" + "Writes a datagram to a server stream. dest is a the destination address of the packet. " + "Returns stream.") + }, + { + "net/recv-from", cfun_stream_recv_from, + JDOC("(net/recv-from stream nbytes buf)\n\n" + "Receives data from a server stream and puts it into a buffer. Returns the socket-address the " + "packet came from.") + }, { "net/flush", cfun_stream_flush, JDOC("(net/flush stream)\n\n" @@ -615,9 +766,10 @@ static const JanetReg net_cfuns[] = { }, { "net/connect", cfun_net_connect, - JDOC("(net/connect host port)\n\n" + JDOC("(net/connect host porti &opt type)\n\n" "Open a connection to communicate with a server. Returns a duplex stream " - "that can be used to communicate with the server.") + "that can be used to communicate with the server. Type is an optional keyword " + "to specify a connection type, either :stream or :datagram. The default is :stream. ") }, {NULL, NULL, NULL} }; diff --git a/src/core/value.c b/src/core/value.c index 1fdc1aeb..141edf7e 100644 --- a/src/core/value.c +++ b/src/core/value.c @@ -271,7 +271,6 @@ int32_t janet_hash(Janet x) { } /* fallthrough */ default: - /* TODO - test performance with different hash functions */ if (sizeof(double) == sizeof(void *)) { /* Assuming 8 byte pointer */ uint64_t i = janet_u64(x); diff --git a/src/include/janet.h b/src/include/janet.h index f7736382..38073228 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -1153,6 +1153,9 @@ typedef enum { JANET_ASYNC_EVENT_TIMEOUT } JanetAsyncEvent; +#define JANET_ASYNC_LISTEN_READ (1 << JANET_ASYNC_EVENT_READ) +#define JANET_ASYNC_LISTEN_WRITE (1 << JANET_ASYNC_EVENT_WRITE) + typedef enum { JANET_ASYNC_STATUS_NOT_DONE, JANET_ASYNC_STATUS_DONE