diff --git a/src/algorithms/signal_source/adapters/zmq_signal_source.cc b/src/algorithms/signal_source/adapters/zmq_signal_source.cc index a9cab6540..00a6f57e5 100644 --- a/src/algorithms/signal_source/adapters/zmq_signal_source.cc +++ b/src/algorithms/signal_source/adapters/zmq_signal_source.cc @@ -18,7 +18,6 @@ #include "configuration_interface.h" #include "gnss_sdr_string_literals.h" #include -#include using namespace std::string_literals; @@ -39,18 +38,23 @@ ZmqSignalSource::ZmqSignalSource(const ConfigurationInterface* configuration, auto property = role + ".endpoint"s; auto endpoint = configuration->property(property, ""s); - std::vector address(endpoint.c_str(), endpoint.c_str() + endpoint.size() + 1); if (!endpoint.empty()) { LOG(INFO) << "Connecting to ZMQ pub at " << endpoint; - d_source_block = gr::zeromq::sub_source::make(d_item_size, vlen, address.data(), timeout_ms, pass_tags, hwm); + // work around gnuradio interface deficiency + d_source_block = gr::zeromq::sub_source::make(d_item_size, vlen, const_cast(endpoint.data()), timeout_ms, pass_tags, hwm); } else { std::cerr << "For ZMQ_Signal_Source " << property << " must be defined" << std::endl; throw std::invalid_argument(property + ": undefined"); } + + if (vlen > 1) + { + d_vec_block = gr::blocks::vector_to_stream::make(item_size(), vlen); + } } @@ -59,10 +63,14 @@ auto ZmqSignalSource::item_size() -> size_t { return d_item_size; } auto ZmqSignalSource::connect(gr::top_block_sptr top_block) -> void { + if (d_vec_block) + { + top_block->connect(d_source_block, 0, d_vec_block, 0); + } if (d_dump) { d_dump_sink = gr::blocks::file_sink::make(item_size(), d_dump_filename.data()); - top_block->connect(d_source_block, 0, d_dump_sink, 0); + top_block->connect(get_right_block(), 0, d_dump_sink, 0); } } @@ -73,10 +81,22 @@ auto ZmqSignalSource::disconnect(gr::top_block_sptr top_block) -> void { top_block->disconnect(d_dump_sink); } + + if (d_vec_block) + { + top_block->disconnect(d_vec_block); + } } auto ZmqSignalSource::get_right_block() -> gr::basic_block_sptr { - return d_source_block; + auto result = gr::basic_block_sptr(); + + if (d_vec_block) + result = d_vec_block; // NOLINT + else + result = d_source_block; // NOLINT + + return result; } diff --git a/src/algorithms/signal_source/adapters/zmq_signal_source.h b/src/algorithms/signal_source/adapters/zmq_signal_source.h index 55b2d29b7..484b51eeb 100644 --- a/src/algorithms/signal_source/adapters/zmq_signal_source.h +++ b/src/algorithms/signal_source/adapters/zmq_signal_source.h @@ -21,6 +21,7 @@ // #include "concurrent_queue.h" #include // for dump +#include #include #include #include @@ -66,6 +67,7 @@ public: private: gr::zeromq::sub_source::sptr d_source_block; + gr::blocks::vector_to_stream::sptr d_vec_block; gr::blocks::file_sink::sptr d_dump_sink; size_t d_item_size;