Add mutex to protect list of available signals

Disconnect the flowgraph when leaving
This commit is contained in:
Carles Fernandez 2018-04-22 19:49:13 +02:00
parent 2cd1bed90c
commit a584e8e51d
No known key found for this signature in database
GPG Key ID: 4C583C52B0C3877D
2 changed files with 206 additions and 16 deletions

View File

@ -58,7 +58,13 @@ GNSSFlowgraph::GNSSFlowgraph(std::shared_ptr<ConfigurationInterface> configurati
} }
GNSSFlowgraph::~GNSSFlowgraph() {} GNSSFlowgraph::~GNSSFlowgraph()
{
if (connected_)
{
GNSSFlowgraph::disconnect();
}
}
void GNSSFlowgraph::start() void GNSSFlowgraph::start()
@ -376,6 +382,183 @@ void GNSSFlowgraph::connect()
} }
void GNSSFlowgraph::disconnect()
{
LOG(INFO) << "Disconnecting flowgraph";
if (!connected_)
{
LOG(INFO) << "flowgraph was not connected";
return;
}
// Signal Source (i) > Signal conditioner (i) >
int RF_Channels = 0;
int signal_conditioner_ID = 0;
for (int i = 0; i < sources_count_; i++)
{
try
{
// TODO: Remove this array implementation and create generic multistream connector
// (if a signal source has more than 1 stream, then connect it to the multistream signal conditioner)
if (sig_source_.at(i)->implementation().compare("Raw_Array_Signal_Source") == 0)
{
//Multichannel Array
for (int j = 0; j < GNSS_SDR_ARRAY_SIGNAL_CONDITIONER_CHANNELS; j++)
{
top_block_->disconnect(sig_source_.at(i)->get_right_block(), j, sig_conditioner_.at(i)->get_left_block(), j);
}
}
else
{
// TODO: Create a class interface for SignalSources, derived from GNSSBlockInterface.
// Include GetRFChannels in the interface to avoid read config parameters here
// read the number of RF channels for each front-end
RF_Channels = configuration_->property(sig_source_.at(i)->role() + ".RF_channels", 1);
for (int j = 0; j < RF_Channels; j++)
{
if (sig_source_.at(i)->get_right_block()->output_signature()->max_streams() > 1)
{
top_block_->disconnect(sig_source_.at(i)->get_right_block(), j, sig_conditioner_.at(signal_conditioner_ID)->get_left_block(), 0);
}
else
{
if (j == 0)
{
// RF_channel 0 backward compatibility with single channel sources
top_block_->disconnect(sig_source_.at(i)->get_right_block(), 0, sig_conditioner_.at(signal_conditioner_ID)->get_left_block(), 0);
}
else
{
// Multiple channel sources using multiple output blocks of single channel (requires RF_channel selector in call)
top_block_->disconnect(sig_source_.at(i)->get_right_block(j), 0, sig_conditioner_.at(signal_conditioner_ID)->get_left_block(), 0);
}
}
signal_conditioner_ID++;
}
}
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect signal source " << i << " to signal conditioner " << i << ": " << e.what();
}
}
try
{
top_block_->disconnect(sig_conditioner_.at(0)->get_right_block(), 0, ch_out_sample_counter, 0);
top_block_->disconnect(ch_out_sample_counter, 0, observables_->get_left_block(), channels_count_); //extra port for the sample counter pulse
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect sample counter: " << e.what();
}
// Signal conditioner (selected_signal_source) >> channels (i) (dependent of their associated SignalSource_ID)
int selected_signal_conditioner_ID;
for (unsigned int i = 0; i < channels_count_; i++)
{
selected_signal_conditioner_ID = configuration_->property("Channel" + boost::lexical_cast<std::string>(i) + ".RF_channel_ID", 0);
try
{
top_block_->disconnect(sig_conditioner_.at(selected_signal_conditioner_ID)->get_right_block(), 0,
channels_.at(i)->get_left_block(), 0);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect signal conditioner " << selected_signal_conditioner_ID << " to channel " << i << ": " << e.what();
}
// Signal Source > Signal conditioner >> Channels >> Observables
try
{
top_block_->disconnect(channels_.at(i)->get_right_block(), 0,
observables_->get_left_block(), i);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect channel " << i << " to observables: " << e.what();
}
}
try
{
for (unsigned int i = 0; i < channels_count_; i++)
{
top_block_->disconnect(observables_->get_right_block(), i, pvt_->get_left_block(), i);
top_block_->msg_disconnect(channels_.at(i)->get_right_block(), pmt::mp("telemetry"), pvt_->get_left_block(), pmt::mp("telemetry"));
}
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect observables to PVT: " << e.what();
}
for (int i = 0; i < sources_count_; i++)
{
try
{
sig_source_.at(i)->disconnect(top_block_);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect signal source block " << i << " internally: " << e.what();
}
}
// Signal Source > Signal conditioner >
for (unsigned int i = 0; i < sig_conditioner_.size(); i++)
{
try
{
sig_conditioner_.at(i)->disconnect(top_block_);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect signal conditioner block " << i << " internally: " << e.what();
}
}
for (unsigned int i = 0; i < channels_count_; i++)
{
try
{
channels_.at(i)->disconnect(top_block_);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect channel " << i << " internally: " << e.what();
}
}
try
{
observables_->disconnect(top_block_);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect observables block internally: " << e.what();
}
// Signal Source > Signal conditioner >> Channels >> Observables > PVT
try
{
pvt_->disconnect(top_block_);
}
catch (const std::exception& e)
{
LOG(INFO) << "Can't disconnect PVT block internally: " << e.what();
}
DLOG(INFO) << "blocks disconnected internally";
connected_ = false;
LOG(INFO) << "Flowgraph disconnected";
}
void GNSSFlowgraph::wait() void GNSSFlowgraph::wait()
{ {
if (!running_) if (!running_)
@ -411,18 +594,19 @@ void GNSSFlowgraph::apply_action(unsigned int who, unsigned int what)
switch (what) switch (what)
{ {
case 0: case 0:
DLOG(INFO) << "Channel " << who << " ACQ FAILED satellite " << channels_.at(who)->get_signal().get_satellite() << ", Signal " << channels_.at(who)->get_signal().get_signal_str(); DLOG(INFO) << "Channel " << who << " ACQ FAILED satellite " << channels_[who]->get_signal().get_satellite() << ", Signal " << channels_[who]->get_signal().get_signal_str();
if (sat == 0) if (sat == 0)
{ {
available_GNSS_signals_.push_back(channels_.at(who)->get_signal()); std::lock_guard<std::mutex> lock(signal_list_mutex);
channels_.at(who)->set_signal(search_next_signal(channels_.at(who)->get_signal().get_signal_str(), true)); available_GNSS_signals_.push_back(channels_[who]->get_signal());
channels_[who]->set_signal(search_next_signal(channels_[who]->get_signal().get_signal_str(), true));
} }
DLOG(INFO) << "Channel " << who << " Starting acquisition " << channels_.at(who)->get_signal().get_satellite() << ", Signal " << channels_.at(who)->get_signal().get_signal_str(); DLOG(INFO) << "Channel " << who << " Starting acquisition " << channels_[who]->get_signal().get_satellite() << ", Signal " << channels_[who]->get_signal().get_signal_str();
channels_.at(who)->start_acquisition(); channels_[who]->start_acquisition();
break; break;
case 1: case 1:
LOG(INFO) << "Channel " << who << " ACQ SUCCESS satellite " << channels_.at(who)->get_signal().get_satellite(); LOG(INFO) << "Channel " << who << " ACQ SUCCESS satellite " << channels_[who]->get_signal().get_satellite();
channels_state_[who] = 2; channels_state_[who] = 2;
acq_channels_count_--; acq_channels_count_--;
for (unsigned int i = 0; i < channels_count_; i++) for (unsigned int i = 0; i < channels_count_; i++)
@ -433,26 +617,27 @@ void GNSSFlowgraph::apply_action(unsigned int who, unsigned int what)
channels_state_[i] = 1; channels_state_[i] = 1;
if (sat_ == 0) if (sat_ == 0)
{ {
channels_.at(i)->set_signal(search_next_signal(channels_.at(i)->get_signal().get_signal_str(), true)); std::lock_guard<std::mutex> lock(signal_list_mutex);
channels_[i]->set_signal(search_next_signal(channels_[i]->get_signal().get_signal_str(), true));
} }
acq_channels_count_++; acq_channels_count_++;
DLOG(INFO) << "Channel " << i << " Starting acquisition " << channels_.at(i)->get_signal().get_satellite() << ", Signal " << channels_.at(i)->get_signal().get_signal_str(); DLOG(INFO) << "Channel " << i << " Starting acquisition " << channels_[i]->get_signal().get_satellite() << ", Signal " << channels_[i]->get_signal().get_signal_str();
channels_.at(i)->start_acquisition(); channels_[i]->start_acquisition();
} }
DLOG(INFO) << "Channel " << i << " in state " << channels_state_[i]; DLOG(INFO) << "Channel " << i << " in state " << channels_state_[i];
} }
break; break;
case 2: case 2:
LOG(INFO) << "Channel " << who << " TRK FAILED satellite " << channels_.at(who)->get_signal().get_satellite(); LOG(INFO) << "Channel " << who << " TRK FAILED satellite " << channels_[who]->get_signal().get_satellite();
DLOG(INFO) << "Number of channels in acquisition = " << acq_channels_count_; DLOG(INFO) << "Number of channels in acquisition = " << acq_channels_count_;
if (acq_channels_count_ < max_acq_channels_) if (acq_channels_count_ < max_acq_channels_)
{ {
channels_state_[who] = 1; channels_state_[who] = 1;
acq_channels_count_++; acq_channels_count_++;
LOG(INFO) << "Channel " << who << " Starting acquisition " << channels_.at(who)->get_signal().get_satellite() << ", Signal " << channels_.at(who)->get_signal().get_signal_str(); LOG(INFO) << "Channel " << who << " Starting acquisition " << channels_[who]->get_signal().get_satellite() << ", Signal " << channels_[who]->get_signal().get_signal_str();
channels_.at(who)->start_acquisition(); channels_[who]->start_acquisition();
} }
else else
{ {
@ -460,7 +645,8 @@ void GNSSFlowgraph::apply_action(unsigned int who, unsigned int what)
LOG(INFO) << "Channel " << who << " Idle state"; LOG(INFO) << "Channel " << who << " Idle state";
if (sat == 0) if (sat == 0)
{ {
available_GNSS_signals_.push_back(channels_.at(who)->get_signal()); std::lock_guard<std::mutex> lock(signal_list_mutex);
available_GNSS_signals_.push_back(channels_[who]->get_signal());
} }
} }
break; break;

