diff --git a/src/core/ev.c b/src/core/ev.c index 603d1496..838bdc2b 100644 --- a/src/core/ev.c +++ b/src/core/ev.c @@ -1239,10 +1239,10 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { state->wbuf.buf = state->chunk_buf; if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { status = WSARecvFrom((SOCKET) s->stream->handle, &state->wbuf, 1, - NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); + NULL, &state->flags, &state->from, &state->fromlen, &state->overlapped, NULL); } else { status = WSARecv((SOCKET) s->stream->handle, &state->wbuf, 1, - NULL, &state->flags, &state->overlapped, NULL); + NULL, &state->flags, &state->overlapped, NULL); } if (status && (WSA_IO_PENDING != WSAGetLastError())) { janet_cancel(s->fiber, janet_ev_lasterr()); @@ -1262,78 +1262,78 @@ JanetAsyncStatus ev_machine_read(JanetListenerState *s, JanetAsyncEvent event) { #else case JANET_ASYNC_EVENT_ERR: case JANET_ASYNC_EVENT_HUP: { - if (state->bytes_read) { - janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); - } else { - janet_schedule(s->fiber, janet_wrap_nil()); - } - return JANET_ASYNC_STATUS_DONE; + if (state->bytes_read) { + janet_schedule(s->fiber, janet_wrap_buffer(state->buf)); + } else { + janet_schedule(s->fiber, janet_wrap_nil()); } + return JANET_ASYNC_STATUS_DONE; + } case JANET_ASYNC_EVENT_READ: /* Read in bytes */ - { - JanetBuffer *buffer = state->buf; - int32_t bytes_left = state->bytes_left; - janet_buffer_extra(buffer, bytes_left); - ssize_t nread; + { + JanetBuffer *buffer = state->buf; + int32_t bytes_left = state->bytes_left; + janet_buffer_extra(buffer, bytes_left); + ssize_t nread; #ifdef JANET_NET - char saddr[256]; - socklen_t socklen = sizeof(saddr); + char saddr[256]; + socklen_t socklen = sizeof(saddr); #endif - do { + do { #ifdef JANET_NET - if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags, - (struct sockaddr *)&saddr, &socklen); - } else if (state->mode == JANET_ASYNC_READMODE_RECV) { - nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags); - } else + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + nread = recvfrom(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags, + (struct sockaddr *)&saddr, &socklen); + } else if (state->mode == JANET_ASYNC_READMODE_RECV) { + nread = recv(s->stream->handle, buffer->data + buffer->count, bytes_left, state->flags); + } else #endif - { - nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left); - } - } while (nread == -1 && errno == EINTR); - - /* Check for errors - special case errors that can just be waited on to fix */ - if (nread == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) break; - janet_cancel(s->fiber, janet_ev_lasterr()); - return JANET_ASYNC_STATUS_DONE; + { + nread = read(s->stream->handle, buffer->data + buffer->count, bytes_left); } + } while (nread == -1 && errno == EINTR); - /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ - state->bytes_read += nread; - if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { - janet_schedule(s->fiber, janet_wrap_nil()); - return JANET_ASYNC_STATUS_DONE; - } - - /* Increment buffer counts */ - buffer->count += nread; - bytes_left -= nread; - state->bytes_left = bytes_left; - - /* Resume if done */ - if (!state->is_chunk || bytes_left == 0 || nread == 0) { - Janet resume_val; -#ifdef JANET_NET - if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { - void *abst = janet_abstract(&janet_address_type, socklen); - memcpy(abst, &saddr, socklen); - resume_val = janet_wrap_abstract(abst); - } else -#endif - { - resume_val = janet_wrap_buffer(buffer); - } - janet_schedule(s->fiber, resume_val); - return JANET_ASYNC_STATUS_DONE; - } + /* Check for errors - special case errors that can just be waited on to fix */ + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + janet_cancel(s->fiber, janet_ev_lasterr()); + return JANET_ASYNC_STATUS_DONE; } - break; + + /* Only allow 0-length packets in recv-from. In stream protocols, a zero length packet is EOS. */ + state->bytes_read += nread; + if (state->bytes_read == 0 && (state->mode != JANET_ASYNC_READMODE_RECVFROM)) { + janet_schedule(s->fiber, janet_wrap_nil()); + return JANET_ASYNC_STATUS_DONE; + } + + /* Increment buffer counts */ + buffer->count += nread; + bytes_left -= nread; + state->bytes_left = bytes_left; + + /* Resume if done */ + if (!state->is_chunk || bytes_left == 0 || nread == 0) { + Janet resume_val; +#ifdef JANET_NET + if (state->mode == JANET_ASYNC_READMODE_RECVFROM) { + void *abst = janet_abstract(&janet_address_type, socklen); + memcpy(abst, &saddr, socklen); + resume_val = janet_wrap_abstract(abst); + } else #endif -} -return JANET_ASYNC_STATUS_NOT_DONE; + { + resume_val = janet_wrap_buffer(buffer); + } + janet_schedule(s->fiber, resume_val); + return JANET_ASYNC_STATUS_DONE; + } + } + break; +#endif + } + return JANET_ASYNC_STATUS_NOT_DONE; } static void janet_ev_read_generic(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int is_chunked, JanetReadMode mode, int flags) { @@ -1431,7 +1431,7 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) break; case JANET_ASYNC_EVENT_USER: { /* Begin write */ - int32_t start, len; + int32_t len; const uint8_t *bytes; if (state->is_buffer) { /* If buffer, convert to string. */ @@ -1555,7 +1555,7 @@ static void janet_ev_write_generic(JanetStream *stream, void *buf, void *dest_ab state->mode = mode; #ifdef JANET_WINDOWS state->flags = (DWORD) flags; - net_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); + ev_machine_write((JanetListenerState *) state, JANET_ASYNC_EVENT_USER); #else state->start = 0; state->flags = flags; diff --git a/src/core/net.c b/src/core/net.c index 4af55c13..82301916 100644 --- a/src/core/net.c +++ b/src/core/net.c @@ -57,6 +57,7 @@ const JanetAbstractType janet_address_type = { #ifdef JANET_WINDOWS #define JSOCKCLOSE(x) closesocket((SOCKET) x) +#define JSOCKDEFAULT INVALID_SOCKET #define JSOCKVALID(x) ((x) != INVALID_SOCKET) #define JSock SOCKET #define JSOCKFLAGS 0 @@ -72,7 +73,7 @@ const JanetAbstractType janet_address_type = { #endif #endif -static JanetStream *make_stream(JanetHandle handle, uint32_t flags); +static JanetStream *make_stream(JSock handle, uint32_t flags); /* We pass this flag to all send calls to prevent sigpipe */ #ifndef MSG_NOSIGNAL @@ -172,7 +173,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { SOCKET lsock = (SOCKET) state->lstream->handle; SOCKET asock = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (asock == INVALID_SOCKET) { - *err = net_lasterr(); + *err = ev_lasterr(); return 1; } JanetStream *astream = make_stream(asock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE); @@ -181,7 +182,7 @@ static int net_sched_accept_impl(NetStateAccept *state, Janet *err) { if (FALSE == AcceptEx(lsock, asock, state->buf, 0, socksize, socksize, NULL, &state->overlapped)) { int code = WSAGetLastError(); if (code == WSA_IO_PENDING) return 0; /* indicates io is happening async */ - *err = net_lasterr(); + *err = ev_lasterr(); return 1; } return 0; @@ -614,8 +615,8 @@ static const JanetMethod net_stream_methods[] = { {NULL, NULL} }; -static JanetStream *make_stream(JanetHandle handle, uint32_t flags) { - return janet_stream(handle, flags | JANET_STREAM_SOCKET, net_stream_methods); +static JanetStream *make_stream(JSock handle, uint32_t flags) { + return janet_stream((JanetHandle) handle, flags | JANET_STREAM_SOCKET, net_stream_methods); } static const JanetReg net_cfuns[] = {