diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 89736052b..a3e27ac9a 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -106,6 +106,7 @@ All notable changes to GNSS-SDR will be documented in this file. Accordingly, the GNSS-SDR building system now looks for OpenSSL in the first place and, if not found, then it looks for GnuTLS as a fallback. - Allow linking against Boost 1.87.0. +- Replace the System V queues by boost::interprocess, improving portability. ### Reliability diff --git a/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.cc b/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.cc index 33db19222..2bc2ae41a 100644 --- a/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.cc +++ b/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.cc @@ -77,8 +77,6 @@ #include // for locale #include // for ostringstream #include // for length_error -#include // for IPC_CREAT -#include // for msgctl #include // for std::type_info, typeid #include // for pair @@ -130,6 +128,7 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels, : gr::sync_block("rtklib_pvt_gs", gr::io_signature::make(nchannels, nchannels, sizeof(Gnss_Synchro)), gr::io_signature::make(0, 0, 0)), + d_queue_name("gnss_sdr_ttff_message_queue"), d_dump_filename(conf_.dump_filename), d_geohash(std::make_unique()), d_gps_ephemeris_sptr_type_hash_code(typeid(std::shared_ptr).hash_code()), @@ -509,14 +508,26 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels, d_eph_udp_sink_ptr = nullptr; } - // Create Sys V message queue + // Create message queue d_first_fix = true; - d_sysv_msg_key = 1101; - const int msgflg = IPC_CREAT | 0666; - if ((d_sysv_msqid = msgget(d_sysv_msg_key, msgflg)) == -1) + const std::size_t max_num_messages = 100; + try { - std::cout << "GNSS-SDR cannot create System V message queues.\n"; - LOG(WARNING) << "The System V message queue is not available. Error: " << errno << " - " << strerror(errno); + // Remove any existing queue with the same name + boost::interprocess::message_queue::remove(d_queue_name.c_str()); + + // Create a new message queue + d_mq = std::make_unique( + boost::interprocess::create_only, // Create a new queue + d_queue_name.c_str(), // Queue name + max_num_messages, // Maximum number of messages + sizeof(double) // Maximum message size + ); + } + + catch (const boost::interprocess::interprocess_exception& e) + { + std::cerr << "Error creating message queue: " << e.what() << std::endl; } // Display time in local time zone @@ -601,9 +612,9 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels, rtklib_pvt_gs::~rtklib_pvt_gs() { DLOG(INFO) << "PVT block destructor called."; - if (d_sysv_msqid != -1) + if (d_mq) { - msgctl(d_sysv_msqid, IPC_RMID, nullptr); + boost::interprocess::message_queue::remove(d_queue_name.c_str()); } try { @@ -1732,21 +1743,19 @@ void rtklib_pvt_gs::clear_ephemeris() } -bool rtklib_pvt_gs::send_sys_v_ttff_msg(d_ttff_msgbuf ttff) const +bool rtklib_pvt_gs::send_ttff_msg(double ttff) const { - if (d_sysv_msqid != -1) + if (d_mq) { - // Fill Sys V message structures - int msgsend_size; - d_ttff_msgbuf msg; - msg.ttff = ttff.ttff; - msgsend_size = sizeof(msg.ttff); - msg.mtype = 1; // default message ID - - // SEND SOLUTION OVER A MESSAGE QUEUE - // non-blocking Sys V message send - msgsnd(d_sysv_msqid, &msg, msgsend_size, IPC_NOWAIT); - return true; + try + { + d_mq->send(&ttff, sizeof(ttff), 0); // Priority 0 + return true; + } + catch (const boost::interprocess::interprocess_exception& e) + { + std::cerr << "Error sending message: " << e.what() << std::endl; + } } return false; } @@ -2389,12 +2398,10 @@ int rtklib_pvt_gs::work(int noutput_items, gr_vector_const_void_star& input_item } std::cout << " is Lat = " << d_user_pvt_solver->get_latitude() << " [deg], Long = " << d_user_pvt_solver->get_longitude() << " [deg], Height= " << d_user_pvt_solver->get_height() << " [m]\n"; - d_ttff_msgbuf ttff; - ttff.mtype = 1; d_end = std::chrono::system_clock::now(); std::chrono::duration elapsed_seconds = d_end - d_start; - ttff.ttff = elapsed_seconds.count(); - send_sys_v_ttff_msg(ttff); + double ttff = elapsed_seconds.count(); + send_ttff_msg(ttff); d_first_fix = false; } if (d_kml_output_enabled) diff --git a/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.h b/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.h index 65f7bd548..b54b3d670 100644 --- a/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.h +++ b/src/algorithms/PVT/gnuradio_blocks/rtklib_pvt_gs.h @@ -18,12 +18,14 @@ #define GNSS_SDR_RTKLIB_PVT_GS_H #include "gnss_block_interface.h" +#include "gnss_sdr_make_unique.h" #include "gnss_synchro.h" #include "gnss_time.h" #include "osnma_data.h" #include "rtklib.h" #include #include +#include #include // for sync_block #include // for gr_vector_const_void_star #include // for pmt_t @@ -37,7 +39,6 @@ #include // for std::queue #include // for std::set #include // for string -#include // for key_t #include // for vector /** \addtogroup PVT @@ -166,13 +167,7 @@ private: std::vector split_string(const std::string& s, char delim) const; - typedef struct - { - long mtype; // NOLINT(google-runtime-int) - double ttff; - } d_ttff_msgbuf; - bool send_sys_v_ttff_msg(d_ttff_msgbuf ttff) const; - + bool send_ttff_msg(double ttff) const; bool save_gnss_synchro_map_xml(const std::string& file_name); // debug helper function bool load_gnss_synchro_map_xml(const std::string& file_name); // debug helper function @@ -181,6 +176,8 @@ private: std::shared_ptr d_internal_pvt_solver; std::shared_ptr d_user_pvt_solver; + std::unique_ptr d_mq; + std::unique_ptr d_rp; std::unique_ptr d_kml_dump; std::unique_ptr d_gpx_dump; @@ -195,6 +192,7 @@ private: std::chrono::time_point d_start; std::chrono::time_point d_end; + std::string d_queue_name; std::string d_dump_filename; std::string d_xml_base_path; std::string d_local_time_str; @@ -238,9 +236,6 @@ private: uint64_t d_local_counter_ms; uint64_t d_timestamp_rx_clock_offset_correction_msg_ms; - key_t d_sysv_msg_key; - int d_sysv_msqid; - int32_t d_rinexobs_rate_ms; int32_t d_rtcm_MT1045_rate_ms; // Galileo Broadcast Ephemeris int32_t d_rtcm_MT1019_rate_ms; // GPS Broadcast Ephemeris (orbits) diff --git a/src/core/receiver/control_thread.cc b/src/core/receiver/control_thread.cc index 52b0b344d..2e6231c91 100644 --- a/src/core/receiver/control_thread.cc +++ b/src/core/receiver/control_thread.cc @@ -36,34 +36,34 @@ #include "gnss_flowgraph.h" #include "gnss_satellite.h" #include "gnss_sdr_flags.h" -#include "gps_acq_assist.h" // for Gps_Acq_Assist -#include "gps_almanac.h" // for Gps_Almanac -#include "gps_cnav_ephemeris.h" // for Gps_CNAV_Ephemeris -#include "gps_cnav_utc_model.h" // for Gps_CNAV_Utc_Model -#include "gps_ephemeris.h" // for Gps_Ephemeris -#include "gps_iono.h" // for Gps_Iono -#include "gps_utc_model.h" // for Gps_Utc_Model -#include "pvt_interface.h" // for PvtInterface -#include "rtklib.h" // for gtime_t, alm_t -#include "rtklib_conversions.h" // for alm_to_rtklib -#include "rtklib_ephemeris.h" // for alm2pos, eph2pos -#include "rtklib_rtkcmn.h" // for utc2gpst -#include // for interaction with geofunctions -#include // for bad_lexical_cast -#include // for make_any -#include // for find, min -#include // for milliseconds -#include // for floor, fmod, log -#include // for signal, SIGINT -#include // for time_t, gmtime, strftime -#include // for exception -#include // for operator<< -#include // for numeric_limits -#include // for map -#include // for pthread_cancel -#include // for invalid_argument -#include // for IPC_CREAT -#include // for msgctl, msgget +#include "gnss_sdr_make_unique.h" +#include "gps_acq_assist.h" // for Gps_Acq_Assist +#include "gps_almanac.h" // for Gps_Almanac +#include "gps_cnav_ephemeris.h" // for Gps_CNAV_Ephemeris +#include "gps_cnav_utc_model.h" // for Gps_CNAV_Utc_Model +#include "gps_ephemeris.h" // for Gps_Ephemeris +#include "gps_iono.h" // for Gps_Iono +#include "gps_utc_model.h" // for Gps_Utc_Model +#include "pvt_interface.h" // for PvtInterface +#include "rtklib.h" // for gtime_t, alm_t +#include "rtklib_conversions.h" // for alm_to_rtklib +#include "rtklib_ephemeris.h" // for alm2pos, eph2pos +#include "rtklib_rtkcmn.h" // for utc2gpst +#include // for interaction with geofunctions +#include // for message_queue +#include // for bad_lexical_cast +#include // for make_any +#include // for find, min +#include // for milliseconds +#include // for floor, fmod, log +#include // for signal, SIGINT +#include // for time_t, gmtime, strftime +#include // for exception +#include // for operator<< +#include // for numeric_limits +#include // for map +#include // for pthread_cancel +#include // for invalid_argument #if USE_GLOG_AND_GFLAGS #include @@ -197,7 +197,7 @@ void ControlThread::init() supl_mns_ = 0; supl_lac_ = 0; supl_ci_ = 0; - msqid_ = -1; + agnss_ref_location_ = Agnss_Ref_Location(); agnss_ref_time_ = Agnss_Ref_Time(); @@ -273,14 +273,11 @@ void ControlThread::init() ControlThread::~ControlThread() // NOLINT(modernize-use-equals-default) { DLOG(INFO) << "Control Thread destructor called"; - if (msqid_ != -1) - { - msgctl(msqid_, IPC_RMID, nullptr); - } + boost::interprocess::message_queue::remove(control_message_queue_name_.c_str()); - if (sysv_queue_thread_.joinable()) + if (message_queue_thread_.joinable()) { - sysv_queue_thread_.join(); + message_queue_thread_.join(); } if (cmd_interface_thread_.joinable()) @@ -429,7 +426,7 @@ int ControlThread::run() { keyboard_thread_ = std::thread(&ControlThread::keyboard_listener, this); } - sysv_queue_thread_ = std::thread(&ControlThread::sysv_queue_listener, this); + message_queue_thread_ = std::thread(&ControlThread::message_queue_listener, this); // start the telecommand listener thread cmd_interface_.set_pvt(flowgraph_->get_pvt()); @@ -1214,40 +1211,55 @@ void ControlThread::gps_acq_assist_data_collector() const } -void ControlThread::sysv_queue_listener() +void ControlThread::message_queue_listener() { - typedef struct - { - long mtype; // NOLINT(google-runtime-int) required by SysV queue messaging - double stop_message; - } stop_msgbuf; - bool read_queue = true; - stop_msgbuf msg; double received_message = 0.0; - const int msgrcv_size = sizeof(msg.stop_message); - - const key_t key = 1102; - - if ((msqid_ = msgget(key, 0644 | IPC_CREAT)) == -1) + try { - std::cerr << "GNSS-SDR cannot create SysV message queues\n"; - read_queue = false; - } + // Remove any existing queue with the same name + boost::interprocess::message_queue::remove(control_message_queue_name_.c_str()); - while (read_queue && !stop_) - { - if (msgrcv(msqid_, &msg, msgrcv_size, 1, 0) != -1) + // Create a new message queue + auto mq = std::make_unique( + boost::interprocess::create_only, // Create a new queue + control_message_queue_name_.c_str(), // Queue name + 10, // Maximum number of messages + sizeof(double) // Maximum message size + ); + + while (read_queue && !stop_) { - received_message = msg.stop_message; - if ((std::abs(received_message - (-200.0)) < 10 * std::numeric_limits::epsilon())) + unsigned int priority; + std::size_t received_size; + // Receive a message (non-blocking) + if (mq->try_receive(&received_message, sizeof(received_message), received_size, priority)) { - std::cout << "Quit order received, stopping GNSS-SDR !!\n"; - control_queue_->push(pmt::make_any(command_event_make(200, 0))); - read_queue = false; + // Validate the size of the received message + if (received_size == sizeof(double)) + { + if (std::abs(received_message - (-200.0)) < 10 * std::numeric_limits::epsilon()) + { + std::cout << "Quit order received, stopping GNSS-SDR !!\n"; + + // Push the control command to the control queue + control_queue_->push(pmt::make_any(command_event_make(200, 0))); + read_queue = false; + } + } + } + else + { + // No message available, add a small delay to prevent busy-waiting + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } } + catch (const boost::interprocess::interprocess_exception &e) + { + std::cerr << "Error in message queue listener: " << e.what() << '\n'; + read_queue = false; + } } diff --git a/src/core/receiver/control_thread.h b/src/core/receiver/control_thread.h index b3340e94b..e1b29dcb6 100644 --- a/src/core/receiver/control_thread.h +++ b/src/core/receiver/control_thread.h @@ -160,7 +160,7 @@ private: void telecommand_listener(); void keyboard_listener(); - void sysv_queue_listener(); + void message_queue_listener(); void print_help_at_exit() const; // default filename for assistance data @@ -179,6 +179,8 @@ private: const std::string gal_almanac_default_xml_filename_ = "./gal_almanac.xml"; const std::string gps_almanac_default_xml_filename_ = "./gps_almanac.xml"; + const std::string control_message_queue_name_ = "receiver_control_queue"; + const size_t channel_event_type_hash_code_ = typeid(channel_event_sptr).hash_code(); const size_t command_event_type_hash_code_ = typeid(command_event_sptr).hash_code(); @@ -188,7 +190,7 @@ private: std::thread cmd_interface_thread_; std::thread keyboard_thread_; - std::thread sysv_queue_thread_; + std::thread message_queue_thread_; std::thread gps_acq_assist_data_collector_thread_; #ifdef ENABLE_FPGA @@ -210,7 +212,6 @@ private: unsigned int processed_control_messages_; unsigned int applied_actions_; - int msqid_; bool well_formatted_configuration_; bool conf_file_has_section_; diff --git a/tests/system-tests/ttff.cc b/tests/system-tests/ttff.cc index 47e74306f..3875781e1 100644 --- a/tests/system-tests/ttff.cc +++ b/tests/system-tests/ttff.cc @@ -21,11 +21,13 @@ #include "control_thread.h" #include "file_configuration.h" #include "gnss_flowgraph.h" +#include "gnss_sdr_make_unique.h" #include "gps_acq_assist.h" #include "in_memory_configuration.h" #include #include #include +#include #include #include #include @@ -35,9 +37,6 @@ #include #include #include -#include -#include -#include #include #if USE_GLOG_AND_GFLAGS @@ -79,13 +78,6 @@ Concurrent_Map global_gps_acq_assist_map; std::vector TTFF_v; -typedef struct -{ - long mtype; // required by SysV message - double ttff; -} ttff_msgbuf; - - class TtffTest : public ::testing::Test { public: @@ -301,45 +293,72 @@ void TtffTest::config_2() void receive_msg() { - ttff_msgbuf msg; - ttff_msgbuf msg_stop; - msg_stop.mtype = 1; - msg_stop.ttff = -200.0; - double ttff_msg = 0.0; - int msgrcv_size = sizeof(msg.ttff); - int msqid; - int msqid_stop = -1; - key_t key = 1101; - key_t key_stop = 1102; bool leave = false; while (!leave) { - // wait for the queue to be created - while ((msqid = msgget(key, 0644)) == -1) + // wait for the queues to be created + const std::string queue_name = "gnss_sdr_ttff_message_queue"; + std::unique_ptr d_mq; + bool queue_found = false; + while (!queue_found) { + try + { + // Attempt to open the message queue + d_mq = std::make_unique(boost::interprocess::open_only, queue_name.c_str()); + queue_found = true; // Queue found + } + catch (const boost::interprocess::interprocess_exception &) + { + // Queue not found, wait and retry + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } } - if (msgrcv(msqid, &msg, msgrcv_size, 1, 0) != -1) + const std::string queue_name_stop = "receiver_control_queue"; + std::unique_ptr d_mq_stop; + bool queue_found2 = false; + while (!queue_found2) { - ttff_msg = msg.ttff; - if ((ttff_msg != 0) && (ttff_msg != -1)) + try { - TTFF_v.push_back(ttff_msg); - LOG(INFO) << "Valid Time-To-First-Fix: " << ttff_msg << "[s]"; - // Stop the receiver - while (((msqid_stop = msgget(key_stop, 0644))) == -1) - { - } - double msgsend_size = sizeof(msg_stop.ttff); - msgsnd(msqid_stop, &msg_stop, msgsend_size, IPC_NOWAIT); + // Attempt to open the message queue + d_mq_stop = std::make_unique(boost::interprocess::open_only, queue_name_stop.c_str()); + queue_found2 = true; // Queue found } + catch (const boost::interprocess::interprocess_exception &) + { + // Queue not found, wait and retry + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } - if (std::abs(ttff_msg - (-1.0)) < 10 * std::numeric_limits::epsilon()) + double received_message; + unsigned int priority; + std::size_t received_size; + // Receive a message (non-blocking) + if (d_mq->try_receive(&received_message, sizeof(received_message), received_size, priority)) + { + // Validate the size of the received message + if (received_size == sizeof(double) && (received_message != 0) && (received_message != -1)) + { + TTFF_v.push_back(received_message); + LOG(INFO) << "Valid Time-To-First-Fix: " << received_message << " [s]"; + // Stop the receiver + double stop_message = -200.0; + d_mq_stop->send(&stop_message, sizeof(stop_message), 0); + } + if (received_size == sizeof(double) && std::abs(received_message - (-1.0)) < 10 * std::numeric_limits::epsilon()) { leave = true; } } + else + { + // No message available, add a small delay to prevent busy-waiting + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } } } @@ -715,23 +734,26 @@ int main(int argc, char **argv) } // Terminate the queue thread - key_t sysv_msg_key; - int sysv_msqid; - sysv_msg_key = 1101; - int msgflg = IPC_CREAT | 0666; - if ((sysv_msqid = msgget(sysv_msg_key, msgflg)) == -1) + const std::string queue_name = "gnss_sdr_ttff_message_queue"; + std::unique_ptr mq; + bool queue_found = false; + while (!queue_found) { - std::cout << "GNSS-SDR can not create message queues!\n"; - return 1; + try + { + // Attempt to open the message queue + mq = std::make_unique(boost::interprocess::open_only, queue_name.c_str()); + queue_found = true; // Queue found + } + catch (const boost::interprocess::interprocess_exception &) + { + // Queue not found, wait and retry + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } } - ttff_msgbuf msg; - msg.mtype = 1; - msg.ttff = -1; - int msgsend_size; - msgsend_size = sizeof(msg.ttff); - msgsnd(sysv_msqid, &msg, msgsend_size, IPC_NOWAIT); - receive_msg_thread.join(); - msgctl(sysv_msqid, IPC_RMID, nullptr); + double finish = -1.0; + mq->send(&finish, sizeof(finish), 0); + boost::interprocess::message_queue::remove(queue_name.c_str()); #if USE_GLOG_AND_GFLAGS gflags::ShutDownCommandLineFlags(); diff --git a/tests/unit-tests/control-plane/control_thread_test.cc b/tests/unit-tests/control-plane/control_thread_test.cc index 925ff9e57..8ad0100e5 100644 --- a/tests/unit-tests/control-plane/control_thread_test.cc +++ b/tests/unit-tests/control-plane/control_thread_test.cc @@ -26,15 +26,13 @@ #include "in_memory_configuration.h" #include #include +#include #include #include #include #include #include #include -#include -#include -#include #include #include @@ -49,35 +47,46 @@ class ControlThreadTest : public ::testing::Test { public: static int stop_receiver(); - typedef struct - { - long mtype; // required by SysV message - double message; - } message_buffer; }; int ControlThreadTest::stop_receiver() { - message_buffer msg_stop; - msg_stop.mtype = 1; - msg_stop.message = -200.0; - int msqid_stop = -1; - int msgsend_size = sizeof(msg_stop.message); - key_t key_stop = 1102; - - // wait for the receiver control queue to be created - while (((msqid_stop = msgget(key_stop, 0644))) == -1) + const std::string queue_name = "receiver_control_queue"; + std::unique_ptr d_mq; + try { + bool queue_found = false; + + while (!queue_found) + { + try + { + // Attempt to open the message queue + d_mq = std::make_unique(boost::interprocess::open_only, queue_name.c_str()); + queue_found = true; // Queue found + } + catch (const boost::interprocess::interprocess_exception&) + { + // Queue not found, wait and retry + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + + + double stop_message = -200.0; + + // Wait for a couple of seconds before sending + std::this_thread::sleep_for(std::chrono::seconds(2)); + // Send the double value + d_mq->send(&stop_message, sizeof(stop_message), 0); // Priority 0 + return 0; + } + catch (const boost::interprocess::interprocess_exception& e) + { + std::cerr << "Failed to send stop message: " << e.what() << std::endl; + return -1; } - - // wait for a couple of seconds - std::this_thread::sleep_for(std::chrono::seconds(2)); - - // Stop the receiver - msgsnd(msqid_stop, &msg_stop, msgsend_size, IPC_NOWAIT); - - return 0; }