1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-01-16 12:12:57 +00:00

Improved handling of threads

Some threads were not terminating properly, triggering a failure of
control_thread_test in some configurations.
This commit is contained in:
Carles Fernandez 2015-12-30 14:43:32 +01:00
parent 1b16fb6dfb
commit c5407a5106
3 changed files with 45 additions and 46 deletions

View File

@ -18,10 +18,6 @@
set(CHANNEL_ADAPTER_SOURCES channel.cc)
if(Boost_VERSION LESS 105000)
add_definitions(-DOLD_BOOST=1)
endif(Boost_VERSION LESS 105000)
include_directories(
$(CMAKE_CURRENT_SOURCE_DIR)
${CMAKE_SOURCE_DIR}/src/core/system_parameters

View File

@ -226,8 +226,8 @@ void Channel::standby()
*/
void Channel::stop()
{
channel_internal_queue_.push(0); //message to stop channel
stop_ = true;
channel_internal_queue_.push(0); //message to stop channel
/* When the boost::thread object that represents a thread of execution
* is destroyed the thread becomes detached. Once a thread is detached,
* it will continue executing until the invocation of the function or
@ -237,15 +237,8 @@ void Channel::stop()
* the boost::thread object must be used. join() will block the calling
* thread until the thread represented by the boost::thread object
* has completed.
* NOTE: timed_join() is deprecated and only present up to Boost 1.56
* try_join_until() was introduced in Boost 1.50
*/
#ifdef OLD_BOOST
ch_thread_.timed_join(boost::posix_time::seconds(1));
#endif
#ifndef OLD_BOOST
ch_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
#endif
ch_thread_.join();
}

View File

@ -174,41 +174,39 @@ void ControlThread::run()
}
std::cout << "Stopping GNSS-SDR, please wait!" << std::endl;
flowgraph_->stop();
stop_ = true;
// sending empty data to terminate the threads
global_gps_ephemeris_queue.push(Gps_Ephemeris());
global_gps_iono_queue.push(Gps_Iono());
global_gps_utc_model_queue.push(Gps_Utc_Model());
global_gps_almanac_queue.push(Gps_Almanac());
global_gps_acq_assist_queue.push(Gps_Acq_Assist());
global_gps_ref_location_queue.push(Gps_Ref_Location());
global_gps_ref_time_queue.push(Gps_Ref_Time());
global_galileo_ephemeris_queue.push(Galileo_Ephemeris());
global_galileo_iono_queue.push(Galileo_Iono());
global_galileo_utc_model_queue.push(Galileo_Utc_Model());
global_galileo_almanac_queue.push(Galileo_Almanac());
#ifdef OLD_BOOST
// Join GPS threads
gps_ephemeris_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_iono_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_utc_model_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_acq_assist_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_ref_location_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_ref_time_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
gps_ephemeris_data_collector_thread_.join();
gps_iono_data_collector_thread_.join();
gps_utc_model_data_collector_thread_.join();
gps_acq_assist_data_collector_thread_.join();
gps_ref_location_data_collector_thread_.join();
gps_ref_time_data_collector_thread_.join();
//Join Galileo threads
galileo_ephemeris_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
galileo_iono_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
galileo_almanac_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
galileo_utc_model_data_collector_thread_.timed_join(boost::posix_time::seconds(1));
galileo_ephemeris_data_collector_thread_.join();
galileo_iono_data_collector_thread_.join();
galileo_almanac_data_collector_thread_.join();
galileo_utc_model_data_collector_thread_.join();
//Join keyboard threads
//Join keyboard thread
#ifdef OLD_BOOST
keyboard_thread_.timed_join(boost::posix_time::seconds(1));
#endif
#ifndef OLD_BOOST
// Join GPS threads
gps_ephemeris_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
gps_iono_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
gps_utc_model_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
gps_acq_assist_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
gps_ref_location_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
gps_ref_time_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
//Join Galileo threads
galileo_ephemeris_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
galileo_iono_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
galileo_almanac_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
galileo_utc_model_data_collector_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
//Join keyboard threads
keyboard_thread_.try_join_until(boost::chrono::steady_clock::now() + boost::chrono::milliseconds(50));
#endif
@ -613,6 +611,7 @@ void ControlThread::gps_acq_assist_data_collector()
while(stop_ == false)
{
global_gps_acq_assist_queue.wait_and_pop(gps_acq);
if(gps_acq.i_satellite_PRN == 0) break;
// DEBUG MESSAGE
std::cout << "Acquisition assistance record has arrived from SAT ID "
@ -645,6 +644,7 @@ void ControlThread::gps_ephemeris_data_collector()
while(stop_ == false)
{
global_gps_ephemeris_queue.wait_and_pop(gps_eph);
if(gps_eph.i_satellite_PRN == 0) break;
// DEBUG MESSAGE
LOG(INFO) << "Ephemeris record has arrived from SAT ID "
@ -692,6 +692,7 @@ void ControlThread::galileo_ephemeris_data_collector()
while(stop_ == false)
{
global_galileo_ephemeris_queue.wait_and_pop(galileo_eph);
if(galileo_eph.SV_ID_PRN_4 == 0) break;
// DEBUG MESSAGE
std::cout << "Galileo Ephemeris record has arrived from SAT ID "
@ -735,6 +736,7 @@ void ControlThread::galileo_ephemeris_data_collector()
}
void ControlThread::gps_iono_data_collector()
{
// ############ 1.bis READ EPHEMERIS/UTC_MODE/IONO QUEUE ####################
@ -742,13 +744,16 @@ void ControlThread::gps_iono_data_collector()
while(stop_ == false)
{
global_gps_iono_queue.wait_and_pop(gps_iono);
if(gps_iono.valid == true)
{
LOG(INFO) << "New IONO record has arrived ";
}
// there is no timestamp for the iono data, new entries must always be added
global_gps_iono_map.write(0, gps_iono);
}
}
void ControlThread::galileo_almanac_data_collector()
{
// ############ 1.bis READ ALMANAC QUEUE ####################
@ -756,8 +761,10 @@ void ControlThread::galileo_almanac_data_collector()
while(stop_ == false)
{
global_galileo_almanac_queue.wait_and_pop(galileo_almanac);
if(galileo_almanac.WN_a_7 != 0.0)
{
LOG(INFO) << "New galileo_almanac record has arrived ";
}
// there is no timestamp for the galileo_almanac data, new entries must always be added
global_galileo_almanac_map.write(0, galileo_almanac);
}
@ -772,7 +779,10 @@ void ControlThread::galileo_iono_data_collector()
global_galileo_iono_queue.wait_and_pop(galileo_iono);
// DEBUG MESSAGE
if(galileo_iono.WN_5 != 0)
{
LOG(INFO) << "Iono record has arrived";
}
// insert new Iono record to the global Iono map
if (global_galileo_iono_map.read(0, galileo_iono_old))
@ -936,7 +946,7 @@ void ControlThread::keyboard_listener()
{
bool read_keys = true;
char c = '0';
while(read_keys)
while(read_keys && !stop_)
{
std::cin.get(c);
if (c == 'q')