diff --git a/src/core/ev.c b/src/core/ev.c index f2edb205..a0b77aad 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -349,6 +349,7 @@ JanetStream *janet_stream(JanetHandle handle, uint32_t flags, const JanetMethod /* Close a stream */ static void janet_stream_close_impl(JanetStream *stream, int is_gc) { if (stream->flags & JANET_STREAM_CLOSED) return; + stream->flags |= JANET_STREAM_CLOSED; JanetListenerState *state = stream->state; while (NULL != state) { if (!is_gc) { @@ -359,7 +360,6 @@ static void janet_stream_close_impl(JanetStream *stream, int is_gc) { state = next_state; } stream->state = NULL; - stream->flags |= JANET_STREAM_CLOSED; #ifdef JANET_WINDOWS #ifdef JANET_NET if (stream->flags & JANET_STREAM_SOCKET) { @@ -1554,10 +1554,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) { state = state->_next; } } - /* Close the stream if requested and no more listeners are left */ - if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) { - janet_stream_close(stream); - } } } } @@ -1712,10 +1708,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { janet_unlisten(state, 0); state = next_state; } - /* Close the stream if requested and no more listeners are left */ - if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) { - janet_stream_close(stream); - } } } } @@ -1914,10 +1906,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { state = next_state; } - /* Close the stream if requested and no more listeners are left */ - if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) { - janet_stream_close(stream); - } } } } @@ -2035,10 +2023,6 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) { status3 == JANET_ASYNC_STATUS_DONE || status4 == JANET_ASYNC_STATUS_DONE) janet_unlisten(state, 0); - /* Close the stream if requested and no more listeners are left */ - if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) { - janet_stream_close(stream); - } } } @@ -2525,8 +2509,7 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in typedef enum { JANET_ASYNC_WRITEMODE_WRITE, JANET_ASYNC_WRITEMODE_SEND, - JANET_ASYNC_WRITEMODE_SENDTO, - JANET_ASYNC_WRITEMODE_CONNECT + JANET_ASYNC_WRITEMODE_SENDTO } JanetWriteMode; typedef struct { @@ -2550,41 +2533,15 @@ typedef struct { #endif } StateWrite; -static JanetAsyncStatus handle_connect(JanetListenerState *s) { -#ifdef JANET_WINDOWS - int res = 0; - int size = sizeof(res); - int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size); -#else - int res = 0; - socklen_t size = sizeof res; - int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size); -#endif - if (r == 0) { - if (res == 0) { - janet_schedule(s->fiber, janet_wrap_abstract(s->stream)); - } else { - s->stream->flags |= JANET_STREAM_TOCLOSE; - janet_cancel(s->fiber, janet_cstringv(strerror(res))); - } - } else { - s->stream->flags |= JANET_STREAM_TOCLOSE; - janet_cancel(s->fiber, janet_ev_lasterr()); - } - return JANET_ASYNC_STATUS_DONE; -} - JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) { StateWrite *state = (StateWrite *) s; switch (event) { default: break; case JANET_ASYNC_EVENT_MARK: { - if (state->mode != JANET_ASYNC_WRITEMODE_CONNECT) { - janet_mark(state->is_buffer - ? janet_wrap_buffer(state->src.buf) - : janet_wrap_string(state->src.str)); - } + janet_mark(state->is_buffer + ? janet_wrap_buffer(state->src.buf) + : janet_wrap_string(state->src.str)); if (state->mode == JANET_ASYNC_WRITEMODE_SENDTO) { janet_mark(janet_wrap_abstract(state->dest_abst)); } @@ -2606,11 +2563,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) } break; case JANET_ASYNC_EVENT_USER: { -#ifdef JANET_NET - if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) { - return handle_connect(s); - } -#endif /* Begin write */ int32_t len; const uint8_t *bytes; @@ -2674,11 +2626,6 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) janet_cancel(s->fiber, janet_cstringv("stream hup")); return JANET_ASYNC_STATUS_DONE; case JANET_ASYNC_EVENT_WRITE: { -#ifdef JANET_NET - if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) { - return handle_connect(s); - } -#endif int32_t start, len; const uint8_t *bytes; start = state->start; @@ -2780,10 +2727,6 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) { janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags); } - -void janet_ev_connect(JanetStream *stream, int flags) { - janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags); -} #endif /* For a pipe ID */ diff --git a/src/core/gc.c b/src/core/gc.c index c2a89877..f9de7d42 100644 --- a/src/core/gc.c +++ b/src/core/gc.c @@ -53,6 +53,11 @@ void janet_gcpressure(size_t s) { janet_vm.next_collection += s; } +/* Instrment freed memory for simple use after free detection. */ +static void gc_free_gcobj(JanetGCObject *mem) { + janet_free(mem); +} + /* Mark a value */ void janet_mark(Janet x) { if (depth) { @@ -339,13 +344,12 @@ void janet_sweep() { current->flags &= ~JANET_MEM_REACHABLE; } else { janet_vm.block_count--; - janet_deinit_block(current); if (NULL != previous) { previous->data.next = next; } else { janet_vm.blocks = next; } - janet_free(current); + gc_free_gcobj(current); } current = next; } @@ -371,7 +375,7 @@ void janet_sweep() { janet_assert(!head->type->gc(head->data, head->size), "finalizer failed"); } /* Free memory */ - janet_free(janet_abstract_head(abst)); + gc_free_gcobj(janet_abstract_head(abst)); } /* Mark as tombstone in place */ @@ -438,6 +442,8 @@ void janet_collect(void) { uint32_t i; if (janet_vm.gc_suspend) return; depth = JANET_RECURSION_GUARD; + fprintf(stderr, "gccollect\n"); + janet_vm.gc_mark_phase = 1; /* Try and prevent many major collections back to back. * A full collection will take O(janet_vm.block_count) time. * If we have a large heap, make sure our interval is not too @@ -457,6 +463,7 @@ void janet_collect(void) { Janet x = janet_vm.roots[--janet_vm.root_count]; janet_mark(x); } + janet_vm.gc_mark_phase = 0; janet_sweep(); janet_vm.next_collection = 0; janet_free_all_scratch(); @@ -542,9 +549,10 @@ void janet_clear_memory(void) { #endif JanetGCObject *current = janet_vm.blocks; while (NULL != current) { + current->flags |= JANET_MEM_USEAFTERFREE; janet_deinit_block(current); JanetGCObject *next = current->data.next; - janet_free(current); + gc_free_gcobj(current); current = next; } janet_vm.blocks = NULL; @@ -560,7 +568,9 @@ void janet_gcunlock(int handle) { janet_vm.gc_suspend = handle; } -/* Scratch memory API */ +/* Scratch memory API + * Scratch memory allocations do not need to be free (but optionally can be), and will be automatically cleaned + * up in the next call to janet_collect. */ void *janet_smalloc(size_t size) { JanetScratch *s = janet_malloc(sizeof(JanetScratch) + size); diff --git a/src/core/gc.h b/src/core/gc.h index 175caeba..151a2cc0 100644 --- a/src/core/gc.h +++ b/src/core/gc.h @@ -38,7 +38,7 @@ #define janet_gc_settype(m, t) ((janet_gc_header(m)->flags |= (0xFF & (t)))) #define janet_gc_type(m) (janet_gc_header(m)->flags & 0xFF) -#define janet_gc_mark(m) (janet_gc_header(m)->flags |= JANET_MEM_REACHABLE) +#define janet_gc_mark(m) do (janet_gc_header(m)->flags |= JANET_MEM_REACHABLE) #define janet_gc_reachable(m) (janet_gc_header(m)->flags & JANET_MEM_REACHABLE) /* Memory types for the GC. Different from JanetType to include funcenv and funcdef. */ diff --git a/src/core/net.c b/src/core/net.c index 096b385b..97768522 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -111,6 +111,69 @@ static void janet_net_socknoblock(JSock s) { #endif } +/* State machine for async connect */ + +typedef struct { + JanetListenerState head; + int did_connect; +} NetStateConnect; + +JanetAsyncStatus net_machine_connect(JanetListenerState *s, JanetAsyncEvent event) { + NetStateConnect *state = (NetStateConnect *)s; + switch (event) { + default: + return JANET_ASYNC_STATUS_NOT_DONE; + case JANET_ASYNC_EVENT_DEINIT: + { + if (!state->did_connect) { + janet_stream_close(s->stream); + return JANET_ASYNC_STATUS_DONE; + } + } + return JANET_ASYNC_STATUS_DONE; + case JANET_ASYNC_EVENT_CLOSE: + case JANET_ASYNC_EVENT_HUP: + case JANET_ASYNC_EVENT_ERR: + janet_cancel(s->fiber, janet_cstringv("failed to connect socket")); + return JANET_ASYNC_STATUS_DONE; + case JANET_ASYNC_EVENT_COMPLETE: + case JANET_ASYNC_EVENT_WRITE: + case JANET_ASYNC_EVENT_USER: + break; + } +#ifdef JANET_WINDOWS + int res = 0; + int size = sizeof(res); + int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size); +#else + int res = 0; + socklen_t size = sizeof res; + int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size); +#endif + if (r == 0) { + if (res == 0) { + state->did_connect = 1; + janet_schedule(s->fiber, janet_wrap_abstract(s->stream)); + } else { + janet_cancel(s->fiber, janet_cstringv(strerror(res))); + } + } else { + janet_cancel(s->fiber, janet_ev_lasterr()); + } + return JANET_ASYNC_STATUS_DONE; +} + +static void net_sched_connect(JanetStream *stream) { + Janet err; + JanetListenerState *s = janet_listen(stream, net_machine_connect, JANET_ASYNC_LISTEN_WRITE, sizeof(NetStateConnect), NULL); + NetStateConnect *state = (NetStateConnect *)s; + state->did_connect = 0; +#ifdef JANET_WINDOWS + net_machine_connect(s, JANET_ASYNC_EVENT_USER); +#endif +} + + /* State machine for accepting connections. */ #ifdef JANET_WINDOWS @@ -496,7 +559,7 @@ JANET_CORE_FN(cfun_net_connect, } #endif - if (status != 0) { + if (status) { #ifdef JANET_WINDOWS if (err != WSAEWOULDBLOCK) { #else @@ -506,11 +569,11 @@ JANET_CORE_FN(cfun_net_connect, Janet lasterr = janet_ev_lasterr(); janet_panicf("could not connect socket: %V", lasterr); } + } else { + return janet_wrap_abstract(stream); } - /* Handle the connect() result in the event loop*/ - janet_ev_connect(stream, MSG_NOSIGNAL); - + net_sched_connect(stream); janet_await(); } diff --git a/src/core/os.c b/src/core/os.c index bc82bf46..460cb7eb 100644 --- a/src/core/os.c +++ b/src/core/os.c @@ -1437,8 +1437,8 @@ JANET_CORE_FN(os_getenv, janet_sandbox_assert(JANET_SANDBOX_ENV); janet_arity(argc, 1, 2); const char *cstr = janet_getcstring(argv, 0); - const char *res = getenv(cstr); janet_lock_environ(); + const char *res = getenv(cstr); Janet ret = res ? janet_cstringv(res) : argc == 2 diff --git a/src/core/state.h b/src/core/state.h index a4312768..d1b16bfb 100644 --- a/src/core/state.h +++ b/src/core/state.h @@ -125,6 +125,7 @@ struct JanetVM { size_t next_collection; size_t block_count; int gc_suspend; + int gc_mark_phase; /* GC roots */ Janet *roots; diff --git a/src/core/table.c b/src/core/table.c index 5e711233..213faa0d 100644 --- a/src/core/table.c +++ b/src/core/table.c @@ -111,12 +111,11 @@ static void janet_table_rehash(JanetTable *t, int32_t size) { JANET_OUT_OF_MEMORY; } } - int32_t i, oldcapacity; - oldcapacity = t->capacity; + int32_t oldcapacity = t->capacity; t->data = newdata; t->capacity = size; t->deleted = 0; - for (i = 0; i < oldcapacity; i++) { + for (int32_t i = 0; i < oldcapacity; i++) { JanetKV *kv = olddata + i; if (!janet_checktype(kv->key, JANET_NIL)) { JanetKV *newkv = janet_table_find(t, kv->key); diff --git a/src/core/vm.c b/src/core/vm.c index caf51407..02a6bc4b 100644 --- a/src/core/vm.c +++ b/src/core/vm.c @@ -1588,6 +1588,7 @@ int janet_init(void) { janet_vm.next_collection = 0; janet_vm.gc_interval = 0x400000; janet_vm.block_count = 0; + janet_vm.gc_mark_phase = 0; janet_symcache_init(); diff --git a/src/include/janet.h b/src/include/janet.h index bdc0993e..72a7adda 100644 --- a/src/include/janet.h +++ b/src/include/janet.h @@ -580,7 +580,6 @@ typedef void *JanetAbstract; #define JANET_STREAM_WRITABLE 0x400 #define JANET_STREAM_ACCEPTABLE 0x800 #define JANET_STREAM_UDPSERVER 0x1000 -#define JANET_STREAM_TOCLOSE 0x10000 typedef enum { JANET_ASYNC_EVENT_INIT, @@ -1499,7 +1498,6 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags); -JANET_API void janet_ev_connect(JanetStream *stream, int flags); #endif /* Write async to a stream */