mirror of
https://github.com/janet-lang/janet
synced 2025-03-31 04:26:56 +00:00
changed net/connect to be non-blocking / asynchronous
This commit is contained in:
parent
148917d4ca
commit
8d78fb1f6b
@ -2456,7 +2456,8 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
|
|||||||
typedef enum {
|
typedef enum {
|
||||||
JANET_ASYNC_WRITEMODE_WRITE,
|
JANET_ASYNC_WRITEMODE_WRITE,
|
||||||
JANET_ASYNC_WRITEMODE_SEND,
|
JANET_ASYNC_WRITEMODE_SEND,
|
||||||
JANET_ASYNC_WRITEMODE_SENDTO
|
JANET_ASYNC_WRITEMODE_SENDTO,
|
||||||
|
JANET_ASYNC_WRITEMODE_CONNECT
|
||||||
} JanetWriteMode;
|
} JanetWriteMode;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
@ -2480,6 +2481,31 @@ typedef struct {
|
|||||||
#endif
|
#endif
|
||||||
} StateWrite;
|
} StateWrite;
|
||||||
|
|
||||||
|
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
|
||||||
|
#ifdef JANET_WINDOWS
|
||||||
|
int res = 0;
|
||||||
|
int size = sizeof(res);
|
||||||
|
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
|
||||||
|
#else
|
||||||
|
int res = 0;
|
||||||
|
socklen_t size = sizeof res;
|
||||||
|
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
|
||||||
|
#endif
|
||||||
|
if (r == 0) {
|
||||||
|
if (res == 0) {
|
||||||
|
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
|
||||||
|
} else {
|
||||||
|
// TODO help needed. janet_stream_close(s->stream);
|
||||||
|
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// TODO help needed. janet_stream_close(s->stream);
|
||||||
|
janet_cancel(s->fiber, janet_ev_lasterr());
|
||||||
|
}
|
||||||
|
return JANET_ASYNC_STATUS_DONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
|
||||||
StateWrite *state = (StateWrite *) s;
|
StateWrite *state = (StateWrite *) s;
|
||||||
switch (event) {
|
switch (event) {
|
||||||
@ -2509,6 +2535,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case JANET_ASYNC_EVENT_USER: {
|
case JANET_ASYNC_EVENT_USER: {
|
||||||
|
#ifdef JANET_NET
|
||||||
|
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
|
||||||
|
return handle_connect(s);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
/* Begin write */
|
/* Begin write */
|
||||||
int32_t len;
|
int32_t len;
|
||||||
const uint8_t *bytes;
|
const uint8_t *bytes;
|
||||||
@ -2572,6 +2603,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
|
|||||||
janet_cancel(s->fiber, janet_cstringv("stream hup"));
|
janet_cancel(s->fiber, janet_cstringv("stream hup"));
|
||||||
return JANET_ASYNC_STATUS_DONE;
|
return JANET_ASYNC_STATUS_DONE;
|
||||||
case JANET_ASYNC_EVENT_WRITE: {
|
case JANET_ASYNC_EVENT_WRITE: {
|
||||||
|
#ifdef JANET_NET
|
||||||
|
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
|
||||||
|
return handle_connect(s);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
int32_t start, len;
|
int32_t start, len;
|
||||||
const uint8_t *bytes;
|
const uint8_t *bytes;
|
||||||
start = state->start;
|
start = state->start;
|
||||||
@ -2674,6 +2710,10 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
|
|||||||
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
|
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
|
||||||
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
|
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void janet_ev_connect(JanetStream *stream, int flags) {
|
||||||
|
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* For a pipe ID */
|
/* For a pipe ID */
|
||||||
|
@ -477,14 +477,20 @@ JANET_CORE_FN(cfun_net_connect,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Wrap socket in abstract type JanetStream */
|
||||||
|
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
||||||
|
|
||||||
|
/* Set the socket to non-blocking mode */
|
||||||
|
janet_net_socknoblock(sock);
|
||||||
|
|
||||||
/* Connect to socket */
|
/* Connect to socket */
|
||||||
#ifdef JANET_WINDOWS
|
#ifdef JANET_WINDOWS
|
||||||
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
|
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
|
||||||
Janet lasterr = janet_ev_lasterr();
|
int err = WSAGetLastError();
|
||||||
freeaddrinfo(ai);
|
freeaddrinfo(ai);
|
||||||
#else
|
#else
|
||||||
int status = connect(sock, addr, addrlen);
|
int status = connect(sock, addr, addrlen);
|
||||||
Janet lasterr = janet_ev_lasterr();
|
int err = errno;
|
||||||
if (is_unix) {
|
if (is_unix) {
|
||||||
janet_free(ai);
|
janet_free(ai);
|
||||||
} else {
|
} else {
|
||||||
@ -492,17 +498,22 @@ JANET_CORE_FN(cfun_net_connect,
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (status == -1) {
|
if (status != 0) {
|
||||||
JSOCKCLOSE(sock);
|
#ifdef JANET_WINDOWS
|
||||||
janet_panicf("could not connect socket: %V", lasterr);
|
if (err != WSAEWOULDBLOCK) {
|
||||||
|
#else
|
||||||
|
if (err != EINPROGRESS) {
|
||||||
|
#endif
|
||||||
|
JSOCKCLOSE(sock);
|
||||||
|
Janet lasterr = janet_ev_lasterr();
|
||||||
|
janet_panicf("could not connect socket: %V", lasterr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */
|
/* Handle the connect() result in the event loop*/
|
||||||
janet_net_socknoblock(sock);
|
janet_ev_connect(stream, MSG_NOSIGNAL);
|
||||||
|
|
||||||
/* Wrap socket in abstract type JanetStream */
|
janet_await();
|
||||||
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
|
|
||||||
return janet_wrap_abstract(stream);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *serverify_socket(JSock sfd) {
|
static const char *serverify_socket(JSock sfd) {
|
||||||
|
@ -1479,6 +1479,7 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
|
|||||||
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||||
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||||
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
|
||||||
|
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Write async to a stream */
|
/* Write async to a stream */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user