1
0
mirror of https://github.com/janet-lang/janet synced 2025-08-03 20:43:55 +00:00

Merge pull request #1139 from zevv/async-connect

changed net/connect to be non-blocking / asynchronous
This commit is contained in:
Calvin Rose 2023-05-19 21:12:16 -05:00 committed by GitHub
commit b621d4dd2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 11 deletions

View File

@ -1502,6 +1502,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
state = state->_next; state = state->_next;
} }
} }
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
} }
@ -1656,6 +1660,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_unlisten(state, 0); janet_unlisten(state, 0);
state = next_state; state = next_state;
} }
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
} }
@ -1854,6 +1862,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
state = next_state; state = next_state;
} }
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
} }
@ -1957,6 +1969,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE; JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
state->event = pfd; state->event = pfd;
JanetStream *stream = state->stream;
if (mask & POLLOUT) if (mask & POLLOUT)
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE); status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
if (mask & POLLIN) if (mask & POLLIN)
@ -1970,6 +1983,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
status3 == JANET_ASYNC_STATUS_DONE || status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE) status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0); janet_unlisten(state, 0);
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
} }
} }
@ -2456,7 +2473,8 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
typedef enum { typedef enum {
JANET_ASYNC_WRITEMODE_WRITE, JANET_ASYNC_WRITEMODE_WRITE,
JANET_ASYNC_WRITEMODE_SEND, JANET_ASYNC_WRITEMODE_SEND,
JANET_ASYNC_WRITEMODE_SENDTO JANET_ASYNC_WRITEMODE_SENDTO,
JANET_ASYNC_WRITEMODE_CONNECT
} JanetWriteMode; } JanetWriteMode;
typedef struct { typedef struct {
@ -2480,6 +2498,31 @@ typedef struct {
#endif #endif
} StateWrite; } StateWrite;
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s; StateWrite *state = (StateWrite *) s;
switch (event) { switch (event) {
@ -2509,6 +2552,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
} }
break; break;
case JANET_ASYNC_EVENT_USER: { case JANET_ASYNC_EVENT_USER: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
/* Begin write */ /* Begin write */
int32_t len; int32_t len;
const uint8_t *bytes; const uint8_t *bytes;
@ -2572,6 +2620,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream hup")); janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE; return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: { case JANET_ASYNC_EVENT_WRITE: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
int32_t start, len; int32_t start, len;
const uint8_t *bytes; const uint8_t *bytes;
start = state->start; start = state->start;
@ -2674,6 +2727,10 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
} }
void janet_ev_connect(JanetStream *stream, int flags) {
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
}
#endif #endif
/* For a pipe ID */ /* For a pipe ID */

View File

@ -477,14 +477,20 @@ JANET_CORE_FN(cfun_net_connect,
} }
} }
/* Wrap socket in abstract type JanetStream */
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
/* Set up the socket for non-blocking IO before connecting */
janet_net_socknoblock(sock);
/* Connect to socket */ /* Connect to socket */
#ifdef JANET_WINDOWS #ifdef JANET_WINDOWS
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL); int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
Janet lasterr = janet_ev_lasterr(); int err = WSAGetLastError();
freeaddrinfo(ai); freeaddrinfo(ai);
#else #else
int status = connect(sock, addr, addrlen); int status = connect(sock, addr, addrlen);
Janet lasterr = janet_ev_lasterr(); int err = errno;
if (is_unix) { if (is_unix) {
janet_free(ai); janet_free(ai);
} else { } else {
@ -492,17 +498,22 @@ JANET_CORE_FN(cfun_net_connect,
} }
#endif #endif
if (status == -1) { if (status != 0) {
JSOCKCLOSE(sock); #ifdef JANET_WINDOWS
janet_panicf("could not connect socket: %V", lasterr); if (err != WSAEWOULDBLOCK) {
#else
if (err != EINPROGRESS) {
#endif
JSOCKCLOSE(sock);
Janet lasterr = janet_ev_lasterr();
janet_panicf("could not connect socket: %V", lasterr);
}
} }
/* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */ /* Handle the connect() result in the event loop*/
janet_net_socknoblock(sock); janet_ev_connect(stream, MSG_NOSIGNAL);
/* Wrap socket in abstract type JanetStream */ janet_await();
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
return janet_wrap_abstract(stream);
} }
static const char *serverify_socket(JSock sfd) { static const char *serverify_socket(JSock sfd) {

View File

@ -568,6 +568,7 @@ typedef void *JanetAbstract;
#define JANET_STREAM_WRITABLE 0x400 #define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800 #define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000 #define JANET_STREAM_UDPSERVER 0x1000
#define JANET_STREAM_TOCLOSE 0x10000
typedef enum { typedef enum {
JANET_ASYNC_EVENT_INIT, JANET_ASYNC_EVENT_INIT,
@ -1479,6 +1480,7 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
#endif #endif
/* Write async to a stream */ /* Write async to a stream */