Get windows IOCP working for accept.

This also changes the api of servers slightly -
in light of having support for ev tasks, it is probably better
to remove the "simple" server code and replace it with some Janet
or remove it all together. While convenient, it has issues with error
handling and rigidity.
This commit is contained in:
Calvin Rose 2020-11-08 18:56:13 -06:00
parent 07910272e2
commit d6391f2d70
6 changed files with 199 additions and 92 deletions

View File

@ -0,0 +1,5 @@
(with [conn (net/connect "127.0.0.1" 8000)]
(print "writing abcdefg...")
(:write conn "abcdefg")
(print "reading...")
(printf "got: %v" (:read conn 1024)))

View File

@ -9,6 +9,7 @@
(printf " %v -> %v" id b) (printf " %v -> %v" id b)
(:write stream b) (:write stream b)
(buffer/clear b)) (buffer/clear b))
(printf "Done %v!" id))) (printf "Done %v!" id)
(ev/sleep 0.5)))
(net/server "127.0.0.1" "8000" handler) (net/server "127.0.0.1" "8000" handler)

View File

@ -765,7 +765,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
if (!result) { if (!result) {
if (!has_timeout) { if (!has_timeout) {
JANET_EXIT("failed to get iocp GetQueuedCompletionStatus"); /* queue emptied */
} }
} else { } else {
/* Normal event */ /* Normal event */
@ -774,6 +774,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
while (state != NULL) { while (state != NULL) {
if (state->tag == overlapped) { if (state->tag == overlapped) {
state->event = overlapped; state->event = overlapped;
state->bytes = num_bytes_transfered;
JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE); JanetAsyncStatus status = state->machine(state, JANET_ASYNC_EVENT_COMPLETE);
if (status == JANET_ASYNC_STATUS_DONE) { if (status == JANET_ASYNC_STATUS_DONE) {
janet_unlisten(state); janet_unlisten(state);

View File

@ -32,6 +32,7 @@
#include <winsock2.h> #include <winsock2.h>
#include <windows.h> #include <windows.h>
#include <ws2tcpip.h> #include <ws2tcpip.h>
#include <mswsock.h>
#pragma comment (lib, "Ws2_32.lib") #pragma comment (lib, "Ws2_32.lib")
#pragma comment (lib, "Mswsock.lib") #pragma comment (lib, "Mswsock.lib")
#pragma comment (lib, "Advapi32.lib") #pragma comment (lib, "Advapi32.lib")
@ -96,6 +97,28 @@ static JanetStream *make_stream(SOCKET fd, uint32_t flags) {
stream->flags = flags; stream->flags = flags;
return stream; return stream;
} }
static Janet net_lasterr(void) {
int code = WSAGetLastError();
char msgbuf[256];
msgbuf[0] = '\0';
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
code,
MAKELANGID (LANG_NEUTRAL, SUBLANG_DEFAULT),
msgbuf,
sizeof (msgbuf),
NULL);
if (!*msgbuf) sprintf(msgbuf, "%d", code);
char *c = msgbuf;
while (*c) {
if (*c == '\n' || *c == '\r') {
*c = '\0';
break;
}
c++;
}
return janet_cstringv(msgbuf);
}
#else #else
#define JSOCKCLOSE(x) close(x) #define JSOCKCLOSE(x) close(x)
#define JSOCKDEFAULT 0 #define JSOCKDEFAULT 0
@ -124,6 +147,9 @@ static JanetStream *make_stream(int fd, uint32_t flags) {
stream->flags = flags; stream->flags = flags;
return stream; return stream;
} }
static Janet net_lasterr(void) {
return janet_cstringv(strerror(errno));
}
#endif #endif
/* We pass this flag to all send calls to prevent sigpipe */ /* We pass this flag to all send calls to prevent sigpipe */
@ -173,7 +199,6 @@ typedef struct {
WSAOVERLAPPED overlapped; WSAOVERLAPPED overlapped;
WSABUF wbuf; WSABUF wbuf;
DWORD flags; DWORD flags;
DWORD received;
int32_t chunk_size; int32_t chunk_size;
struct sockaddr from; struct sockaddr from;
int fromlen; int fromlen;
@ -195,13 +220,13 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: { case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when read finished */ /* Called when read finished */
if (state->received == 0 && !state->is_recv_from) { if (s->bytes == 0 && !state->is_recv_from) {
janet_cancel(s->fiber, janet_cstringv("disconnect")); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
janet_buffer_push_bytes(state->buf, state->chunk_buf, state->received); janet_buffer_push_bytes(state->buf, state->chunk_buf, s->bytes);
state->bytes_left -= state->received; state->bytes_left -= s->bytes;
if (state->bytes_left <= 0 || !state->is_chunk) { if (state->bytes_left <= 0 || !state->is_chunk) {
Janet resume_val; Janet resume_val;
@ -221,22 +246,20 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
case JANET_ASYNC_EVENT_USER: case JANET_ASYNC_EVENT_USER:
{ {
state->flags = 0; state->flags = 0;
/* If there are bytes left, do another read. Currently only read 2k at a time. */
int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left; int32_t chunk_size = state->bytes_left > JANET_NET_CHUNKSIZE ? JANET_NET_CHUNKSIZE : state->bytes_left;
state->wbuf.len = (ULONG) chunk_size; state->wbuf.len = (ULONG) chunk_size;
state->wbuf.buf = state->chunk_buf; state->wbuf.buf = state->chunk_buf;
state->chunk_size = chunk_size; state->chunk_size = chunk_size;
state->received = 0;
s->tag = &state->overlapped; s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
int status; int status;
if (state->is_recv_from) { if (state->is_recv_from) {
status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); status = WSARecvFrom((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL);
} else { } else {
status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, &state->received, &state->flags, &state->overlapped, NULL); status = WSARecv((SOCKET) s->pollable->handle, &state->wbuf, 1, NULL, &state->flags, &state->overlapped, NULL);
} }
if (status) { if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, janet_cstringv("failed to read from stream")); janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
} }
@ -263,13 +286,13 @@ JanetAsyncStatus net_machine_read(JanetListenerState *s, JanetAsyncEvent event)
/* Check for errors - special case errors that can just be waited on to fix */ /* Check for errors - special case errors that can just be waited on to fix */
if (nread == -1) { if (nread == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
/* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */
if (nread == 0 && !state->is_recv_from) { if (nread == 0 && !state->is_recv_from) {
janet_cancel(s->fiber, janet_cstringv("disconnect")); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
@ -351,7 +374,6 @@ typedef struct {
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
WSAOVERLAPPED overlapped; WSAOVERLAPPED overlapped;
WSABUF wbuf; WSABUF wbuf;
DWORD sent;
DWORD flags; DWORD flags;
#endif #endif
} NetStateWrite; } NetStateWrite;
@ -375,7 +397,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: { case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when write finished */ /* Called when write finished */
if (state->sent == 0 && !state->dest_abst) { if (s->bytes == 0 && !state->dest_abst) {
janet_cancel(s->fiber, janet_cstringv("disconnect")); janet_cancel(s->fiber, janet_cstringv("disconnect"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
@ -404,7 +426,6 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} }
state->wbuf.buf = (char *) bytes; state->wbuf.buf = (char *) bytes;
state->wbuf.len = len; state->wbuf.len = len;
state->sent = 0;
state->flags = 0; state->flags = 0;
s->tag = &state->overlapped; s->tag = &state->overlapped;
memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED)); memset(&(state->overlapped), 0, sizeof(WSAOVERLAPPED));
@ -414,12 +435,12 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
if (state->dest_abst) { if (state->dest_abst) {
const struct sockaddr *to = state->dest_abst; const struct sockaddr *to = state->dest_abst;
int tolen = (int) janet_abstract_size((void *) to); int tolen = (int) janet_abstract_size((void *) to);
status = WSASendTo(sock, &state->wbuf, 1, &state->sent, state->flags, to, tolen, &state->overlapped, NULL); status = WSASendTo(sock, &state->wbuf, 1, NULL, state->flags, to, tolen, &state->overlapped, NULL);
} else { } else {
status = WSASend(sock, &state->wbuf, 1, &state->sent, state->flags, &state->overlapped, NULL); status = WSASend(sock, &state->wbuf, 1, NULL, state->flags, &state->overlapped, NULL);
} }
if (status) { if (status && WSA_IO_PENDING != WSAGetLastError()) {
janet_cancel(s->fiber, janet_cstringv("failed to write to stream")); janet_cancel(s->fiber, janet_cstringv("failed to write to stream"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
@ -454,7 +475,7 @@ JanetAsyncStatus net_machine_write(JanetListenerState *s, JanetAsyncEvent event)
/* Handle write errors */ /* Handle write errors */
if (nwrote == -1) { if (nwrote == -1) {
if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break; if (JLASTERR == JEAGAIN || JLASTERR == JEWOULDBLOCK) break;
janet_cancel(s->fiber, janet_cstringv(strerror(JLASTERR))); janet_cancel(s->fiber, net_lasterr());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
} }
@ -510,96 +531,150 @@ JANET_NO_RETURN static void janet_sched_write_stringlike(JanetStream *stream, co
* State machine for simple server * State machine for simple server
*/ */
/* State machine for accepting connections. */
#ifdef JANET_WINDOWS
typedef struct {
JanetListenerState head;
WSAOVERLAPPED overlapped;
JanetFunction *function;
JanetStream *lstream;
JanetStream *astream;
char buf[1024];
} NetStateAccept;
static int net_sched_accept_impl(NetStateAccept *state, Janet *err);
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK: {
if (state->lstream) janet_mark(janet_wrap_abstract(state->lstream));
if (state->astream) janet_mark(janet_wrap_abstract(state->astream));
if (state->function) janet_mark(janet_wrap_abstract(state->function));
break;
}
case JANET_ASYNC_EVENT_CLOSE:
janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_COMPLETE: {
int seconds;
int bytes = sizeof(seconds);
if (NO_ERROR != getsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_CONNECT_TIME,
(char *)&seconds, &bytes)) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
if (NO_ERROR != setsockopt((SOCKET) state->astream->handle, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&(state->lstream->handle), sizeof(SOCKET))) {
janet_cancel(s->fiber, janet_cstringv("failed to accept connection"));
return JANET_ASYNC_STATUS_DONE;
}
Janet streamv = janet_wrap_abstract(state->astream);
if (state->function) {
/* Schedule worker */
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
/* Now listen again for next connection */
Janet err;
if (net_sched_accept_impl(state, &err)) {
janet_cancel(s->fiber, err);
return JANET_ASYNC_STATUS_DONE;
}
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
}
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
Janet err;
SOCKET lsock = (SOCKET) stream->handle;
JanetListenerState *s = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
NetStateAccept *state = (NetStateAccept *)s;
memset(&state->overlapped, 0, sizeof(WSAOVERLAPPED));
memset(&state->buf, 0, 1024);
state->function = fun;
state->lstream = stream;
s->tag = &state->overlapped;
if (net_sched_accept_impl(state, &err)) janet_panicv(err);
janet_await();
}
static int net_sched_accept_impl(NetStateAccept *state, Janet *err) {
SOCKET lsock = (SOCKET) state->lstream->handle;
SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (asock == INVALID_SOCKET) {
*err = net_lasterr();
return 1;
}
JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
state->astream = astream;
int socksize = sizeof(SOCKADDR_STORAGE) + 16;
if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) {
int code = WSAGetLastError();
if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */
*err = net_lasterr();
return 1;
}
return 0;
}
#else
typedef struct { typedef struct {
JanetListenerState head; JanetListenerState head;
JanetFunction *function; JanetFunction *function;
} NetStateSimpleServer;
JanetAsyncStatus net_machine_simple_server(JanetListenerState *s, JanetAsyncEvent event) {
NetStateSimpleServer *state = (NetStateSimpleServer *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_INIT:
/* We know the pollable will be a stream */
janet_gcroot(janet_wrap_abstract(s->pollable));
#ifdef JANET_WINDOWS
/* requires some more setup code */
#endif
break;
case JANET_ASYNC_EVENT_MARK:
janet_mark(janet_wrap_function(state->function));
break;
case JANET_ASYNC_EVENT_CLOSE:
janet_gcunroot(janet_wrap_abstract(s->pollable));
return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_COMPLETE: {
/* Called when ever we get an IOCP event */
}
break;
#else
case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) {
/* Made a new connection socket */
nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream);
JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
}
break;
}
#endif
}
return JANET_ASYNC_STATUS_NOT_DONE;
}
/* State machine for accepting connections. */
typedef struct {
JanetListenerState head;
} NetStateAccept; } NetStateAccept;
JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) { JanetAsyncStatus net_machine_accept(JanetListenerState *s, JanetAsyncEvent event) {
NetStateAccept *state = (NetStateAccept *)s;
switch (event) { switch (event) {
default: default:
break; break;
case JANET_ASYNC_EVENT_MARK: {
if (state->function) janet_mark(janet_wrap_function(state->function));
break;
}
case JANET_ASYNC_EVENT_CLOSE: case JANET_ASYNC_EVENT_CLOSE:
janet_cancel(s->fiber, janet_cstringv("stream closed")); janet_schedule(s->fiber, janet_wrap_nil());
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
#ifdef JANET_WINDOWS
case JANET_ASYNC_EVENT_INIT: {
}
break;
case JANET_ASYNC_EVENT_COMPLETE: {
}
break;
#else
case JANET_ASYNC_EVENT_READ: { case JANET_ASYNC_EVENT_READ: {
JSock connfd = accept(s->pollable->handle, NULL, NULL); JSock connfd = accept(s->pollable->handle, NULL, NULL);
if (JSOCKVALID(connfd)) { if (JSOCKVALID(connfd)) {
nosigpipe(connfd); nosigpipe(connfd);
JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); JanetStream *stream = make_stream(connfd, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
Janet streamv = janet_wrap_abstract(stream); Janet streamv = janet_wrap_abstract(stream);
janet_schedule(s->fiber, streamv); if (state->function) {
return JANET_ASYNC_STATUS_DONE; JanetFiber *fiber = janet_fiber(state->function, 64, 1, &streamv);
janet_schedule(fiber, janet_wrap_nil());
} else {
janet_schedule(s->fiber, streamv);
return JANET_ASYNC_STATUS_DONE;
}
} }
break; break;
} }
#endif
} }
return JANET_ASYNC_STATUS_NOT_DONE; return JANET_ASYNC_STATUS_NOT_DONE;
} }
JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream) { JANET_NO_RETURN static void janet_sched_accept(JanetStream *stream, JanetFunction *fun) {
janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL); JanetListenerState *state = janet_listen(stream, net_machine_accept, JANET_ASYNC_LISTEN_READ, sizeof(NetStateAccept), NULL);
state->function = fun;
janet_await(); janet_await();
} }
#endif
/* Adress info */ /* Adress info */
static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) { static int janet_get_sockettype(Janet *argv, int32_t argc, int32_t n) {
@ -729,7 +804,11 @@ static Janet cfun_net_connect(int32_t argc, Janet *argv) {
{ {
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) { for (rp = ai; rp != NULL; rp = rp->ai_next) {
#ifdef JANET_WINDOWS
sock = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
#else
sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); sock = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
#endif
if (JSOCKVALID(sock)) { if (JSOCKVALID(sock)) {
addr = rp->ai_addr; addr = rp->ai_addr;
addrlen = (socklen_t) rp->ai_addrlen; addrlen = (socklen_t) rp->ai_addrlen;
@ -807,7 +886,11 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
/* Check all addrinfos in a loop for the first that we can bind to. */ /* Check all addrinfos in a loop for the first that we can bind to. */
struct addrinfo *rp = NULL; struct addrinfo *rp = NULL;
for (rp = ai; rp != NULL; rp = rp->ai_next) { for (rp = ai; rp != NULL; rp = rp->ai_next) {
#ifdef JANET_WINDOWS
sfd = WSASocketW(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol, NULL, 0, WSA_FLAG_OVERLAPPED);
#else
sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol); sfd = socket(rp->ai_family, rp->ai_socktype | JSOCKFLAGS, rp->ai_protocol);
#endif
if (!JSOCKVALID(sfd)) continue; if (!JSOCKVALID(sfd)) continue;
const char *err = serverify_socket(sfd); const char *err = serverify_socket(sfd);
if (NULL != err) { if (NULL != err) {
@ -853,11 +936,8 @@ static Janet cfun_net_server(int32_t argc, Janet *argv) {
return janet_wrap_abstract(stream); return janet_wrap_abstract(stream);
} else { } else {
/* Server with handler */ /* Server with handler */
JanetStream *stream = make_stream(sfd, 0); JanetStream *stream = make_stream(sfd, JANET_STREAM_ACCEPTABLE);
NetStateSimpleServer *ss = (NetStateSimpleServer *) janet_listen(stream, net_machine_simple_server, janet_sched_accept(stream, fun);
JANET_ASYNC_LISTEN_READ | JANET_ASYNC_LISTEN_SPAWNER, sizeof(NetStateSimpleServer), NULL);
ss->function = fun;
return janet_wrap_abstract(stream);
} }
} }
} }
@ -879,7 +959,7 @@ static Janet cfun_stream_accept(int32_t argc, Janet *argv) {
check_stream_flag(stream, JANET_STREAM_ACCEPTABLE); check_stream_flag(stream, JANET_STREAM_ACCEPTABLE);
double to = janet_optnumber(argv, argc, 1, INFINITY); double to = janet_optnumber(argv, argc, 1, INFINITY);
if (to != INFINITY) janet_addtimeout(to); if (to != INFINITY) janet_addtimeout(to);
janet_sched_accept(stream); janet_sched_accept(stream, NULL);
} }
static Janet cfun_stream_read(int32_t argc, Janet *argv) { static Janet cfun_stream_read(int32_t argc, Janet *argv) {
@ -922,6 +1002,12 @@ static Janet cfun_stream_close(int32_t argc, Janet *argv) {
return janet_wrap_nil(); return janet_wrap_nil();
} }
static Janet cfun_stream_closed(int32_t argc, Janet *argv) {
janet_fixarity(argc, 1);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
return janet_wrap_boolean(stream->flags & JANET_POLL_FLAG_CLOSED);
}
static Janet cfun_stream_write(int32_t argc, Janet *argv) { static Janet cfun_stream_write(int32_t argc, Janet *argv) {
janet_arity(argc, 2, 3); janet_arity(argc, 2, 3);
JanetStream *stream = janet_getabstract(argv, 0, &StreamAT); JanetStream *stream = janet_getabstract(argv, 0, &StreamAT);
@ -968,6 +1054,7 @@ static Janet cfun_stream_flush(int32_t argc, Janet *argv) {
static const JanetMethod stream_methods[] = { static const JanetMethod stream_methods[] = {
{"chunk", cfun_stream_chunk}, {"chunk", cfun_stream_chunk},
{"close", cfun_stream_close}, {"close", cfun_stream_close},
{"closed?", cfun_stream_closed},
{"read", cfun_stream_read}, {"read", cfun_stream_read},
{"write", cfun_stream_write}, {"write", cfun_stream_write},
{"flush", cfun_stream_flush}, {"flush", cfun_stream_flush},
@ -1055,6 +1142,11 @@ static const JanetReg net_cfuns[] = {
JDOC("(net/close stream)\n\n" JDOC("(net/close stream)\n\n"
"Close a stream so that no further communication can occur.") "Close a stream so that no further communication can occur.")
}, },
{
"net/closed?", cfun_stream_closed,
JDOC("(net/closed? stream)\n\n"
"Check if a stream is closed.")
},
{ {
"net/connect", cfun_net_connect, "net/connect", cfun_net_connect,
JDOC("(net/connect host porti &opt type)\n\n" JDOC("(net/connect host porti &opt type)\n\n"

View File

@ -550,6 +550,7 @@ struct JanetListenerState {
implementation of the event loop and the particular event. */ implementation of the event loop and the particular event. */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
void *tag; /* Used to associate listeners with an overlapped structure */ void *tag; /* Used to associate listeners with an overlapped structure */
int bytes; /* Used to track how many bytes were transfered. */
#endif #endif
/* internal */ /* internal */
int _index; /* not used in all implementations */ int _index; /* not used in all implementations */

View File

@ -33,9 +33,16 @@
(:write stream b) (:write stream b)
(buffer/clear b))) (buffer/clear b)))
(def s (net/server "127.0.0.1" "8000" handler)) (def s (net/server "127.0.0.1" "8000"))
(assert s "made server 1") (assert s "made server 1")
(ev/go
(coro
(while (not (net/closed? s))
(def conn (net/accept s))
(unless conn (break))
(ev/call handler conn))))
(defn test-echo [msg] (defn test-echo [msg]
(with [conn (net/connect "127.0.0.1" "8000")] (with [conn (net/connect "127.0.0.1" "8000")]
(:write conn msg) (:write conn msg)