1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-01-08 08:20:33 +00:00

Added valves to properly handle end of samples

This commit is contained in:
Victor Castillo 2024-08-19 17:44:36 +02:00
parent 92dcec67e0
commit 89d73174a8
No known key found for this signature in database
GPG Key ID: 8EF1FC8B7182F608
5 changed files with 100 additions and 33 deletions

View File

@ -64,7 +64,7 @@ Marc Sales marcsales92@gmail.com Contributor
Piyush Gupta piyush04111999@gmail.com Contributor
Rodrigo Muñoz rodrigo.munoz@proteinlab.cl Contributor
Stefan van der Linden spvdlinden@gmail.com Contributor
Victor Castillo-Agüero victorcastilloaguero@gmail.com Contributor
Víctor Castillo-Agüero victorcastilloaguero@gmail.com Contributor
Will Silberman wsilberm@google.com Contributor
Carlos Paniego carpanie@hotmail.com Artwork

View File

@ -38,7 +38,7 @@ authors:
affiliation: "Instituto Nacional de Técnica Aeroespacial"
email: victorcastilloaguero@gmail.com
family-names: "Castillo-Agüero"
given-names: Victor
given-names: Víctor
- alias: acebrianjuan
email: acebrianjuan@gmail.com
family-names: "Cebrián-Juan"

View File

@ -17,6 +17,7 @@
#include "ion_gsms_signal_source.h"
#include "gnss_sdr_flags.h"
#include "gnss_sdr_string_literals.h"
#include "gnss_sdr_valve.h"
#include <gnuradio/blocks/copy.h>
#include <string>
#include <unordered_set>
@ -54,7 +55,7 @@ IONGSMSSignalSource::IONGSMSSignalSource(const ConfigurationInterface* configura
const std::string& role,
unsigned int in_streams,
unsigned int out_streams,
Concurrent_Queue<pmt::pmt_t>* queue __attribute__((unused)))
Concurrent_Queue<pmt::pmt_t>* queue)
: SignalSourceBase(configuration, role, "ION_GSMS_Signal_Source"s),
stream_ids_(parse_comma_list(configuration->property(role + ".streams"s, ""s))),
metadata_filepath_(configuration->property(role + ".metadata_filename"s, "../data/example_capture_metadata.sdrx"s)),
@ -82,6 +83,7 @@ IONGSMSSignalSource::IONGSMSSignalSource(const ConfigurationInterface* configura
for (std::size_t i = 0; i < source->output_stream_count(); ++i)
{
copy_blocks_.push_back(gr::blocks::copy::make(source->output_stream_item_size(i)));
valves_.push_back(gnss_sdr_make_valve(source->output_stream_item_size(i), source->output_stream_total_sample_count(i), queue));
}
}
}
@ -186,6 +188,7 @@ void IONGSMSSignalSource::connect(gr::top_block_sptr top_block)
for (std::size_t i = 0; i < source->output_stream_count(); ++i, ++cumulative_index)
{
top_block->connect(source, i, copy_blocks_[cumulative_index], 0);
top_block->connect(copy_blocks_[cumulative_index], 0, valves_[cumulative_index], 0);
}
}
}
@ -193,9 +196,14 @@ void IONGSMSSignalSource::connect(gr::top_block_sptr top_block)
void IONGSMSSignalSource::disconnect(gr::top_block_sptr top_block)
{
std::size_t cumulative_index = 0;
for (const auto& source : sources_)
{
top_block->disconnect(source);
for (std::size_t i = 0; i < source->output_stream_count(); ++i, ++cumulative_index)
{
top_block->disconnect(source, i, copy_blocks_[cumulative_index], 0);
top_block->disconnect(copy_blocks_[cumulative_index], 0, valves_[cumulative_index], 0);
}
}
}
@ -219,7 +227,7 @@ gr::basic_block_sptr IONGSMSSignalSource::get_right_block(int RF_channel)
if (RF_channel < 0 || RF_channel >= static_cast<int>(copy_blocks_.size()))
{
LOG(WARNING) << "'RF_channel' out of bounds while trying to get signal source right block.";
return copy_blocks_[0];
return valves_[0];
}
return copy_blocks_[RF_channel];
return valves_[RF_channel];
}

View File

