From 158b7caeb426bc5f0f0b6764824dd96f9ed3f442 Mon Sep 17 00:00:00 2001 From: Carles Fernandez Date: Wed, 7 Aug 2024 09:34:09 +0200 Subject: [PATCH] Improve efficiency of Concurrent_Map and Concurrent_Queue classes --- src/core/receiver/concurrent_map.h | 40 ++++++++---------- src/core/receiver/concurrent_queue.h | 61 ++++++++++++++-------------- 2 files changed, 49 insertions(+), 52 deletions(-) diff --git a/src/core/receiver/concurrent_map.h b/src/core/receiver/concurrent_map.h index 330ab02ef..d16a963d7 100644 --- a/src/core/receiver/concurrent_map.h +++ b/src/core/receiver/concurrent_map.h @@ -36,52 +36,48 @@ template */ class Concurrent_Map { - typedef typename std::map::iterator Data_iterator; // iterator is scope dependent public: void write(int key, Data const& data) { - std::unique_lock lock(the_mutex); - Data_iterator data_iter; - data_iter = the_map.find(key); + std::lock_guard lock(the_mutex); + auto data_iter = the_map.find(key); if (data_iter != the_map.end()) { data_iter->second = data; // update } else { - the_map.insert(std::pair(key, data)); // insert SILENTLY fails if the item already exists in the map! + the_map.insert(std::pair(key, data)); // insert does not overwrite if the item already exists in the map! } - lock.unlock(); } - std::map get_map_copy() + std::map get_map_copy() const& { - std::unique_lock lock(the_mutex); - std::map map_aux = the_map; - lock.unlock(); - return map_aux; + std::lock_guard lock(the_mutex); + return the_map; // This implicitly creates a copy } - size_t size() + std::map get_map_copy() && { - std::unique_lock lock(the_mutex); - size_t size_ = the_map.size(); - lock.unlock(); - return size_; + std::lock_guard lock(the_mutex); + return std::move(the_map); } - bool read(int key, Data& p_data) + size_t size() const { - std::unique_lock lock(the_mutex); - Data_iterator data_iter; - data_iter = the_map.find(key); + std::lock_guard lock(the_mutex); + return the_map.size(); + } + + bool read(int key, Data& p_data) const + { + std::lock_guard lock(the_mutex); + auto data_iter = the_map.find(key); if (data_iter != the_map.end()) { p_data = data_iter->second; - lock.unlock(); return true; } - lock.unlock(); return false; } diff --git a/src/core/receiver/concurrent_queue.h b/src/core/receiver/concurrent_queue.h index ac7be0f17..ed4e68b0a 100644 --- a/src/core/receiver/concurrent_queue.h +++ b/src/core/receiver/concurrent_queue.h @@ -19,9 +19,10 @@ #include #include +#include #include #include -#include +#include /** \addtogroup Core * \{ */ @@ -33,48 +34,53 @@ template /*! * \brief This class implements a thread-safe std::queue - * - * Thread-safe object queue which uses the library - * boost_thread to perform MUTEX based on the code available at - * https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html */ class Concurrent_Queue { public: - void push(Data const& data) + void push(const Data& data) { - std::unique_lock lock(the_mutex); - the_queue.push(data); - lock.unlock(); + { + std::lock_guard lock(the_mutex); + the_queue.push(data); + } the_condition_variable.notify_one(); } - bool empty() const + void push(Data&& data) { - std::unique_lock lock(the_mutex); - return the_queue.empty(); + { + std::lock_guard lock(the_mutex); + the_queue.push(std::move(data)); + } + the_condition_variable.notify_one(); } - size_t size() const + bool empty() const noexcept { - std::unique_lock lock(the_mutex); + return size() == 0; + } + + size_t size() const noexcept + { + std::lock_guard lock(the_mutex); return the_queue.size(); } void clear() { - std::unique_lock lock(the_mutex); - the_queue = std::queue(); + std::lock_guard lock(the_mutex); + std::queue().swap(the_queue); } bool try_pop(Data& popped_value) { - std::unique_lock lock(the_mutex); + std::lock_guard lock(the_mutex); if (the_queue.empty()) { return false; } - popped_value = the_queue.front(); + popped_value = std::move(the_queue.front()); the_queue.pop(); return true; } @@ -82,26 +88,21 @@ public: void wait_and_pop(Data& popped_value) { std::unique_lock lock(the_mutex); - while (the_queue.empty()) - { - the_condition_variable.wait(lock); - } - popped_value = the_queue.front(); + the_condition_variable.wait(lock, [this] { return !the_queue.empty(); }); + popped_value = std::move(the_queue.front()); the_queue.pop(); } bool timed_wait_and_pop(Data& popped_value, int wait_ms) { std::unique_lock lock(the_mutex); - if (the_queue.empty()) + if (!the_condition_variable.wait_for(lock, + std::chrono::milliseconds(wait_ms), + [this] { return !the_queue.empty(); })) { - the_condition_variable.wait_for(lock, std::chrono::milliseconds(wait_ms)); - if (the_queue.empty()) - { - return false; - } + return false; } - popped_value = the_queue.front(); + popped_value = std::move(the_queue.front()); the_queue.pop(); return true; }