From 8d78fb1f6b8f2b4a4bd1b729f4fce4d3e2e3f7b1 Mon Sep 17 00:00:00 2001
From: Ico Doornekamp <ico@zevv.nl>
Date: Tue, 16 May 2023 17:10:16 +0200
Subject: [PATCH] changed net/connect to be non-blocking / asynchronous

---
 src/core/ev.c       | 42 +++++++++++++++++++++++++++++++++++++++++-
 src/core/net.c      | 31 +++++++++++++++++++++----------
 src/include/janet.h |  1 +
 3 files changed, 63 insertions(+), 11 deletions(-)

diff --git a/src/core/ev.c b/src/core/ev.c
index 8a904745..290a1442 100644
--- a/src/core/ev.c
+++ b/src/core/ev.c
@@ -2456,7 +2456,8 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
 typedef enum {
     JANET_ASYNC_WRITEMODE_WRITE,
     JANET_ASYNC_WRITEMODE_SEND,
-    JANET_ASYNC_WRITEMODE_SENDTO
+    JANET_ASYNC_WRITEMODE_SENDTO,
+    JANET_ASYNC_WRITEMODE_CONNECT
 } JanetWriteMode;
 
 typedef struct {
@@ -2480,6 +2481,31 @@ typedef struct {
 #endif
 } StateWrite;
 
+static JanetAsyncStatus handle_connect(JanetListenerState *s) {
+#ifdef JANET_WINDOWS
+    int res = 0;
+    int size = sizeof(res);
+    int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
+#else
+    int res = 0;
+    socklen_t size = sizeof res;
+    int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
+#endif
+    if (r == 0) {
+        if (res == 0) {
+            janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
+        } else {
+            // TODO help needed. janet_stream_close(s->stream);
+            janet_cancel(s->fiber, janet_cstringv(strerror(res)));
+        }
+    } else {
+        // TODO help needed. janet_stream_close(s->stream);
+        janet_cancel(s->fiber, janet_ev_lasterr());
+    }
+    return JANET_ASYNC_STATUS_DONE;
+}
+
+
 JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
     StateWrite *state = (StateWrite *) s;
     switch (event) {
@@ -2509,6 +2535,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
         }
         break;
         case JANET_ASYNC_EVENT_USER: {
+#ifdef JANET_NET
+            if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
+                return handle_connect(s);
+            }
+#endif
             /* Begin write */
             int32_t len;
             const uint8_t *bytes;
@@ -2572,6 +2603,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
             janet_cancel(s->fiber, janet_cstringv("stream hup"));
             return JANET_ASYNC_STATUS_DONE;
         case JANET_ASYNC_EVENT_WRITE: {
+#ifdef JANET_NET
+            if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
+                return handle_connect(s);
+            }
+#endif
             int32_t start, len;
             const uint8_t *bytes;
             start = state->start;
@@ -2674,6 +2710,10 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
 void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
     janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
 }
+
+void janet_ev_connect(JanetStream *stream, int flags) {
+    janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
+}
 #endif
 
 /* For a pipe ID */
diff --git a/src/core/net.c b/src/core/net.c
index 843f36f8..36c598af 100644
--- a/src/core/net.c
+++ b/src/core/net.c
@@ -477,14 +477,20 @@ JANET_CORE_FN(cfun_net_connect,
         }
     }
 
+    /* Wrap socket in abstract type JanetStream */
+    JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
+
+    /* Set the socket to non-blocking mode */
+    janet_net_socknoblock(sock);
+
     /* Connect to socket */
 #ifdef JANET_WINDOWS
     int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
-    Janet lasterr = janet_ev_lasterr();
+    int err = WSAGetLastError();
     freeaddrinfo(ai);
 #else
     int status = connect(sock, addr, addrlen);
-    Janet lasterr = janet_ev_lasterr();
+    int err = errno;
     if (is_unix) {
         janet_free(ai);
     } else {
@@ -492,17 +498,22 @@ JANET_CORE_FN(cfun_net_connect,
     }
 #endif
 
-    if (status == -1) {
-        JSOCKCLOSE(sock);
-        janet_panicf("could not connect socket: %V", lasterr);
+    if (status != 0) {
+#ifdef JANET_WINDOWS
+        if (err != WSAEWOULDBLOCK) {
+#else
+        if (err != EINPROGRESS) {
+#endif
+            JSOCKCLOSE(sock);
+            Janet lasterr = janet_ev_lasterr();
+            janet_panicf("could not connect socket: %V", lasterr);
+        }
     }
 
-    /* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */
-    janet_net_socknoblock(sock);
+    /* Handle the connect() result in the event loop*/
+    janet_ev_connect(stream, MSG_NOSIGNAL);
 
-    /* Wrap socket in abstract type JanetStream */
-    JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
-    return janet_wrap_abstract(stream);
+    janet_await();
 }
 
 static const char *serverify_socket(JSock sfd) {
diff --git a/src/include/janet.h b/src/include/janet.h
index 941a6c35..130973dc 100644
--- a/src/include/janet.h
+++ b/src/include/janet.h
@@ -1479,6 +1479,7 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
 JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
 JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
 JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
+JANET_API void janet_ev_connect(JanetStream *stream, int flags);
 #endif
 
 /* Write async to a stream */