View File

@ -43,6 +43,7 @@
#include <gnuradio/msg_queue.h> #include <gnuradio/msg_queue.h>
#include <list> #include <list>
#include <memory> #include <memory>
#include <mutex>
#include <queue> #include <queue>
#include <string> #include <string>
#include <vector> #include <vector>
@ -67,9 +68,9 @@ public:
GNSSFlowgraph(std::shared_ptr<ConfigurationInterface> configuration, gr::msg_queue::sptr queue); GNSSFlowgraph(std::shared_ptr<ConfigurationInterface> configuration, gr::msg_queue::sptr queue);
/*! /*!
* \brief Virtual destructor * \brief Destructor
*/ */
virtual ~GNSSFlowgraph(); ~GNSSFlowgraph();
//! \brief Start the flow graph //! \brief Start the flow graph
void start(); void start();
@ -84,6 +85,8 @@ public:
*/ */
void connect(); void connect();
void disconnect();
void wait(); void wait();
/*! /*!
@ -147,6 +150,7 @@ private:
gr::msg_queue::sptr queue_; gr::msg_queue::sptr queue_;
std::list<Gnss_Signal> available_GNSS_signals_; std::list<Gnss_Signal> available_GNSS_signals_;
std::vector<unsigned int> channels_state_; std::vector<unsigned int> channels_state_;
std::mutex signal_list_mutex;
}; };
#endif /*GNSS_SDR_GNSS_FLOWGRAPH_H_*/ #endif /*GNSS_SDR_GNSS_FLOWGRAPH_H_*/