From b0a2df9ea35b87edb813ed444975b6c7e26a673d Mon Sep 17 00:00:00 2001 From: jcorporation Date: Sat, 19 Jan 2019 08:56:27 +0000 Subject: [PATCH] Feat: timeout for tiny_queue_length and tiny_queue_shift --- src/mpd_client.c | 18 ++++++++++-------- src/mympd_api.c | 6 ++++-- src/tiny_queue.c | 40 +++++++++++++++++++++++++++++++++++----- src/tiny_queue.h | 4 ++-- src/web_server.c | 26 ++++++++++++++------------ 5 files changed, 65 insertions(+), 29 deletions(-) diff --git a/src/mpd_client.c b/src/mpd_client.c index ef2cf94..58126b0 100644 --- a/src/mpd_client.c +++ b/src/mpd_client.c @@ -126,7 +126,7 @@ typedef struct t_sticker { long like; } t_sticker; -static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int timeout); +static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state); static void mpd_client_parse_idle(t_config *config, t_mpd_state *mpd_state, const int idle_bitmask); static void mpd_client_api(t_config *config, t_mpd_state *mpd_state, void *arg_request); static void mpd_client_notify(const char *message, const size_t n); @@ -199,7 +199,7 @@ void *mpd_client_loop(void *arg_config) { LOG_INFO() printf("Reading last played songs: %d\n", len); while (s_signal_received == 0) { - mpd_client_idle(config, &mpd_state, 100); + mpd_client_idle(config, &mpd_state); } //Cleanup mpd_client_disconnect(config, &mpd_state); @@ -1107,7 +1107,7 @@ static void mpd_client_mpd_features(t_config *config, t_mpd_state *mpd_state) { LOG_INFO() printf("Disabling advanced search, depends on mpd >= 0.21.0 and libmpdclient >= 2.17.0.\n"); } -static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int timeout) { +static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state) { struct pollfd fds[1]; int pollrc; char buffer[MAX_SIZE]; @@ -1168,8 +1168,8 @@ static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int case MPD_CONNECTED: fds[0].fd = mpd_connection_get_fd(mpd_state->conn); fds[0].events = POLLIN; - pollrc = poll(fds, 1, timeout); - unsigned mpd_client_queue_length = tiny_queue_length(mpd_client_queue); + pollrc = poll(fds, 1, 50); + unsigned mpd_client_queue_length = tiny_queue_length(mpd_client_queue, 50); if (pollrc > 0 || mpd_client_queue_length > 0) { LOG_DEBUG() fprintf(stderr, "DEBUG: Leaving mpd idle mode.\n"); mpd_send_noidle(mpd_state->conn); @@ -1185,8 +1185,10 @@ static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int if (mpd_client_queue_length > 0) { //Handle request LOG_DEBUG() fprintf(stderr, "DEBUG: Handle request.\n"); - struct work_request_t *req = tiny_queue_shift(mpd_client_queue); - mpd_client_api(config, mpd_state, req); + struct work_request_t *request = tiny_queue_shift(mpd_client_queue, 100); + if (request != NULL) { + mpd_client_api(config, mpd_state, request); + } } LOG_DEBUG() fprintf(stderr, "DEBUG: Entering mpd idle mode.\n"); mpd_send_idle(mpd_state->conn); @@ -2554,7 +2556,7 @@ static int mpd_client_put_stats(t_mpd_state *mpd_state, char *buffer) { static void mpd_client_disconnect(t_config *config, t_mpd_state *mpd_state) { mpd_state->conn_state = MPD_DISCONNECT; - mpd_client_idle(config, mpd_state, 100); +// mpd_client_idle(config, mpd_state); } static int mpd_client_smartpls_put(t_config *config, char *buffer, const char *playlist) { diff --git a/src/mympd_api.c b/src/mympd_api.c index 2bed934..ca34e5b 100644 --- a/src/mympd_api.c +++ b/src/mympd_api.c @@ -100,8 +100,10 @@ void *mympd_api_loop(void *arg_config) { } while (s_signal_received == 0) { - struct t_work_request *request = tiny_queue_shift(mympd_api_queue); - mympd_api(config, &mympd_state, request); + struct t_work_request *request = tiny_queue_shift(mympd_api_queue, 0); + if (request != NULL) { + mympd_api(config, &mympd_state, request); + } } list_free(&mympd_state.syscmd_list); diff --git a/src/tiny_queue.c b/src/tiny_queue.c index 17e3744..c6b40b5 100644 --- a/src/tiny_queue.c +++ b/src/tiny_queue.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "tiny_queue.h" tiny_queue_t *tiny_queue_create(void) { @@ -63,18 +64,47 @@ void tiny_queue_push(tiny_queue_t *queue, void *data) { pthread_cond_signal(&queue->wakeup); } -int tiny_queue_length(tiny_queue_t *queue) { +int tiny_queue_length(tiny_queue_t *queue, int timeout) { pthread_mutex_lock(&queue->mutex); + if (timeout > 0) { + struct timespec max_wait = {0, 0}; + clock_gettime(CLOCK_REALTIME, &max_wait); + //timeout in ms + max_wait.tv_nsec += timeout * 1000; + while (queue->length == 0) { + // block if queue is empty + int rc = pthread_cond_timedwait(&queue->wakeup, &queue->mutex, &max_wait); + if (rc == ETIMEDOUT) { + break; + } + } + } unsigned len = queue->length; pthread_mutex_unlock(&queue->mutex); return len; } -void *tiny_queue_shift(tiny_queue_t *queue) { +void *tiny_queue_shift(tiny_queue_t *queue, int timeout) { pthread_mutex_lock(&queue->mutex); - while (queue->head == NULL) { - // block if buffer is empty - pthread_cond_wait(&queue->wakeup, &queue->mutex); + if (timeout > 0) { + struct timespec max_wait = {0, 0}; + clock_gettime(CLOCK_REALTIME, &max_wait); + //timeout in ms + max_wait.tv_nsec += timeout * 1000; + while (queue->head == NULL) { + // block if buffer is empty + int rc = pthread_cond_timedwait(&queue->wakeup, &queue->mutex, &max_wait); + if (rc == ETIMEDOUT) { + pthread_mutex_unlock(&queue->mutex); + return NULL; + } + } + } + else { + while (queue->head == NULL) { + // block if buffer is empty + pthread_cond_wait(&queue->wakeup, &queue->mutex); + } } struct tiny_msg_t* current_head = queue->head; diff --git a/src/tiny_queue.h b/src/tiny_queue.h index 64efeda..2ad1d36 100644 --- a/src/tiny_queue.h +++ b/src/tiny_queue.h @@ -36,6 +36,6 @@ typedef struct tiny_queue_t { tiny_queue_t *tiny_queue_create(void); void tiny_queue_free(tiny_queue_t *queue); void tiny_queue_push(struct tiny_queue_t *queue, void *data); -void *tiny_queue_shift(struct tiny_queue_t *queue); -int tiny_queue_length(struct tiny_queue_t *queue); +void *tiny_queue_shift(struct tiny_queue_t *queue, int timeout); +int tiny_queue_length(struct tiny_queue_t *queue, int timeout); #endif diff --git a/src/web_server.c b/src/web_server.c index 6a8696a..12dd0b8 100644 --- a/src/web_server.c +++ b/src/web_server.c @@ -106,19 +106,21 @@ void web_server_free(void *arg_mgr) { void *web_server_loop(void *arg_mgr) { struct mg_mgr *mgr = (struct mg_mgr *) arg_mgr; while (s_signal_received == 0) { - mg_mgr_poll(mgr, 100); - unsigned web_server_queue_length = tiny_queue_length(web_server_queue); - if (web_server_queue_length > 0) { - t_work_result *response = tiny_queue_shift(web_server_queue); - if (response->conn_id == 0) { - //Websocket notify from mpd idle - send_ws_notify(mgr, response); - } - else { - //api response - send_api_response(mgr, response); + mg_mgr_poll(mgr, 50); +// unsigned web_server_queue_length = tiny_queue_length(web_server_queue, 100); +// if (web_server_queue_length > 0) { + t_work_result *response = tiny_queue_shift(web_server_queue, 50); + if (response != NULL) { + if (response->conn_id == 0) { + //Websocket notify from mpd idle + send_ws_notify(mgr, response); + } + else { + //api response + send_api_response(mgr, response); + } } - } +// } } mg_mgr_free(mgr); return NULL;