diff --git a/src/algorithms/signal_source/adapters/zmq_signal_source.cc b/src/algorithms/signal_source/adapters/zmq_signal_source.cc index 66d931760..e87064ff2 100644 --- a/src/algorithms/signal_source/adapters/zmq_signal_source.cc +++ b/src/algorithms/signal_source/adapters/zmq_signal_source.cc @@ -17,6 +17,7 @@ #include "zmq_signal_source.h" #include "configuration_interface.h" #include "gnss_sdr_string_literals.h" +#include using namespace std::string_literals; @@ -25,55 +26,52 @@ ZmqSignalSource::ZmqSignalSource(const ConfigurationInterface* configuration, unsigned int /* in_stream [[maybe_unused]] */, unsigned int /* out_stream [[maybe_unused]] */, Concurrent_Queue* /* queue [[maybe_unused]] */) - : SignalSourceBase(configuration, role, "ZMQ_Signal_Source"s) // - , - d_item_size(decode_item_type(configuration->property(role + ".item_type"s, "gr_complex"s), nullptr, true)) + : SignalSourceBase(configuration, role, "ZMQ_Signal_Source"s), + d_item_size(decode_item_type(configuration->property(role + ".item_type"s, "gr_complex"s), nullptr, true)), + d_dump_filename(configuration->property(role + ".dump_filename"s, "data/zmq_dump.dat"s)), + d_dump(configuration->property(role + ".dump"s, false)) { auto vlen = configuration->property(role + ".vlen"s, 1); auto pass_tags = configuration->property(role + ".pass_tags"s, false); auto timeout_ms = configuration->property(role + ".timeout_ms"s, 100); auto hwm = configuration->property(role + ".hwm"s, -1); - // Each .endpointN must be specified - for (auto n = 0u; n < getRfChannels(); ++n) - { - auto property = role + ".endpoint"s + std::to_string(n); - auto endpoint = configuration->property(property, ""s); - if (not endpoint.empty()) - { - LOG(INFO) << "Connecting to ZMQ pub at " << endpoint; - d_source_blocks.push_back(gr::zeromq::sub_source::make(d_item_size, vlen, endpoint.data(), timeout_ms, pass_tags, hwm)); - } - else - { - std::cerr - << "For ZMQ_Signal_Source " << role << ", the .endpointN property must be defined\n" - << "for all values of N from 0 to " << getRfChannels() - 1 << std::endl; + auto property = role + ".endpoint"s; + auto endpoint = configuration->property(property, ""s); - throw std::invalid_argument(property + ": undefined"); - } + if (not endpoint.empty()) + { + LOG(INFO) << "Connecting to ZMQ pub at " << endpoint; + d_source_block = gr::zeromq::sub_source::make(d_item_size, vlen, endpoint.data(), timeout_ms, pass_tags, hwm); + } + else + { + std::cerr << "For ZMQ_Signal_Source " << property << " must be defined" << std::endl; + throw std::invalid_argument(property + ": undefined"); } } auto ZmqSignalSource::item_size() -> size_t { return d_item_size; } -auto ZmqSignalSource::connect(gr::top_block_sptr /* top_block [[maybe_unused]] */) -> void +auto ZmqSignalSource::connect(gr::top_block_sptr top_block) -> void { - // for now, nothing to connect + if (d_dump) + { + d_dump_sink = gr::blocks::file_sink::make(item_size(), d_dump_filename.data()); + top_block->connect(d_source_block, 0, d_dump_sink, 0); + } } -auto ZmqSignalSource::disconnect(gr::top_block_sptr /* top_block [[maybe_unused]] */) -> void +auto ZmqSignalSource::disconnect(gr::top_block_sptr top_block) -> void { - // for now, nothing to disconnect + if (d_dump) + { + top_block->disconnect(d_dump_sink); + } } auto ZmqSignalSource::get_right_block() -> gr::basic_block_sptr { - return d_source_blocks.front(); -} - -auto ZmqSignalSource::get_right_block(int RF_channel) -> gr::basic_block_sptr -{ - return d_source_blocks.at(RF_channel); // throws std::out_of_range + return d_source_block; } diff --git a/src/algorithms/signal_source/adapters/zmq_signal_source.h b/src/algorithms/signal_source/adapters/zmq_signal_source.h index 07b3ecace..dc33c1c0e 100644 --- a/src/algorithms/signal_source/adapters/zmq_signal_source.h +++ b/src/algorithms/signal_source/adapters/zmq_signal_source.h @@ -20,6 +20,7 @@ #include "signal_source_base.h" // #include "concurrent_queue.h" +#include // for dump #include #include @@ -30,18 +31,18 @@ //! This class supports the following properties: //! +//! .endpoint - the ZMQ endpoint to be connected to +//! .vlen - vector length of the input items (default 1, one item) +//! this must match the size of the publisher! //! .pass_tags - boolean flag if tags should be propagated (default false) //! .timeout_ms - receive timeout, in milliseconds (default 100) //! .hwm - ZMQ high water mark (default -1, ZMQ default) -//! .vlen - vector length of the input items (default 1, one item) -//! .endpointN - the ZMQ endpoint to be connected to (repeat for each channel) //! //! .item_type - data type of the samples (default "gr_complex") //! //! (probably should be abstracted to the base class) //! -//! .dump - whether to archive input data -//! +//! .dump - whether to archive input data //! .dump_filename - if dumping, path to file for output //! @@ -61,12 +62,14 @@ public: auto connect(gr::top_block_sptr top_block) -> void override; auto disconnect(gr::top_block_sptr top_block) -> void override; auto get_right_block() -> gr::basic_block_sptr override; - auto get_right_block(int RF_channel) -> gr::basic_block_sptr override; private: - std::vector d_source_blocks; + gr::zeromq::sub_source::sptr d_source_block; + gr::blocks::file_sink::sptr d_dump_sink; size_t d_item_size; + std::string d_dump_filename; + bool d_dump; }; /** \} */