1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2026-06-29 16:08:52 +00:00

Fix valve stream signatures for multistream sources

This commit is contained in:
Carles Fernandez
2026-06-20 01:47:12 +02:00
parent 00190135a6
commit 7c4ece6ca9
6 changed files with 85 additions and 16 deletions
@@ -570,7 +570,15 @@ gnss_shared_ptr<gr::block> FileSourceBase::create_valve()
{
// if a number of samples is specified, honor it by creating a valve
// in practice, this is always true
valve_ = gnss_sdr_make_valve(source_item_size(), samples(), queue_);
const auto source_block = source();
const auto source_output_signature = source_block->output_signature();
auto n_streams = 1;
if (source_output_signature->min_streams() == source_output_signature->max_streams() &&
source_output_signature->min_streams() > 0)
{
n_streams = source_output_signature->min_streams();
}
valve_ = gnss_sdr_make_valve(source_output_signature->sizeof_stream_item(0), samples(), queue_, true, n_streams);
DLOG(INFO) << "valve(" << valve_->unique_id() << ")";
// enable subclass hooks
@@ -198,7 +198,7 @@ MultichannelFileSignalSource::MultichannelFileSignalSource(const ConfigurationIn
DLOG(INFO) << "Total number samples to be processed= " << samples_ << " GNSS signal duration= " << signal_duration_s << " [s]";
std::cout << "GNSS signal recorded time to be processed: " << signal_duration_s << " [s]\n";
valve_ = gnss_sdr_make_valve(item_size_, samples_, queue);
valve_ = gnss_sdr_make_valve(item_size_, samples_, queue, true, n_channels_);
DLOG(INFO) << "valve(" << valve_->unique_id() << ")";
if (enable_throttle_control_)
@@ -233,7 +233,7 @@ SpirGSS6450FileSignalSource::SpirGSS6450FileSignalSource(const ConfigurationInte
for (int32_t i = 0; i < n_channels_; i++)
{
valve_vec_.emplace_back(gnss_sdr_make_valve(sizeof(gr_complex), samples_, queue));
valve_vec_.emplace_back(gnss_sdr_make_valve(sizeof(gr_complex), samples_, queue, true, 1));
if (dump_)
{
std::string tmp_str = dump_filename_ + "_ch" + std::to_string(i);
@@ -261,9 +261,9 @@ SpirGSS6450FileSignalSource::SpirGSS6450FileSignalSource(const ConfigurationInte
{
LOG(ERROR) << "A signal source does not have an input stream";
}
if (out_streams_ > 1)
if (out_streams_ == 0)
{
LOG(ERROR) << "This implementation only supports one output stream";
LOG(ERROR) << "A signal source must have at least one output stream";
}
}
@@ -26,34 +26,44 @@
#if USE_GLOG_AND_GFLAGS
#include <glog/logging.h>
#else
#include <absl/log/check.h>
#include <absl/log/log.h>
#endif
Gnss_Sdr_Valve::Gnss_Sdr_Valve(size_t sizeof_stream_item,
uint64_t nitems,
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph) : gr::sync_block("valve",
gr::io_signature::make(1, 20, sizeof_stream_item),
gr::io_signature::make(1, 20, sizeof_stream_item)),
d_nitems(nitems),
d_ncopied_items(0),
d_queue(queue),
d_stop_flowgraph(stop_flowgraph),
d_open_valve(false)
bool stop_flowgraph,
int n_streams) : gr::sync_block("valve",
gr::io_signature::make(n_streams, n_streams, sizeof_stream_item),
gr::io_signature::make(n_streams, n_streams, sizeof_stream_item)),
d_nitems(nitems),
d_ncopied_items(0),
d_queue(queue),
d_stop_flowgraph(stop_flowgraph),
d_open_valve(false)
{
}
gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue<pmt::pmt_t>* queue, bool stop_flowgraph)
{
gnss_shared_ptr<Gnss_Sdr_Valve> valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph));
gnss_shared_ptr<Gnss_Sdr_Valve> valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph, 1));
return valve_;
}
gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue<pmt::pmt_t>* queue, bool stop_flowgraph, int n_streams)
{
CHECK(n_streams > 0) << "A valve must have at least one stream";
gnss_shared_ptr<Gnss_Sdr_Valve> valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph, n_streams));
return valve_;
}
gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue<pmt::pmt_t>* queue)
{
gnss_shared_ptr<Gnss_Sdr_Valve> valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, true));
gnss_shared_ptr<Gnss_Sdr_Valve> valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, true, 1));
return valve_;
}
@@ -70,6 +80,11 @@ int Gnss_Sdr_Valve::work(int noutput_items,
{
if (d_open_valve == false)
{
if (input_items.size() != output_items.size())
{
LOG(ERROR) << "Valve input and output stream counts must match";
return -1;
}
if (d_ncopied_items >= d_nitems)
{
LOG(INFO) << "Stopping receiver, " << d_ncopied_items << " samples processed";
@@ -93,6 +108,11 @@ int Gnss_Sdr_Valve::work(int noutput_items,
d_ncopied_items += n;
return n;
}
if (input_items.size() != output_items.size())
{
LOG(ERROR) << "Valve input and output stream counts must match";
return -1;
}
for (size_t ch = 0; ch < output_items.size(); ch++)
{
std::memcpy(output_items[ch], input_items[ch], noutput_items * input_signature()->sizeof_stream_item(ch));
@@ -47,6 +47,13 @@ gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph);
gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(
size_t sizeof_stream_item,
uint64_t nitems,
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph,
int n_streams);
/*!
* \brief Implementation of a GNU Radio block that sends a STOP message to the
* control queue right after a specific number of samples have passed through it.
@@ -72,9 +79,18 @@ private:
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph);
friend gnss_shared_ptr<Gnss_Sdr_Valve> gnss_sdr_make_valve(
size_t sizeof_stream_item,
uint64_t nitems,
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph,
int n_streams);
Gnss_Sdr_Valve(size_t sizeof_stream_item,
uint64_t nitems,
Concurrent_Queue<pmt::pmt_t>* queue, bool stop_flowgraph);
Concurrent_Queue<pmt::pmt_t>* queue,
bool stop_flowgraph,
int n_streams);
uint64_t d_nitems;
uint64_t d_ncopied_items;
@@ -27,6 +27,7 @@
#include "concurrent_queue.h"
#include "gnss_sdr_valve.h"
#include <gnuradio/blocks/null_sink.h>
#include <gtest/gtest.h>
#include <pmt/pmt.h>
TEST(ValveTest, CheckEventSentAfter100Samples)
@@ -52,3 +53,27 @@ TEST(ValveTest, CheckEventSentAfter100Samples)
bool expected1 = true;
EXPECT_EQ(expected1, queue->timed_wait_and_pop(msg, 100));
}
TEST(ValveTest, DefaultValveHasOneStream)
{
auto queue = std::make_shared<Concurrent_Queue<pmt::pmt_t>>();
auto valve = gnss_sdr_make_valve(sizeof(float), 100, queue.get());
EXPECT_EQ(1, valve->input_signature()->min_streams());
EXPECT_EQ(1, valve->input_signature()->max_streams());
EXPECT_EQ(1, valve->output_signature()->min_streams());
EXPECT_EQ(1, valve->output_signature()->max_streams());
}
TEST(ValveTest, ExplicitValveStreamCountIsHonored)
{
auto queue = std::make_shared<Concurrent_Queue<pmt::pmt_t>>();
auto valve = gnss_sdr_make_valve(sizeof(float), 100, queue.get(), true, 3);
EXPECT_EQ(3, valve->input_signature()->min_streams());
EXPECT_EQ(3, valve->input_signature()->max_streams());
EXPECT_EQ(3, valve->output_signature()->min_streams());
EXPECT_EQ(3, valve->output_signature()->max_streams());
}