@ -19,6 +19,12 @@
#include <algorithm>
#include <cmath>
#if USE_GLOG_AND_GFLAGS
#include <glog/logging.h>
#else
#include <absl/log/log.h>
#endif
using namespace std::string_literals;
IONGSMSFileSource::IONGSMSFileSource(
@ -30,13 +36,20 @@ IONGSMSFileSource::IONGSMSFileSource(
"ion_gsms_file_source",
gr::io_signature::make(0, 0, 0),
make_output_signature(block, stream_ids)),
file_stream_(metadata_filepath.parent_path() / file.Url().Value()),
file_stream_(metadata_filepath.parent_path() / file.Url().Value(), std::ios::in | std::ios::binary),
io_buffer_offset_(0),
maximum_item_rate_(0),
chunk_cycle_length_(0)
{
fs::path data_filepath = metadata_filepath.parent_path() / file.Url().Value();
std::size_t block_offset = file.Offset();
if (!file_stream_.is_open())
{
LOG(ERROR) << "ion_gsms_file_source: Unable to open the samples file: " << (data_filepath).c_str();
}
// Skip offset and block header
file_stream_.seekg(file.Offset() + block_offset + block.SizeHeader());
std::size_t output_stream_offset = 0;
@ -49,42 +62,26 @@ IONGSMSFileSource::IONGSMSFileSource(
for (std::size_t i = 0; i < out_count; ++i)
{
output_stream_item_sizes_.push_back(chunk_data_.back()->output_stream_item_size(i));
output_stream_item_rates_.push_back(chunk_data_.back()->output_stream_item_rate(i));
maximum_item_rate_ = std::max(chunk_data_.back()->output_stream_item_rate(i), maximum_item_rate_);
}
}
output_stream_count_ = output_stream_offset;
}
output_stream_total_sample_counts_.resize(output_stream_count_);
int IONGSMSFileSource::work(
int noutput_items,
gr_vector_const_void_star& input_items __attribute__((unused)),
gr_vector_void_star& output_items)
{
const std::size_t max_sample_output = std::floor((noutput_items - 1.0) / maximum_item_rate_);
io_buffer_.resize(max_sample_output * chunk_cycle_length_);
io_buffer_offset_ = 0;
file_stream_.read(io_buffer_.data(), sizeof(decltype(io_buffer_)::value_type) * io_buffer_.size());
items_produced_.clear();
items_produced_.resize(output_items.size());
while (io_buffer_offset_ < io_buffer_.size())
std::size_t cycle_count = block.Cycles();
if (cycle_count == 0)
{
for (auto& c : chunk_data_)
{
auto* chunk = c.get();
io_buffer_offset_ += chunk->read_from_buffer(reinterpret_cast<uint8_t*>(io_buffer_.data()), io_buffer_offset_);
chunk->write_to_output(output_items, items_produced_);
}
// Read the whole file
const std::size_t file_size = fs::file_size(data_filepath);
cycle_count = std::floor((file_size - block_offset - block.SizeHeader()) / chunk_cycle_length_);
}
for (std::size_t i = 0; i < items_produced_.size(); ++i)
for (std::size_t i = 0; i < output_stream_count_; ++i)
{
produce(i, items_produced_[i]);
output_stream_total_sample_counts_[i] = cycle_count * output_stream_item_rates_[i];
}
return WORK_CALLED_PRODUCE;
}
@ -99,6 +96,10 @@ std::size_t IONGSMSFileSource::output_stream_item_size(std::size_t stream_index)
return output_stream_item_sizes_[stream_index];
}
std::size_t IONGSMSFileSource::output_stream_total_sample_count(std::size_t stream_index) const
{
return output_stream_total_sample_counts_[stream_index];
}
gr::io_signature::sptr IONGSMSFileSource::make_output_signature(const GnssMetadata::Block& block, const std::vector<std::string>& stream_ids)
{
@ -140,3 +141,58 @@ gr::io_signature::sptr IONGSMSFileSource::make_output_signature(const GnssMetada
nstreams,
item_sizes);
}
int IONGSMSFileSource::work(
int noutput_items,
gr_vector_const_void_star& input_items __attribute__((unused)),
gr_vector_void_star& output_items)
{
// Compute the maximum number of samples that will be copied across all output buffer.
// If there are more than one output buffer (multichannel set up), the one with the most samples will be used as the maximum.
//
// Complex samples produce 2 items each (I and Q). In order to account for them, we subtract 1 from `noutput_items` and
// then floor the division. During testing, not doing this caused `max_sample_output` to oscillate between two values, thus
// resizing the `io_buffer_` on each call to `work()`.
const std::size_t max_sample_output = std::floor((noutput_items - 1.0) / maximum_item_rate_);
// Resize the IO buffer to fit exactly the maximum amount of samples that will be outputted.
io_buffer_.resize(max_sample_output * chunk_cycle_length_);
// We will be walking the IO buffer with this variable.
io_buffer_offset_ = 0;
// Read samples from file into IO buffer
const std::size_t bytes_to_read = io_buffer_.size();
file_stream_.read(io_buffer_.data(), bytes_to_read);
// Reset `items_produced_` vector. This vector will accumulate the amount of items produced for each output stream.
items_produced_.clear();
items_produced_.resize(output_items.size());
// Walk the IO buffer one chunk cycle at a time. See ION documentation for a definition of chunk and chunk cycle.
while (io_buffer_offset_ < bytes_to_read)
{
// Iterate chunks within a chunk cycle
for (auto& c : chunk_data_)
{
auto* chunk = c.get();
// Copy chunk into a separate buffer where the samples will be shifted from.
const std::size_t bytes_copied = chunk->read_from_buffer(reinterpret_cast<uint8_t*>(io_buffer_.data()), io_buffer_offset_);
// Advance IO buffer offset
io_buffer_offset_ += bytes_copied;
// Shift samples into output buffers following the appropriate unpacking strategy for this chunk.
chunk->write_to_output(output_items, items_produced_);
}
}
// Call `produce(int, int)` with the appropriate item count for each output stream.
for (std::size_t i = 0; i < items_produced_.size(); ++i)
{
produce(i, items_produced_[i]);
}
return WORK_CALLED_PRODUCE;
}

View File

@ -24,10 +24,10 @@
#include <gnuradio/sync_block.h>
#include <cstddef>
#include <cstdio>
#include <fstream>
#include <memory>
#include <string>
#include <vector>
#include <fstream>
/** \addtogroup Signal_Source
* \{ */
@ -52,6 +52,7 @@ public:
std::size_t output_stream_count() const;
std::size_t output_stream_item_size(std::size_t stream_index) const;
std::size_t output_stream_total_sample_count(std::size_t stream_index) const;
private:
static gr::io_signature::sptr make_output_signature(const GnssMetadata::Block& block, const std::vector<std::string>& stream_ids);
@ -62,6 +63,8 @@ private:
std::vector<int> items_produced_;
std::size_t output_stream_count_;
std::vector<std::size_t> output_stream_item_sizes_;
std::vector<std::size_t> output_stream_item_rates_;
std::vector<std::size_t> output_stream_total_sample_counts_;
std::size_t maximum_item_rate_;
std::vector<std::shared_ptr<IONGSMSChunkData>> chunk_data_;
std::size_t chunk_cycle_length_;