From 7c4ece6ca91ed117f06fedb8188a5bb16e73301b Mon Sep 17 00:00:00 2001 From: Carles Fernandez Date: Sat, 20 Jun 2026 01:47:12 +0200 Subject: [PATCH] Fix valve stream signatures for multistream sources --- .../adapters/file_source_base.cc | 10 ++++- .../multichannel_file_signal_source.cc | 2 +- .../spir_gss6450_file_signal_source.cc | 6 +-- .../signal_source/libs/gnss_sdr_valve.cc | 40 ++++++++++++++----- .../signal_source/libs/gnss_sdr_valve.h | 18 ++++++++- .../sources/gnss_sdr_valve_test.cc | 25 ++++++++++++ 6 files changed, 85 insertions(+), 16 deletions(-) diff --git a/src/algorithms/signal_source/adapters/file_source_base.cc b/src/algorithms/signal_source/adapters/file_source_base.cc index d0c829709..1358a9d30 100644 --- a/src/algorithms/signal_source/adapters/file_source_base.cc +++ b/src/algorithms/signal_source/adapters/file_source_base.cc @@ -570,7 +570,15 @@ gnss_shared_ptr FileSourceBase::create_valve() { // if a number of samples is specified, honor it by creating a valve // in practice, this is always true - valve_ = gnss_sdr_make_valve(source_item_size(), samples(), queue_); + const auto source_block = source(); + const auto source_output_signature = source_block->output_signature(); + auto n_streams = 1; + if (source_output_signature->min_streams() == source_output_signature->max_streams() && + source_output_signature->min_streams() > 0) + { + n_streams = source_output_signature->min_streams(); + } + valve_ = gnss_sdr_make_valve(source_output_signature->sizeof_stream_item(0), samples(), queue_, true, n_streams); DLOG(INFO) << "valve(" << valve_->unique_id() << ")"; // enable subclass hooks diff --git a/src/algorithms/signal_source/adapters/multichannel_file_signal_source.cc b/src/algorithms/signal_source/adapters/multichannel_file_signal_source.cc index 84271beea..2b455a981 100644 --- a/src/algorithms/signal_source/adapters/multichannel_file_signal_source.cc +++ b/src/algorithms/signal_source/adapters/multichannel_file_signal_source.cc @@ -198,7 +198,7 @@ MultichannelFileSignalSource::MultichannelFileSignalSource(const ConfigurationIn DLOG(INFO) << "Total number samples to be processed= " << samples_ << " GNSS signal duration= " << signal_duration_s << " [s]"; std::cout << "GNSS signal recorded time to be processed: " << signal_duration_s << " [s]\n"; - valve_ = gnss_sdr_make_valve(item_size_, samples_, queue); + valve_ = gnss_sdr_make_valve(item_size_, samples_, queue, true, n_channels_); DLOG(INFO) << "valve(" << valve_->unique_id() << ")"; if (enable_throttle_control_) diff --git a/src/algorithms/signal_source/adapters/spir_gss6450_file_signal_source.cc b/src/algorithms/signal_source/adapters/spir_gss6450_file_signal_source.cc index 678b74dc1..f6017499a 100644 --- a/src/algorithms/signal_source/adapters/spir_gss6450_file_signal_source.cc +++ b/src/algorithms/signal_source/adapters/spir_gss6450_file_signal_source.cc @@ -233,7 +233,7 @@ SpirGSS6450FileSignalSource::SpirGSS6450FileSignalSource(const ConfigurationInte for (int32_t i = 0; i < n_channels_; i++) { - valve_vec_.emplace_back(gnss_sdr_make_valve(sizeof(gr_complex), samples_, queue)); + valve_vec_.emplace_back(gnss_sdr_make_valve(sizeof(gr_complex), samples_, queue, true, 1)); if (dump_) { std::string tmp_str = dump_filename_ + "_ch" + std::to_string(i); @@ -261,9 +261,9 @@ SpirGSS6450FileSignalSource::SpirGSS6450FileSignalSource(const ConfigurationInte { LOG(ERROR) << "A signal source does not have an input stream"; } - if (out_streams_ > 1) + if (out_streams_ == 0) { - LOG(ERROR) << "This implementation only supports one output stream"; + LOG(ERROR) << "A signal source must have at least one output stream"; } } diff --git a/src/algorithms/signal_source/libs/gnss_sdr_valve.cc b/src/algorithms/signal_source/libs/gnss_sdr_valve.cc index a62917745..82b263002 100644 --- a/src/algorithms/signal_source/libs/gnss_sdr_valve.cc +++ b/src/algorithms/signal_source/libs/gnss_sdr_valve.cc @@ -26,34 +26,44 @@ #if USE_GLOG_AND_GFLAGS #include #else +#include #include #endif Gnss_Sdr_Valve::Gnss_Sdr_Valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue* queue, - bool stop_flowgraph) : gr::sync_block("valve", - gr::io_signature::make(1, 20, sizeof_stream_item), - gr::io_signature::make(1, 20, sizeof_stream_item)), - d_nitems(nitems), - d_ncopied_items(0), - d_queue(queue), - d_stop_flowgraph(stop_flowgraph), - d_open_valve(false) + bool stop_flowgraph, + int n_streams) : gr::sync_block("valve", + gr::io_signature::make(n_streams, n_streams, sizeof_stream_item), + gr::io_signature::make(n_streams, n_streams, sizeof_stream_item)), + d_nitems(nitems), + d_ncopied_items(0), + d_queue(queue), + d_stop_flowgraph(stop_flowgraph), + d_open_valve(false) { } gnss_shared_ptr gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue* queue, bool stop_flowgraph) { - gnss_shared_ptr valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph)); + gnss_shared_ptr valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph, 1)); + return valve_; +} + + +gnss_shared_ptr gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue* queue, bool stop_flowgraph, int n_streams) +{ + CHECK(n_streams > 0) << "A valve must have at least one stream"; + gnss_shared_ptr valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, stop_flowgraph, n_streams)); return valve_; } gnss_shared_ptr gnss_sdr_make_valve(size_t sizeof_stream_item, uint64_t nitems, Concurrent_Queue* queue) { - gnss_shared_ptr valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, true)); + gnss_shared_ptr valve_(new Gnss_Sdr_Valve(sizeof_stream_item, nitems, queue, true, 1)); return valve_; } @@ -70,6 +80,11 @@ int Gnss_Sdr_Valve::work(int noutput_items, { if (d_open_valve == false) { + if (input_items.size() != output_items.size()) + { + LOG(ERROR) << "Valve input and output stream counts must match"; + return -1; + } if (d_ncopied_items >= d_nitems) { LOG(INFO) << "Stopping receiver, " << d_ncopied_items << " samples processed"; @@ -93,6 +108,11 @@ int Gnss_Sdr_Valve::work(int noutput_items, d_ncopied_items += n; return n; } + if (input_items.size() != output_items.size()) + { + LOG(ERROR) << "Valve input and output stream counts must match"; + return -1; + } for (size_t ch = 0; ch < output_items.size(); ch++) { std::memcpy(output_items[ch], input_items[ch], noutput_items * input_signature()->sizeof_stream_item(ch)); diff --git a/src/algorithms/signal_source/libs/gnss_sdr_valve.h b/src/algorithms/signal_source/libs/gnss_sdr_valve.h index bc6bf12eb..f932907d9 100644 --- a/src/algorithms/signal_source/libs/gnss_sdr_valve.h +++ b/src/algorithms/signal_source/libs/gnss_sdr_valve.h @@ -47,6 +47,13 @@ gnss_shared_ptr gnss_sdr_make_valve( Concurrent_Queue* queue, bool stop_flowgraph); +gnss_shared_ptr gnss_sdr_make_valve( + size_t sizeof_stream_item, + uint64_t nitems, + Concurrent_Queue* queue, + bool stop_flowgraph, + int n_streams); + /*! * \brief Implementation of a GNU Radio block that sends a STOP message to the * control queue right after a specific number of samples have passed through it. @@ -72,9 +79,18 @@ private: Concurrent_Queue* queue, bool stop_flowgraph); + friend gnss_shared_ptr gnss_sdr_make_valve( + size_t sizeof_stream_item, + uint64_t nitems, + Concurrent_Queue* queue, + bool stop_flowgraph, + int n_streams); + Gnss_Sdr_Valve(size_t sizeof_stream_item, uint64_t nitems, - Concurrent_Queue* queue, bool stop_flowgraph); + Concurrent_Queue* queue, + bool stop_flowgraph, + int n_streams); uint64_t d_nitems; uint64_t d_ncopied_items; diff --git a/tests/unit-tests/signal-processing-blocks/sources/gnss_sdr_valve_test.cc b/tests/unit-tests/signal-processing-blocks/sources/gnss_sdr_valve_test.cc index 9d99b1b74..5be23463a 100644 --- a/tests/unit-tests/signal-processing-blocks/sources/gnss_sdr_valve_test.cc +++ b/tests/unit-tests/signal-processing-blocks/sources/gnss_sdr_valve_test.cc @@ -27,6 +27,7 @@ #include "concurrent_queue.h" #include "gnss_sdr_valve.h" #include +#include #include TEST(ValveTest, CheckEventSentAfter100Samples) @@ -52,3 +53,27 @@ TEST(ValveTest, CheckEventSentAfter100Samples) bool expected1 = true; EXPECT_EQ(expected1, queue->timed_wait_and_pop(msg, 100)); } + + +TEST(ValveTest, DefaultValveHasOneStream) +{ + auto queue = std::make_shared>(); + auto valve = gnss_sdr_make_valve(sizeof(float), 100, queue.get()); + + EXPECT_EQ(1, valve->input_signature()->min_streams()); + EXPECT_EQ(1, valve->input_signature()->max_streams()); + EXPECT_EQ(1, valve->output_signature()->min_streams()); + EXPECT_EQ(1, valve->output_signature()->max_streams()); +} + + +TEST(ValveTest, ExplicitValveStreamCountIsHonored) +{ + auto queue = std::make_shared>(); + auto valve = gnss_sdr_make_valve(sizeof(float), 100, queue.get(), true, 3); + + EXPECT_EQ(3, valve->input_signature()->min_streams()); + EXPECT_EQ(3, valve->input_signature()->max_streams()); + EXPECT_EQ(3, valve->output_signature()->min_streams()); + EXPECT_EQ(3, valve->output_signature()->max_streams()); +}