1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-01-23 07:27:05 +00:00

Replace the System V queues by boost::interprocess

This commit is contained in:
Carles Fernandez 2024-12-21 13:20:00 +01:00
parent 47e3cab802
commit cfb56d35d6
No known key found for this signature in database
GPG Key ID: 4C583C52B0C3877D
7 changed files with 222 additions and 175 deletions

View File

@ -106,6 +106,7 @@ All notable changes to GNSS-SDR will be documented in this file.
Accordingly, the GNSS-SDR building system now looks for OpenSSL in the first
place and, if not found, then it looks for GnuTLS as a fallback.
- Allow linking against Boost 1.87.0.
- Replace the System V queues by boost::interprocess, improving portability.
### Reliability

View File

@ -77,8 +77,6 @@
#include <locale> // for locale
#include <sstream> // for ostringstream
#include <stdexcept> // for length_error
#include <sys/ipc.h> // for IPC_CREAT
#include <sys/msg.h> // for msgctl
#include <typeinfo> // for std::type_info, typeid
#include <utility> // for pair
@ -130,6 +128,7 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels,
: gr::sync_block("rtklib_pvt_gs",
gr::io_signature::make(nchannels, nchannels, sizeof(Gnss_Synchro)),
gr::io_signature::make(0, 0, 0)),
d_queue_name("gnss_sdr_ttff_message_queue"),
d_dump_filename(conf_.dump_filename),
d_geohash(std::make_unique<Geohash>()),
d_gps_ephemeris_sptr_type_hash_code(typeid(std::shared_ptr<Gps_Ephemeris>).hash_code()),
@ -509,14 +508,26 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels,
d_eph_udp_sink_ptr = nullptr;
}
// Create Sys V message queue
// Create message queue
d_first_fix = true;
d_sysv_msg_key = 1101;
const int msgflg = IPC_CREAT | 0666;
if ((d_sysv_msqid = msgget(d_sysv_msg_key, msgflg)) == -1)
const std::size_t max_num_messages = 100;
try
{
std::cout << "GNSS-SDR cannot create System V message queues.\n";
LOG(WARNING) << "The System V message queue is not available. Error: " << errno << " - " << strerror(errno);
// Remove any existing queue with the same name
boost::interprocess::message_queue::remove(d_queue_name.c_str());
// Create a new message queue
d_mq = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::create_only, // Create a new queue
d_queue_name.c_str(), // Queue name
max_num_messages, // Maximum number of messages
sizeof(double) // Maximum message size
);
}
catch (const boost::interprocess::interprocess_exception& e)
{
std::cerr << "Error creating message queue: " << e.what() << std::endl;
}
// Display time in local time zone
@ -601,9 +612,9 @@ rtklib_pvt_gs::rtklib_pvt_gs(uint32_t nchannels,
rtklib_pvt_gs::~rtklib_pvt_gs()
{
DLOG(INFO) << "PVT block destructor called.";
if (d_sysv_msqid != -1)
if (d_mq)
{
msgctl(d_sysv_msqid, IPC_RMID, nullptr);
boost::interprocess::message_queue::remove(d_queue_name.c_str());
}
try
{
@ -1732,21 +1743,19 @@ void rtklib_pvt_gs::clear_ephemeris()
}
bool rtklib_pvt_gs::send_sys_v_ttff_msg(d_ttff_msgbuf ttff) const
bool rtklib_pvt_gs::send_ttff_msg(double ttff) const
{
if (d_sysv_msqid != -1)
if (d_mq)
{
// Fill Sys V message structures
int msgsend_size;
d_ttff_msgbuf msg;
msg.ttff = ttff.ttff;
msgsend_size = sizeof(msg.ttff);
msg.mtype = 1; // default message ID
// SEND SOLUTION OVER A MESSAGE QUEUE
// non-blocking Sys V message send
msgsnd(d_sysv_msqid, &msg, msgsend_size, IPC_NOWAIT);
return true;
try
{
d_mq->send(&ttff, sizeof(ttff), 0); // Priority 0
return true;
}
catch (const boost::interprocess::interprocess_exception& e)
{
std::cerr << "Error sending message: " << e.what() << std::endl;
}
}
return false;
}
@ -2389,12 +2398,10 @@ int rtklib_pvt_gs::work(int noutput_items, gr_vector_const_void_star& input_item
}
std::cout << " is Lat = " << d_user_pvt_solver->get_latitude() << " [deg], Long = " << d_user_pvt_solver->get_longitude()
<< " [deg], Height= " << d_user_pvt_solver->get_height() << " [m]\n";
d_ttff_msgbuf ttff;
ttff.mtype = 1;
d_end = std::chrono::system_clock::now();
std::chrono::duration<double> elapsed_seconds = d_end - d_start;
ttff.ttff = elapsed_seconds.count();
send_sys_v_ttff_msg(ttff);
double ttff = elapsed_seconds.count();
send_ttff_msg(ttff);
d_first_fix = false;
}
if (d_kml_output_enabled)

View File

@ -18,12 +18,14 @@
#define GNSS_SDR_RTKLIB_PVT_GS_H
#include "gnss_block_interface.h"
#include "gnss_sdr_make_unique.h"
#include "gnss_synchro.h"
#include "gnss_time.h"
#include "osnma_data.h"
#include "rtklib.h"
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <gnuradio/sync_block.h> // for sync_block
#include <gnuradio/types.h> // for gr_vector_const_void_star
#include <pmt/pmt.h> // for pmt_t
@ -37,7 +39,6 @@
#include <queue> // for std::queue
#include <set> // for std::set
#include <string> // for string
#include <sys/types.h> // for key_t
#include <vector> // for vector
/** \addtogroup PVT
@ -166,13 +167,7 @@ private:
std::vector<std::string> split_string(const std::string& s, char delim) const;
typedef struct
{
long mtype; // NOLINT(google-runtime-int)
double ttff;
} d_ttff_msgbuf;
bool send_sys_v_ttff_msg(d_ttff_msgbuf ttff) const;
bool send_ttff_msg(double ttff) const;
bool save_gnss_synchro_map_xml(const std::string& file_name); // debug helper function
bool load_gnss_synchro_map_xml(const std::string& file_name); // debug helper function
@ -181,6 +176,8 @@ private:
std::shared_ptr<Rtklib_Solver> d_internal_pvt_solver;
std::shared_ptr<Rtklib_Solver> d_user_pvt_solver;
std::unique_ptr<boost::interprocess::message_queue> d_mq;
std::unique_ptr<Rinex_Printer> d_rp;
std::unique_ptr<Kml_Printer> d_kml_dump;
std::unique_ptr<Gpx_Printer> d_gpx_dump;
@ -195,6 +192,7 @@ private:
std::chrono::time_point<std::chrono::system_clock> d_start;
std::chrono::time_point<std::chrono::system_clock> d_end;
std::string d_queue_name;
std::string d_dump_filename;
std::string d_xml_base_path;
std::string d_local_time_str;
@ -238,9 +236,6 @@ private:
uint64_t d_local_counter_ms;
uint64_t d_timestamp_rx_clock_offset_correction_msg_ms;
key_t d_sysv_msg_key;
int d_sysv_msqid;
int32_t d_rinexobs_rate_ms;
int32_t d_rtcm_MT1045_rate_ms; // Galileo Broadcast Ephemeris
int32_t d_rtcm_MT1019_rate_ms; // GPS Broadcast Ephemeris (orbits)

View File

@ -36,34 +36,34 @@
#include "gnss_flowgraph.h"
#include "gnss_satellite.h"
#include "gnss_sdr_flags.h"
#include "gps_acq_assist.h" // for Gps_Acq_Assist
#include "gps_almanac.h" // for Gps_Almanac
#include "gps_cnav_ephemeris.h" // for Gps_CNAV_Ephemeris
#include "gps_cnav_utc_model.h" // for Gps_CNAV_Utc_Model
#include "gps_ephemeris.h" // for Gps_Ephemeris
#include "gps_iono.h" // for Gps_Iono
#include "gps_utc_model.h" // for Gps_Utc_Model
#include "pvt_interface.h" // for PvtInterface
#include "rtklib.h" // for gtime_t, alm_t
#include "rtklib_conversions.h" // for alm_to_rtklib
#include "rtklib_ephemeris.h" // for alm2pos, eph2pos
#include "rtklib_rtkcmn.h" // for utc2gpst
#include <armadillo> // for interaction with geofunctions
#include <boost/lexical_cast.hpp> // for bad_lexical_cast
#include <pmt/pmt.h> // for make_any
#include <algorithm> // for find, min
#include <chrono> // for milliseconds
#include <cmath> // for floor, fmod, log
#include <csignal> // for signal, SIGINT
#include <ctime> // for time_t, gmtime, strftime
#include <exception> // for exception
#include <iostream> // for operator<<
#include <limits> // for numeric_limits
#include <map> // for map
#include <pthread.h> // for pthread_cancel
#include <stdexcept> // for invalid_argument
#include <sys/ipc.h> // for IPC_CREAT
#include <sys/msg.h> // for msgctl, msgget
#include "gnss_sdr_make_unique.h"
#include "gps_acq_assist.h" // for Gps_Acq_Assist
#include "gps_almanac.h" // for Gps_Almanac
#include "gps_cnav_ephemeris.h" // for Gps_CNAV_Ephemeris
#include "gps_cnav_utc_model.h" // for Gps_CNAV_Utc_Model
#include "gps_ephemeris.h" // for Gps_Ephemeris
#include "gps_iono.h" // for Gps_Iono
#include "gps_utc_model.h" // for Gps_Utc_Model
#include "pvt_interface.h" // for PvtInterface
#include "rtklib.h" // for gtime_t, alm_t
#include "rtklib_conversions.h" // for alm_to_rtklib
#include "rtklib_ephemeris.h" // for alm2pos, eph2pos
#include "rtklib_rtkcmn.h" // for utc2gpst
#include <armadillo> // for interaction with geofunctions
#include <boost/interprocess/ipc/message_queue.hpp> // for message_queue
#include <boost/lexical_cast.hpp> // for bad_lexical_cast
#include <pmt/pmt.h> // for make_any
#include <algorithm> // for find, min
#include <chrono> // for milliseconds
#include <cmath> // for floor, fmod, log
#include <csignal> // for signal, SIGINT
#include <ctime> // for time_t, gmtime, strftime
#include <exception> // for exception
#include <iostream> // for operator<<
#include <limits> // for numeric_limits
#include <map> // for map
#include <pthread.h> // for pthread_cancel
#include <stdexcept> // for invalid_argument
#if USE_GLOG_AND_GFLAGS
#include <glog/logging.h>
@ -197,7 +197,7 @@ void ControlThread::init()
supl_mns_ = 0;
supl_lac_ = 0;
supl_ci_ = 0;
msqid_ = -1;
agnss_ref_location_ = Agnss_Ref_Location();
agnss_ref_time_ = Agnss_Ref_Time();
@ -273,14 +273,11 @@ void ControlThread::init()
ControlThread::~ControlThread() // NOLINT(modernize-use-equals-default)
{
DLOG(INFO) << "Control Thread destructor called";
if (msqid_ != -1)
{
msgctl(msqid_, IPC_RMID, nullptr);
}
boost::interprocess::message_queue::remove(control_message_queue_name_.c_str());
if (sysv_queue_thread_.joinable())
if (message_queue_thread_.joinable())
{
sysv_queue_thread_.join();
message_queue_thread_.join();
}
if (cmd_interface_thread_.joinable())
@ -429,7 +426,7 @@ int ControlThread::run()
{
keyboard_thread_ = std::thread(&ControlThread::keyboard_listener, this);
}
sysv_queue_thread_ = std::thread(&ControlThread::sysv_queue_listener, this);
message_queue_thread_ = std::thread(&ControlThread::message_queue_listener, this);
// start the telecommand listener thread
cmd_interface_.set_pvt(flowgraph_->get_pvt());
@ -1214,40 +1211,55 @@ void ControlThread::gps_acq_assist_data_collector() const
}
void ControlThread::sysv_queue_listener()
void ControlThread::message_queue_listener()
{
typedef struct
{
long mtype; // NOLINT(google-runtime-int) required by SysV queue messaging
double stop_message;
} stop_msgbuf;
bool read_queue = true;
stop_msgbuf msg;
double received_message = 0.0;
const int msgrcv_size = sizeof(msg.stop_message);
const key_t key = 1102;
if ((msqid_ = msgget(key, 0644 | IPC_CREAT)) == -1)
try
{
std::cerr << "GNSS-SDR cannot create SysV message queues\n";
read_queue = false;
}
// Remove any existing queue with the same name
boost::interprocess::message_queue::remove(control_message_queue_name_.c_str());
while (read_queue && !stop_)
{
if (msgrcv(msqid_, &msg, msgrcv_size, 1, 0) != -1)
// Create a new message queue
auto mq = std::make_unique<boost::interprocess::message_queue>(
boost::interprocess::create_only, // Create a new queue
control_message_queue_name_.c_str(), // Queue name
10, // Maximum number of messages
sizeof(double) // Maximum message size
);
while (read_queue && !stop_)
{
received_message = msg.stop_message;
if ((std::abs(received_message - (-200.0)) < 10 * std::numeric_limits<double>::epsilon()))
unsigned int priority;
std::size_t received_size;
// Receive a message (non-blocking)
if (mq->try_receive(&received_message, sizeof(received_message), received_size, priority))
{
std::cout << "Quit order received, stopping GNSS-SDR !!\n";
control_queue_->push(pmt::make_any(command_event_make(200, 0)));
read_queue = false;
// Validate the size of the received message
if (received_size == sizeof(double))
{
if (std::abs(received_message - (-200.0)) < 10 * std::numeric_limits<double>::epsilon())
{
std::cout << "Quit order received, stopping GNSS-SDR !!\n";
// Push the control command to the control queue
control_queue_->push(pmt::make_any(command_event_make(200, 0)));
read_queue = false;
}
}
}
else
{
// No message available, add a small delay to prevent busy-waiting
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
}
catch (const boost::interprocess::interprocess_exception &e)
{
std::cerr << "Error in message queue listener: " << e.what() << '\n';
read_queue = false;
}
}

View File

@ -160,7 +160,7 @@ private:
void telecommand_listener();
void keyboard_listener();
void sysv_queue_listener();
void message_queue_listener();
void print_help_at_exit() const;
// default filename for assistance data
@ -179,6 +179,8 @@ private:
const std::string gal_almanac_default_xml_filename_ = "./gal_almanac.xml";
const std::string gps_almanac_default_xml_filename_ = "./gps_almanac.xml";
const std::string control_message_queue_name_ = "receiver_control_queue";
const size_t channel_event_type_hash_code_ = typeid(channel_event_sptr).hash_code();
const size_t command_event_type_hash_code_ = typeid(command_event_sptr).hash_code();
@ -188,7 +190,7 @@ private:
std::thread cmd_interface_thread_;
std::thread keyboard_thread_;
std::thread sysv_queue_thread_;
std::thread message_queue_thread_;
std::thread gps_acq_assist_data_collector_thread_;
#ifdef ENABLE_FPGA
@ -210,7 +212,6 @@ private:
unsigned int processed_control_messages_;
unsigned int applied_actions_;
int msqid_;
bool well_formatted_configuration_;
bool conf_file_has_section_;

View File

@ -21,11 +21,13 @@
#include "control_thread.h"
#include "file_configuration.h"
#include "gnss_flowgraph.h"
#include "gnss_sdr_make_unique.h"
#include "gps_acq_assist.h"
#include "in_memory_configuration.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/exception/exception.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <gtest/gtest.h>
#include <cerrno>
#include <chrono>
@ -35,9 +37,6 @@
#include <numeric>
#include <random>
#include <string>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <thread>
#if USE_GLOG_AND_GFLAGS
@ -79,13 +78,6 @@ Concurrent_Map<Gps_Acq_Assist> global_gps_acq_assist_map;
std::vector<double> TTFF_v;
typedef struct
{
long mtype; // required by SysV message
double ttff;
} ttff_msgbuf;
class TtffTest : public ::testing::Test
{
public:
@ -301,45 +293,72 @@ void TtffTest::config_2()
void receive_msg()
{
ttff_msgbuf msg;
ttff_msgbuf msg_stop;
msg_stop.mtype = 1;
msg_stop.ttff = -200.0;
double ttff_msg = 0.0;
int msgrcv_size = sizeof(msg.ttff);
int msqid;
int msqid_stop = -1;
key_t key = 1101;
key_t key_stop = 1102;
bool leave = false;
while (!leave)
{
// wait for the queue to be created
while ((msqid = msgget(key, 0644)) == -1)
// wait for the queues to be created
const std::string queue_name = "gnss_sdr_ttff_message_queue";
std::unique_ptr<boost::interprocess::message_queue> d_mq;
bool queue_found = false;
while (!queue_found)
{
try
{
// Attempt to open the message queue
d_mq = std::make_unique<boost::interprocess::message_queue>(boost::interprocess::open_only, queue_name.c_str());
queue_found = true; // Queue found
}
catch (const boost::interprocess::interprocess_exception &)
{
// Queue not found, wait and retry
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
if (msgrcv(msqid, &msg, msgrcv_size, 1, 0) != -1)
const std::string queue_name_stop = "receiver_control_queue";
std::unique_ptr<boost::interprocess::message_queue> d_mq_stop;
bool queue_found2 = false;
while (!queue_found2)
{
ttff_msg = msg.ttff;
if ((ttff_msg != 0) && (ttff_msg != -1))
try
{
TTFF_v.push_back(ttff_msg);
LOG(INFO) << "Valid Time-To-First-Fix: " << ttff_msg << "[s]";
// Stop the receiver
while (((msqid_stop = msgget(key_stop, 0644))) == -1)
{
}
double msgsend_size = sizeof(msg_stop.ttff);
msgsnd(msqid_stop, &msg_stop, msgsend_size, IPC_NOWAIT);
// Attempt to open the message queue
d_mq_stop = std::make_unique<boost::interprocess::message_queue>(boost::interprocess::open_only, queue_name_stop.c_str());
queue_found2 = true; // Queue found
}
catch (const boost::interprocess::interprocess_exception &)
{
// Queue not found, wait and retry
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
if (std::abs(ttff_msg - (-1.0)) < 10 * std::numeric_limits<double>::epsilon())
double received_message;
unsigned int priority;
std::size_t received_size;
// Receive a message (non-blocking)
if (d_mq->try_receive(&received_message, sizeof(received_message), received_size, priority))
{
// Validate the size of the received message
if (received_size == sizeof(double) && (received_message != 0) && (received_message != -1))
{
TTFF_v.push_back(received_message);
LOG(INFO) << "Valid Time-To-First-Fix: " << received_message << " [s]";
// Stop the receiver
double stop_message = -200.0;
d_mq_stop->send(&stop_message, sizeof(stop_message), 0);
}
if (received_size == sizeof(double) && std::abs(received_message - (-1.0)) < 10 * std::numeric_limits<double>::epsilon())
{
leave = true;
}
}
else
{
// No message available, add a small delay to prevent busy-waiting
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
}
@ -715,23 +734,26 @@ int main(int argc, char **argv)
}
// Terminate the queue thread
key_t sysv_msg_key;
int sysv_msqid;
sysv_msg_key = 1101;
int msgflg = IPC_CREAT | 0666;
if ((sysv_msqid = msgget(sysv_msg_key, msgflg)) == -1)
const std::string queue_name = "gnss_sdr_ttff_message_queue";
std::unique_ptr<boost::interprocess::message_queue> mq;
bool queue_found = false;
while (!queue_found)
{
std::cout << "GNSS-SDR can not create message queues!\n";
return 1;
try
{
// Attempt to open the message queue
mq = std::make_unique<boost::interprocess::message_queue>(boost::interprocess::open_only, queue_name.c_str());
queue_found = true; // Queue found
}
catch (const boost::interprocess::interprocess_exception &)
{
// Queue not found, wait and retry
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
ttff_msgbuf msg;
msg.mtype = 1;
msg.ttff = -1;
int msgsend_size;
msgsend_size = sizeof(msg.ttff);
msgsnd(sysv_msqid, &msg, msgsend_size, IPC_NOWAIT);
receive_msg_thread.join();
msgctl(sysv_msqid, IPC_RMID, nullptr);
double finish = -1.0;
mq->send(&finish, sizeof(finish), 0);
boost::interprocess::message_queue::remove(queue_name.c_str());
#if USE_GLOG_AND_GFLAGS
gflags::ShutDownCommandLineFlags();

View File

@ -26,15 +26,13 @@
#include "in_memory_configuration.h"
#include <boost/exception/diagnostic_information.hpp>
#include <boost/exception_ptr.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lexical_cast.hpp>
#include <gtest/gtest.h>
#include <pmt/pmt.h>
#include <chrono>
#include <exception>
#include <memory>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
@ -49,35 +47,46 @@ class ControlThreadTest : public ::testing::Test
{
public:
static int stop_receiver();
typedef struct
{
long mtype; // required by SysV message
double message;
} message_buffer;
};
int ControlThreadTest::stop_receiver()
{
message_buffer msg_stop;
msg_stop.mtype = 1;
msg_stop.message = -200.0;
int msqid_stop = -1;
int msgsend_size = sizeof(msg_stop.message);
key_t key_stop = 1102;
// wait for the receiver control queue to be created
while (((msqid_stop = msgget(key_stop, 0644))) == -1)
const std::string queue_name = "receiver_control_queue";
std::unique_ptr<boost::interprocess::message_queue> d_mq;
try
{
bool queue_found = false;
while (!queue_found)
{
try
{
// Attempt to open the message queue
d_mq = std::make_unique<boost::interprocess::message_queue>(boost::interprocess::open_only, queue_name.c_str());
queue_found = true; // Queue found
}
catch (const boost::interprocess::interprocess_exception&)
{
// Queue not found, wait and retry
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
double stop_message = -200.0;
// Wait for a couple of seconds before sending
std::this_thread::sleep_for(std::chrono::seconds(2));
// Send the double value
d_mq->send(&stop_message, sizeof(stop_message), 0); // Priority 0
return 0;
}
catch (const boost::interprocess::interprocess_exception& e)
{
std::cerr << "Failed to send stop message: " << e.what() << std::endl;
return -1;
}
// wait for a couple of seconds
std::this_thread::sleep_for(std::chrono::seconds(2));
// Stop the receiver
msgsnd(msqid_stop, &msg_stop, msgsend_size, IPC_NOWAIT);
return 0;
}