Redo async connect code to be moved out of ev.c.

Async connect is different than write.
This commit is contained in:
Calvin Rose 2023-09-24 10:08:40 -07:00
parent 221645d2ce
commit e6e9bd8147
9 changed files with 93 additions and 78 deletions

View File

@ -349,6 +349,7 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod
/* Close a stream */
static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
if (stream->flags & JANET_STREAM_CLOSED) return;
stream->flags |= JANET_STREAM_CLOSED;
JanetListenerState *state = stream->state;
while (NULL != state) {
if (!is_gc) {
@ -359,7 +360,6 @@ static void janet_stream_close_impl(JanetStream *stream, int is_gc) {
state = next_state;
}
stream->state = NULL;
stream->flags |= JANET_STREAM_CLOSED;
#ifdef JANET_WINDOWS
#ifdef JANET_NET
if (stream->flags & JANET_STREAM_SOCKET) {
@ -1554,10 +1554,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
state = state->_next;
}
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
}
}
}
@ -1712,10 +1708,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
janet_unlisten(state, 0);
state = next_state;
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
}
}
}
@ -1914,10 +1906,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
state = next_state;
}
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
}
}
}
@ -2035,10 +2023,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
status3 == JANET_ASYNC_STATUS_DONE ||
status4 == JANET_ASYNC_STATUS_DONE)
janet_unlisten(state, 0);
/* Close the stream if requested and no more listeners are left */
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
janet_stream_close(stream);
}
}
}
@ -2525,8 +2509,7 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
typedef enum {
JANET_ASYNC_WRITEMODE_WRITE,
JANET_ASYNC_WRITEMODE_SEND,
JANET_ASYNC_WRITEMODE_SENDTO,
JANET_ASYNC_WRITEMODE_CONNECT
JANET_ASYNC_WRITEMODE_SENDTO
} JanetWriteMode;
typedef struct {
@ -2550,41 +2533,15 @@ typedef struct {
#endif
} StateWrite;
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
s->stream->flags |= JANET_STREAM_TOCLOSE;
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
StateWrite *state = (StateWrite *) s;
switch (event) {
default:
break;
case JANET_ASYNC_EVENT_MARK: {
if (state->mode != JANET_ASYNC_WRITEMODE_CONNECT) {
janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str));
}
janet_mark(state->is_buffer
? janet_wrap_buffer(state->src.buf)
: janet_wrap_string(state->src.str));
if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) {
janet_mark(janet_wrap_abstract(state->dest_abst));
}
@ -2606,11 +2563,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
}
break;
case JANET_ASYNC_EVENT_USER: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
/* Begin write */
int32_t len;
const uint8_t *bytes;
@ -2674,11 +2626,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
janet_cancel(s->fiber, janet_cstringv("stream hup"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_WRITE: {
#ifdef JANET_NET
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
return handle_connect(s);
}
#endif
int32_t start, len;
const uint8_t *bytes;
start = state->start;
@ -2780,10 +2727,6 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
}
void janet_ev_connect(JanetStream *stream, int flags) {
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
}
#endif
/* For a pipe ID */

View File

@ -53,6 +53,11 @@ void janet_gcpressure(size_t s) {
janet_vm.next_collection += s;
}
/* Instrment freed memory for simple use after free detection. */
static void gc_free_gcobj(JanetGCObject *mem) {
janet_free(mem);
}
/* Mark a value */
void janet_mark(Janet x) {
if (depth) {
@ -339,13 +344,12 @@ void janet_sweep() {
current->flags &= ~JANET_MEM_REACHABLE;
} else {
janet_vm.block_count--;
janet_deinit_block(current);
if (NULL != previous) {
previous->data.next = next;
} else {
janet_vm.blocks = next;
}
janet_free(current);
gc_free_gcobj(current);
}
current = next;
}
@ -371,7 +375,7 @@ void janet_sweep() {
janet_assert(!head->type->gc(head->data, head->size), "finalizer failed");
}
/* Free memory */
janet_free(janet_abstract_head(abst));
gc_free_gcobj(janet_abstract_head(abst));
}
/* Mark as tombstone in place */
@ -438,6 +442,8 @@ void janet_collect(void) {
uint32_t i;
if (janet_vm.gc_suspend) return;
depth = JANET_RECURSION_GUARD;
fprintf(stderr, "gccollect\n");
janet_vm.gc_mark_phase = 1;
/* Try and prevent many major collections back to back.
* A full collection will take O(janet_vm.block_count) time.
* If we have a large heap, make sure our interval is not too
@ -457,6 +463,7 @@ void janet_collect(void) {
Janet x = janet_vm.roots[--janet_vm.root_count];
janet_mark(x);
}
janet_vm.gc_mark_phase = 0;
janet_sweep();
janet_vm.next_collection = 0;
janet_free_all_scratch();
@ -542,9 +549,10 @@ void janet_clear_memory(void) {
#endif
JanetGCObject *current = janet_vm.blocks;
while (NULL != current) {
current->flags |= JANET_MEM_USEAFTERFREE;
janet_deinit_block(current);
JanetGCObject *next = current->data.next;
janet_free(current);
gc_free_gcobj(current);
current = next;
}
janet_vm.blocks = NULL;
@ -560,7 +568,9 @@ void janet_gcunlock(int handle) {
janet_vm.gc_suspend = handle;
}
/* Scratch memory API */
/* Scratch memory API
* Scratch memory allocations do not need to be free (but optionally can be), and will be automatically cleaned
* up in the next call to janet_collect. */
void *janet_smalloc(size_t size) {
JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size);

View File

@ -38,7 +38,7 @@
#define janet_gc_settype(m, t) ((janet_gc_header(m)->flags |= (0xFF & (t))))
#define janet_gc_type(m) (janet_gc_header(m)->flags & 0xFF)
#define janet_gc_mark(m) (janet_gc_header(m)->flags |= JANET_MEM_REACHABLE)
#define janet_gc_mark(m) do (janet_gc_header(m)->flags |= JANET_MEM_REACHABLE)
#define janet_gc_reachable(m) (janet_gc_header(m)->flags & JANET_MEM_REACHABLE)
/* Memory types for the GC. Different from JanetType to include funcenv and funcdef. */

View File

@ -111,6 +111,69 @@ static void janet_net_socknoblock(JSock s) {
#endif
}
/* State machine for async connect */
typedef struct {
JanetListenerState head;
int did_connect;
} NetStateConnect;
JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) {
NetStateConnect *state = (NetStateConnect *)s;
switch (event) {
default:
return JANET_ASYNC_STATUS_NOT_DONE;
case JANET_ASYNC_EVENT_DEINIT:
{
if (!state->did_connect) {
janet_stream_close(s->stream);
return JANET_ASYNC_STATUS_DONE;
}
}
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_CLOSE:
case JANET_ASYNC_EVENT_HUP:
case JANET_ASYNC_EVENT_ERR:
janet_cancel(s->fiber, janet_cstringv("failed to connect socket"));
return JANET_ASYNC_STATUS_DONE;
case JANET_ASYNC_EVENT_COMPLETE:
case JANET_ASYNC_EVENT_WRITE:
case JANET_ASYNC_EVENT_USER:
break;
}
#ifdef JANET_WINDOWS
int res = 0;
int size = sizeof(res);
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
#else
int res = 0;
socklen_t size = sizeof res;
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
#endif
if (r == 0) {
if (res == 0) {
state->did_connect = 1;
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
} else {
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
}
} else {
janet_cancel(s->fiber, janet_ev_lasterr());
}
return JANET_ASYNC_STATUS_DONE;
}
static void net_sched_connect(JanetStream *stream) {
Janet err;
JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL);
NetStateConnect *state = (NetStateConnect *)s;
state->did_connect = 0;
#ifdef JANET_WINDOWS
net_machine_connect(s, JANET_ASYNC_EVENT_USER);
#endif
}
/* State machine for accepting connections. */
#ifdef JANET_WINDOWS
@ -496,7 +559,7 @@ JANET_CORE_FN(cfun_net_connect,
}
#endif
if (status != 0) {
if (status) {
#ifdef JANET_WINDOWS
if (err != WSAEWOULDBLOCK) {
#else
@ -506,11 +569,11 @@ JANET_CORE_FN(cfun_net_connect,
Janet lasterr = janet_ev_lasterr();
janet_panicf("could not connect socket: %V", lasterr);
}
} else {
return janet_wrap_abstract(stream);
}
/* Handle the connect() result in the event loop*/
janet_ev_connect(stream, MSG_NOSIGNAL);
net_sched_connect(stream);
janet_await();
}

View File

@ -1437,8 +1437,8 @@ JANET_CORE_FN(os_getenv,
janet_sandbox_assert(JANET_SANDBOX_ENV);
janet_arity(argc, 1, 2);
const char *cstr = janet_getcstring(argv, 0);
const char *res = getenv(cstr);
janet_lock_environ();
const char *res = getenv(cstr);
Janet ret = res
? janet_cstringv(res)
: argc == 2

View File

@ -125,6 +125,7 @@ struct JanetVM {
size_t next_collection;
size_t block_count;
int gc_suspend;
int gc_mark_phase;
/* GC roots */
Janet *roots;

View File

@ -111,12 +111,11 @@ static void janet_table_rehash(JanetTable *t, int32_t size) {
JANET_OUT_OF_MEMORY;
}
}
int32_t i, oldcapacity;
oldcapacity = t->capacity;
int32_t oldcapacity = t->capacity;
t->data = newdata;
t->capacity = size;
t->deleted = 0;
for (i = 0; i < oldcapacity; i++) {
for (int32_t i = 0; i < oldcapacity; i++) {
JanetKV *kv = olddata + i;
if (!janet_checktype(kv->key, JANET_NIL)) {
JanetKV *newkv = janet_table_find(t, kv->key);

View File

@ -1588,6 +1588,7 @@ int janet_init(void) {
janet_vm.next_collection = 0;
janet_vm.gc_interval = 0x400000;
janet_vm.block_count = 0;
janet_vm.gc_mark_phase = 0;
janet_symcache_init();

View File

@ -580,7 +580,6 @@ typedef void *JanetAbstract;
#define JANET_STREAM_WRITABLE 0x400
#define JANET_STREAM_ACCEPTABLE 0x800
#define JANET_STREAM_UDPSERVER 0x1000
#define JANET_STREAM_TOCLOSE 0x10000
typedef enum {
JANET_ASYNC_EVENT_INIT,
@ -1499,7 +1498,6 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
#endif
/* Write async to a stream */