gnss-sdr/src/algorithms/signal_source/adapters/zmq_signal_source.cc

108 lines
3.3 KiB
C++
Raw Normal View History

2022-08-10 23:37:57 +00:00
/*!
* \file zmq_signal_source.cc
* \brief Signal source which reads from ZeroMQ.
* \author Jim Melton, 2022. jim.melton(at)sncorp.com
*
* -----------------------------------------------------------------------------
*
* GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
* This file is part of GNSS-SDR.
*
* Copyright (C) 2010-2022 (see AUTHORS file for a list of contributors)
* SPDX-License-Identifier: GPL-3.0-or-later
*
* -----------------------------------------------------------------------------
*/
#include "zmq_signal_source.h"
#include "configuration_interface.h"
#include "gnss_sdr_string_literals.h"
2022-08-11 02:21:12 +00:00
#include <glog/logging.h>
2022-08-10 23:37:57 +00:00
using namespace std::string_literals;
/*!
 * \brief Builds a signal source that subscribes to a ZeroMQ publisher.
 *
 * Properties read from \p configuration, all prefixed with \p role:
 *  - item_type      (default "gr_complex")
 *  - dump_filename  (default "data/zmq_dump.dat")
 *  - dump           (default false)
 *  - vlen           (default 1)
 *  - pass_tags      (default false)
 *  - timeout_ms     (default 100)
 *  - hwm            (default -1)
 *  - endpoint       (mandatory, no usable default)
 *
 * The stream-count and queue arguments are accepted but not used.
 *
 * \throws std::invalid_argument if the endpoint property is empty or missing.
 */
ZmqSignalSource::ZmqSignalSource(const ConfigurationInterface* configuration,
    const std::string& role,
    unsigned int /* in_stream [[maybe_unused]] */,
    unsigned int /* out_stream [[maybe_unused]] */,
    Concurrent_Queue<pmt::pmt_t>* /* queue [[maybe_unused]] */)
    : SignalSourceBase(configuration, role, "ZMQ_Signal_Source"s),
      d_item_size(decode_item_type(configuration->property(role + ".item_type", "gr_complex"s), nullptr, true)),
      d_dump_filename(configuration->property(role + ".dump_filename", "data/zmq_dump.dat"s)),
      d_dump(configuration->property(role + ".dump", false))
{
    const auto vector_length = configuration->property(role + ".vlen"s, 1);
    const auto keep_tags = configuration->property(role + ".pass_tags"s, false);
    const auto recv_timeout_ms = configuration->property(role + ".timeout_ms"s, 100);
    const auto high_water_mark = configuration->property(role + ".hwm"s, -1);

    const auto endpoint_key = role + ".endpoint"s;
    auto address = configuration->property(endpoint_key, ""s);

    // Without an endpoint there is nothing to subscribe to: report and bail out.
    if (address.empty())
        {
            std::cerr << "For ZMQ_Signal_Source " << endpoint_key << " must be defined" << std::endl;
            throw std::invalid_argument(endpoint_key + ": undefined");
        }

    LOG(INFO) << "Connecting to ZMQ pub at " << address;

    // The const_cast works around a GNU Radio interface deficiency: the
    // address is handed over as a plain char*.
    d_source_block = gr::zeromq::sub_source::make(d_item_size, vector_length, const_cast<char*>(address.data()), recv_timeout_ms, keep_tags, high_water_mark);

    // GNSS-SDR doesn't do well with tags, so do not propagate them downstream.
    d_source_block->set_tag_propagation_policy(gr::block::TPP_DONT);

    // Vectorized input is flattened back into a stream of single items.
    if (vector_length > 1)
        {
            d_vec_block = gr::blocks::vector_to_stream::make(item_size(), vector_length);
        }
}
/*!
 * \brief Returns the item size decoded from the item_type property at
 * construction time (the value held in d_item_size).
 */
auto ZmqSignalSource::item_size() -> size_t
{
    return d_item_size;
}
2022-08-11 02:21:12 +00:00
/*!
 * \brief Wires this source's internal blocks into \p top_block.
 *
 * If a vector-to-stream block exists, the ZeroMQ source is connected to it.
 * If dumping is enabled, a file sink writing to d_dump_filename is created
 * and attached to the block returned by get_right_block().
 */
auto ZmqSignalSource::connect(gr::top_block_sptr top_block) -> void
{
    if (d_vec_block)
        {
            top_block->connect(d_source_block, 0, d_vec_block, 0);
        }

    // Nothing more to wire unless the stream is also dumped to a file.
    if (!d_dump)
        {
            return;
        }

    d_dump_sink = gr::blocks::file_sink::make(item_size(), d_dump_filename.data());
    top_block->connect(get_right_block(), 0, d_dump_sink, 0);
}
2022-08-11 02:21:12 +00:00
/*!
 * \brief Removes this source's internal blocks from \p top_block.
 *
 * The dump sink is disconnected first (when dumping is enabled), then the
 * vector-to-stream block (when one was created).
 */
auto ZmqSignalSource::disconnect(gr::top_block_sptr top_block) -> void
{
    // Dump sink: only present when dumping was requested.
    if (d_dump)
        {
            top_block->disconnect(d_dump_sink);
        }

    // Vector-to-stream adapter: only present when one was created.
    if (d_vec_block)
        {
            top_block->disconnect(d_vec_block);
        }
}
2022-08-10 23:37:57 +00:00
/*!
 * \brief Returns the block whose output feeds the rest of the flowgraph.
 *
 * This is the vector-to-stream block when one exists, otherwise the ZeroMQ
 * source block itself.
 */
auto ZmqSignalSource::get_right_block() -> gr::basic_block_sptr
{
    if (d_vec_block)
        {
            return d_vec_block;
        }
    return d_source_block;
}