mirror of
				https://github.com/SuperBFG7/ympd
				synced 2025-10-31 13:53:00 +00:00 
			
		
		
		
	Feat: timeout for tiny_queue_length and tiny_queue_shift
This commit is contained in:
		| @@ -126,7 +126,7 @@ typedef struct t_sticker { | |||||||
|     long like; |     long like; | ||||||
| } t_sticker; | } t_sticker; | ||||||
|  |  | ||||||
| static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int timeout); | static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state); | ||||||
| static void mpd_client_parse_idle(t_config *config, t_mpd_state *mpd_state, const int idle_bitmask); | static void mpd_client_parse_idle(t_config *config, t_mpd_state *mpd_state, const int idle_bitmask); | ||||||
| static void mpd_client_api(t_config *config, t_mpd_state *mpd_state, void *arg_request); | static void mpd_client_api(t_config *config, t_mpd_state *mpd_state, void *arg_request); | ||||||
| static void mpd_client_notify(const char *message, const size_t n); | static void mpd_client_notify(const char *message, const size_t n); | ||||||
| @@ -199,7 +199,7 @@ void *mpd_client_loop(void *arg_config) { | |||||||
|     LOG_INFO() printf("Reading last played songs: %d\n", len); |     LOG_INFO() printf("Reading last played songs: %d\n", len); | ||||||
|      |      | ||||||
|     while (s_signal_received == 0) { |     while (s_signal_received == 0) { | ||||||
|         mpd_client_idle(config, &mpd_state, 100); |         mpd_client_idle(config, &mpd_state); | ||||||
|     } |     } | ||||||
|     //Cleanup |     //Cleanup | ||||||
|     mpd_client_disconnect(config, &mpd_state); |     mpd_client_disconnect(config, &mpd_state); | ||||||
| @@ -1107,7 +1107,7 @@ static void mpd_client_mpd_features(t_config *config, t_mpd_state *mpd_state) { | |||||||
|         LOG_INFO() printf("Disabling advanced search, depends on mpd >= 0.21.0 and libmpdclient >= 2.17.0.\n"); |         LOG_INFO() printf("Disabling advanced search, depends on mpd >= 0.21.0 and libmpdclient >= 2.17.0.\n"); | ||||||
| } | } | ||||||
|  |  | ||||||
| static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int timeout) { | static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state) { | ||||||
|     struct pollfd fds[1]; |     struct pollfd fds[1]; | ||||||
|     int pollrc; |     int pollrc; | ||||||
|     char buffer[MAX_SIZE]; |     char buffer[MAX_SIZE]; | ||||||
| @@ -1168,8 +1168,8 @@ static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int | |||||||
|         case MPD_CONNECTED: |         case MPD_CONNECTED: | ||||||
|             fds[0].fd = mpd_connection_get_fd(mpd_state->conn); |             fds[0].fd = mpd_connection_get_fd(mpd_state->conn); | ||||||
|             fds[0].events = POLLIN; |             fds[0].events = POLLIN; | ||||||
|             pollrc = poll(fds, 1, timeout); |             pollrc = poll(fds, 1, 50); | ||||||
|             unsigned mpd_client_queue_length = tiny_queue_length(mpd_client_queue); |             unsigned mpd_client_queue_length = tiny_queue_length(mpd_client_queue, 50); | ||||||
|             if (pollrc > 0 || mpd_client_queue_length > 0) { |             if (pollrc > 0 || mpd_client_queue_length > 0) { | ||||||
|                 LOG_DEBUG() fprintf(stderr, "DEBUG: Leaving mpd idle mode.\n"); |                 LOG_DEBUG() fprintf(stderr, "DEBUG: Leaving mpd idle mode.\n"); | ||||||
|                 mpd_send_noidle(mpd_state->conn); |                 mpd_send_noidle(mpd_state->conn); | ||||||
| @@ -1185,8 +1185,10 @@ static void mpd_client_idle(t_config *config, t_mpd_state *mpd_state, const int | |||||||
|                 if (mpd_client_queue_length > 0) { |                 if (mpd_client_queue_length > 0) { | ||||||
|                     //Handle request |                     //Handle request | ||||||
|                     LOG_DEBUG() fprintf(stderr, "DEBUG: Handle request.\n"); |                     LOG_DEBUG() fprintf(stderr, "DEBUG: Handle request.\n"); | ||||||
|                     struct work_request_t *req = tiny_queue_shift(mpd_client_queue); |                     struct work_request_t *request = tiny_queue_shift(mpd_client_queue, 100); | ||||||
|                     mpd_client_api(config, mpd_state, req); |                     if (request != NULL) { | ||||||
|  |                         mpd_client_api(config, mpd_state, request); | ||||||
|  |                     } | ||||||
|                 } |                 } | ||||||
|                 LOG_DEBUG() fprintf(stderr, "DEBUG: Entering mpd idle mode.\n"); |                 LOG_DEBUG() fprintf(stderr, "DEBUG: Entering mpd idle mode.\n"); | ||||||
|                 mpd_send_idle(mpd_state->conn); |                 mpd_send_idle(mpd_state->conn); | ||||||
| @@ -2554,7 +2556,7 @@ static int mpd_client_put_stats(t_mpd_state *mpd_state, char *buffer) { | |||||||
|  |  | ||||||
| static void mpd_client_disconnect(t_config *config, t_mpd_state *mpd_state) { | static void mpd_client_disconnect(t_config *config, t_mpd_state *mpd_state) { | ||||||
|     mpd_state->conn_state = MPD_DISCONNECT; |     mpd_state->conn_state = MPD_DISCONNECT; | ||||||
|     mpd_client_idle(config, mpd_state, 100); | //    mpd_client_idle(config, mpd_state); | ||||||
| } | } | ||||||
|  |  | ||||||
| static int mpd_client_smartpls_put(t_config *config, char *buffer, const char *playlist) { | static int mpd_client_smartpls_put(t_config *config, char *buffer, const char *playlist) { | ||||||
|   | |||||||
| @@ -100,8 +100,10 @@ void *mympd_api_loop(void *arg_config) { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     while (s_signal_received == 0) { |     while (s_signal_received == 0) { | ||||||
|         struct t_work_request *request = tiny_queue_shift(mympd_api_queue); |         struct t_work_request *request = tiny_queue_shift(mympd_api_queue, 0); | ||||||
|         mympd_api(config, &mympd_state, request); |         if (request != NULL) { | ||||||
|  |             mympd_api(config, &mympd_state, request); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     list_free(&mympd_state.syscmd_list); |     list_free(&mympd_state.syscmd_list); | ||||||
|   | |||||||
| @@ -21,6 +21,7 @@ | |||||||
| #include <stdlib.h> | #include <stdlib.h> | ||||||
| #include <stdbool.h> | #include <stdbool.h> | ||||||
| #include <pthread.h> | #include <pthread.h> | ||||||
|  | #include <asm/errno.h> | ||||||
| #include "tiny_queue.h" | #include "tiny_queue.h" | ||||||
|  |  | ||||||
| tiny_queue_t *tiny_queue_create(void) { | tiny_queue_t *tiny_queue_create(void) { | ||||||
| @@ -63,18 +64,47 @@ void tiny_queue_push(tiny_queue_t *queue, void *data) { | |||||||
|     pthread_cond_signal(&queue->wakeup); |     pthread_cond_signal(&queue->wakeup); | ||||||
| } | } | ||||||
|  |  | ||||||
| int tiny_queue_length(tiny_queue_t *queue) { | int tiny_queue_length(tiny_queue_t *queue, int timeout) { | ||||||
|     pthread_mutex_lock(&queue->mutex); |     pthread_mutex_lock(&queue->mutex); | ||||||
|  |     if (timeout > 0) { | ||||||
|  |         struct timespec max_wait = {0, 0}; | ||||||
|  |         clock_gettime(CLOCK_REALTIME, &max_wait); | ||||||
|  |         //timeout in ms | ||||||
|  |         max_wait.tv_nsec += timeout * 1000; | ||||||
|  |         while (queue->length == 0) {  | ||||||
|  |             // block if queue is empty | ||||||
|  |             int rc = pthread_cond_timedwait(&queue->wakeup, &queue->mutex, &max_wait); | ||||||
|  |             if (rc == ETIMEDOUT) { | ||||||
|  |                 break; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|     unsigned len = queue->length; |     unsigned len = queue->length; | ||||||
|     pthread_mutex_unlock(&queue->mutex); |     pthread_mutex_unlock(&queue->mutex); | ||||||
|     return len; |     return len; | ||||||
| } | } | ||||||
|  |  | ||||||
| void *tiny_queue_shift(tiny_queue_t *queue) { | void *tiny_queue_shift(tiny_queue_t *queue, int timeout) { | ||||||
|     pthread_mutex_lock(&queue->mutex); |     pthread_mutex_lock(&queue->mutex); | ||||||
|     while (queue->head == NULL) {  |     if (timeout > 0) { | ||||||
|         // block if buffer is empty |         struct timespec max_wait = {0, 0}; | ||||||
|         pthread_cond_wait(&queue->wakeup, &queue->mutex); |         clock_gettime(CLOCK_REALTIME, &max_wait); | ||||||
|  |         //timeout in ms | ||||||
|  |         max_wait.tv_nsec += timeout * 1000; | ||||||
|  |         while (queue->head == NULL) {  | ||||||
|  |             // block if buffer is empty | ||||||
|  |             int rc = pthread_cond_timedwait(&queue->wakeup, &queue->mutex, &max_wait); | ||||||
|  |             if (rc == ETIMEDOUT) { | ||||||
|  |                 pthread_mutex_unlock(&queue->mutex); | ||||||
|  |                 return NULL; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     else { | ||||||
|  |         while (queue->head == NULL) {  | ||||||
|  |             // block if buffer is empty | ||||||
|  |             pthread_cond_wait(&queue->wakeup, &queue->mutex); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     struct tiny_msg_t* current_head = queue->head; |     struct tiny_msg_t* current_head = queue->head; | ||||||
|   | |||||||
| @@ -36,6 +36,6 @@ typedef struct tiny_queue_t { | |||||||
| tiny_queue_t *tiny_queue_create(void); | tiny_queue_t *tiny_queue_create(void); | ||||||
| void tiny_queue_free(tiny_queue_t *queue); | void tiny_queue_free(tiny_queue_t *queue); | ||||||
| void tiny_queue_push(struct tiny_queue_t *queue, void *data); | void tiny_queue_push(struct tiny_queue_t *queue, void *data); | ||||||
| void *tiny_queue_shift(struct tiny_queue_t *queue); | void *tiny_queue_shift(struct tiny_queue_t *queue, int timeout); | ||||||
| int tiny_queue_length(struct tiny_queue_t *queue); | int tiny_queue_length(struct tiny_queue_t *queue, int timeout); | ||||||
| #endif | #endif | ||||||
|   | |||||||
| @@ -106,19 +106,21 @@ void web_server_free(void *arg_mgr) { | |||||||
| void *web_server_loop(void *arg_mgr) { | void *web_server_loop(void *arg_mgr) { | ||||||
|     struct mg_mgr *mgr = (struct mg_mgr *) arg_mgr; |     struct mg_mgr *mgr = (struct mg_mgr *) arg_mgr; | ||||||
|     while (s_signal_received == 0) { |     while (s_signal_received == 0) { | ||||||
|         mg_mgr_poll(mgr, 100); |         mg_mgr_poll(mgr, 50); | ||||||
|         unsigned web_server_queue_length = tiny_queue_length(web_server_queue); | //        unsigned web_server_queue_length = tiny_queue_length(web_server_queue, 100); | ||||||
|         if (web_server_queue_length > 0) { | //        if (web_server_queue_length > 0) { | ||||||
|             t_work_result *response = tiny_queue_shift(web_server_queue); |             t_work_result *response = tiny_queue_shift(web_server_queue, 50); | ||||||
|             if (response->conn_id == 0) { |             if (response != NULL) { | ||||||
|                 //Websocket notify from mpd idle |                 if (response->conn_id == 0) { | ||||||
|                 send_ws_notify(mgr, response); |                     //Websocket notify from mpd idle | ||||||
|             }  |                     send_ws_notify(mgr, response); | ||||||
|             else { |                 }  | ||||||
|                 //api response |                 else { | ||||||
|                 send_api_response(mgr, response); |                     //api response | ||||||
|  |                     send_api_response(mgr, response); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|         } | //        } | ||||||
|     } |     } | ||||||
|     mg_mgr_free(mgr); |     mg_mgr_free(mgr); | ||||||
|     return NULL; |     return NULL; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 jcorporation
					jcorporation