From 7c7ff802fac5e1159e33384ecdf836fb8b08c1f0 Mon Sep 17 00:00:00 2001 From: Calvin Rose Date: Fri, 26 Mar 2021 15:36:25 -0500 Subject: [PATCH] Add net/shutdown to allow better networking with streams. --- src/core/ev.c | 4 ++-- src/core/net.c | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/src/core/ev.c b/src/core/ev.c index 7795b320..9706daf8 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1176,7 +1176,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { status2 = state->machine(state, JANET_ASYNC_EVENT_READ); if (mask & EPOLLERR) status3 = state->machine(state, JANET_ASYNC_EVENT_ERR); - if (mask & EPOLLHUP) + if ((mask & EPOLLHUP) && !(mask & (EPOLLOUT | EPOLLIN))) status4 = state->machine(state, JANET_ASYNC_EVENT_HUP); if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE || @@ -1306,7 +1306,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { status2 = state->machine(state, JANET_ASYNC_EVENT_READ); if (mask & POLLERR) status3 = state->machine(state, JANET_ASYNC_EVENT_ERR); - if (mask & POLLHUP) + if ((mask & POLLHUP) && !(mask & (POLLIN | POLLOUT))) status4 = state->machine(state, JANET_ASYNC_EVENT_HUP); if (status1 == JANET_ASYNC_STATUS_DONE || status2 == JANET_ASYNC_STATUS_DONE || diff --git a/src/core/net.c b/src/core/net.c index 2d30af9f..da972fcc 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -431,6 +431,47 @@ static const char *serverify_socket(JSock sfd) { return NULL; } +#ifdef JANET_WINDOWS +#define JANET_SHUTDOWN_RW SD_BOTH +#define JANET_SHUTDOWN_R SD_RECEIVE +#define JANET_SHUTDOWN_W SD_SEND +#else +#define JANET_SHUTDOWN_RW SHUT_RDWR +#define JANET_SHUTDOWN_R SHUT_RD +#define JANET_SHUTDOWN_W SHUT_WR +#endif + +static Janet cfun_net_shutdown(int32_t argc, Janet *argv) { + janet_arity(argc, 1, 2); + JanetStream *stream = janet_getabstract(argv, 0, &janet_stream_type); + janet_stream_flags(stream, JANET_STREAM_SOCKET); + int shutdown_type = SHUT_RDWR; + if (argc == 2) { + const uint8_t *kw = janet_getkeyword(argv, 1); + if (0 == janet_cstrcmp(kw, "rw")) { + shutdown_type = JANET_SHUTDOWN_RW; + } else if (0 == janet_cstrcmp(kw, "r")) { + shutdown_type = JANET_SHUTDOWN_R; + } else if (0 == janet_cstrcmp(kw, "w")) { + shutdown_type = JANET_SHUTDOWN_W; + } else { + janet_panicf("unexpected keyword %v", argv[1]); + } + } + int status; +#ifdef JANET_WINDOWS + status = shutdown((SOCKET) stream->handle, shutdown_type); +#else + do { + status = shutdown(stream->handle, shutdown_type); + } while (status == -1 && errno == EINTR); +#endif + if (status) { + janet_panicf("could not shutdown socket: %V", janet_ev_lasterr()); + } + return argv[0]; +} + static Janet cfun_net_listen(int32_t argc, Janet *argv) { janet_arity(argc, 2, 3); @@ -622,6 +663,7 @@ static const JanetMethod net_stream_methods[] = { {"evread", janet_cfun_stream_read}, {"evchunk", janet_cfun_stream_chunk}, {"evwrite", janet_cfun_stream_write}, + {"shutdown", cfun_net_shutdown}, {NULL, NULL} }; @@ -709,6 +751,16 @@ static const JanetReg net_cfuns[] = { "that can be used to communicate with the server. Type is an optional keyword " "to specify a connection type, either :stream or :datagram. The default is :stream. ") }, + { + "net/shutdown", cfun_net_shutdown, + JDOC("(net/shutdown stream &opt mode)\n\n" + "Stop communication on this socket in a graceful manner, either in both directions or just " + "reading/writing from the stream. The `mode` parameter controls which communication to stop on the socket. " + "\n\n* `:wr` is the default and prevents both reading new data from the socket and writing new data to the socket.\n" + "* `:r` disables reading new data from the socket.\n" + "* `:w` disable writing data to the socket.\n\n" + "Returns the original socket.") + }, {NULL, NULL, NULL} };