diff --git a/src/algorithms/observables/gnuradio_blocks/hybrid_observables_cc.cc b/src/algorithms/observables/gnuradio_blocks/hybrid_observables_cc.cc index 274dc0732..593e01b6b 100644 --- a/src/algorithms/observables/gnuradio_blocks/hybrid_observables_cc.cc +++ b/src/algorithms/observables/gnuradio_blocks/hybrid_observables_cc.cc @@ -372,14 +372,13 @@ void hybrid_observables_cc::find_interp_elements(const unsigned int &ch, const d } -void hybrid_observables_cc::forecast(int noutput_items __attribute__((unused)), - gr_vector_int &ninput_items_required) +void hybrid_observables_cc::forecast(int noutput_items, gr_vector_int &ninput_items_required) { for (unsigned int i = 0; i < d_nchannels; i++) { ninput_items_required[i] = 0; } - ninput_items_required[d_nchannels] = 1; + ninput_items_required[d_nchannels] = noutput_items; } @@ -459,152 +458,161 @@ int hybrid_observables_cc::general_work(int noutput_items __attribute__((unused) Gnss_Synchro **out = reinterpret_cast(&output_items[0]); unsigned int i; + unsigned int returned_elements = 0; int total_input_items = 0; for (i = 0; i < d_nchannels; i++) { total_input_items += ninput_items[i]; } - consume(d_nchannels, 1); - T_rx_s += T_rx_step_s; - - ////////////////////////////////////////////////////////////////////////// - if ((total_input_items == 0) and (d_num_valid_channels == 0)) + for (int epoch = 0; epoch < ninput_items[d_nchannels]; epoch++) { - return 0; - } - ////////////////////////////////////////////////////////////////////////// + T_rx_s += T_rx_step_s; - if (total_input_items > 0) - { - for (i = 0; i < d_nchannels; i++) + ////////////////////////////////////////////////////////////////////////// + if ((total_input_items == 0) and (d_num_valid_channels == 0)) { - if (ninput_items[i] > 0) + consume(d_nchannels, epoch + 1); + return returned_elements; + } + ////////////////////////////////////////////////////////////////////////// + + if (total_input_items > 0 and epoch == 0) + { + for (i = 0; i < d_nchannels; i++) { - // Add the new Gnss_Synchros to their corresponding deque - for (int aux = 0; aux < ninput_items[i]; aux++) + if (ninput_items[i] > 0) { - if (in[i][aux].Flag_valid_word) + // Add the new Gnss_Synchros to their corresponding deque + for (int aux = 0; aux < ninput_items[i]; aux++) { - d_gnss_synchro_history->push_back(i, in[i][aux]); - d_gnss_synchro_history->back(i).RX_time = compute_T_rx_s(in[i][aux]); - // Check if the last Gnss_Synchro comes from the same satellite as the previous ones - if (d_gnss_synchro_history->size(i) > 1) + if (in[i][aux].Flag_valid_word) { - if (d_gnss_synchro_history->front(i).PRN != d_gnss_synchro_history->back(i).PRN) + d_gnss_synchro_history->push_back(i, in[i][aux]); + d_gnss_synchro_history->back(i).RX_time = compute_T_rx_s(in[i][aux]); + // Check if the last Gnss_Synchro comes from the same satellite as the previous ones + if (d_gnss_synchro_history->size(i) > 1) { - d_gnss_synchro_history->clear(i); + if (d_gnss_synchro_history->front(i).PRN != d_gnss_synchro_history->back(i).PRN) + { + d_gnss_synchro_history->clear(i); + } } } } + consume(i, ninput_items[i]); } - consume(i, ninput_items[i]); } } - } - for (i = 0; i < d_nchannels; i++) - { - if (d_gnss_synchro_history->size(i) > 2) + for (i = 0; i < d_nchannels; i++) { - valid_channels[i] = true; - } - else - { - valid_channels[i] = false; - } - } - d_num_valid_channels = valid_channels.count(); - // Check if there is any valid channel after reading the new incoming Gnss_Synchro data - if (d_num_valid_channels == 0) - { - return 0; - } - - for (i = 0; i < d_nchannels; i++) //Discard observables with T_rx higher than the threshold - { - if (valid_channels[i]) - { - clean_history(i); - if (d_gnss_synchro_history->size(i) < 2) + if (d_gnss_synchro_history->size(i) > 2) { - valid_channels[i] = false; - } - } - } - - // Check if there is any valid channel after computing the time distance between the Gnss_Synchro data and the receiver time - d_num_valid_channels = valid_channels.count(); - double T_rx_s_out = T_rx_s - d_latency; - if ((d_num_valid_channels == 0) or (T_rx_s_out < 0.0)) - { - return 0; - } - - std::vector epoch_data; - for (i = 0; i < d_nchannels; i++) - { - if (valid_channels[i]) - { - Gnss_Synchro interpolated_gnss_synchro = d_gnss_synchro_history->back(i); - if (interpolate_data(interpolated_gnss_synchro, i, T_rx_s_out)) - { - epoch_data.push_back(interpolated_gnss_synchro); + valid_channels[i] = true; } else { valid_channels[i] = false; } } - } - d_num_valid_channels = valid_channels.count(); - if (d_num_valid_channels == 0) - { - return 0; - } - correct_TOW_and_compute_prange(epoch_data); - std::vector::iterator it = epoch_data.begin(); - for (i = 0; i < d_nchannels; i++) - { - if (valid_channels[i]) + d_num_valid_channels = valid_channels.count(); + // Check if there is any valid channel after reading the new incoming Gnss_Synchro data + if (d_num_valid_channels == 0) { - out[i][0] = (*it); - out[i][0].Flag_valid_pseudorange = true; - it++; + consume(d_nchannels, epoch + 1); + return returned_elements; } - else + + for (i = 0; i < d_nchannels; i++) //Discard observables with T_rx higher than the threshold { - out[i][0] = Gnss_Synchro(); - out[i][0].Flag_valid_pseudorange = false; - } - } - if (d_dump) - { - // MULTIPLEXED FILE RECORDING - Record results to file - try - { - double tmp_double; - for (i = 0; i < d_nchannels; i++) + if (valid_channels[i]) { - tmp_double = out[i][0].RX_time; - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = out[i][0].TOW_at_current_symbol_s; - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = out[i][0].Carrier_Doppler_hz; - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = out[i][0].Carrier_phase_rads / GPS_TWO_PI; - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = out[i][0].Pseudorange_m; - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = static_cast(out[i][0].PRN); - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); - tmp_double = static_cast(out[i][0].Flag_valid_pseudorange); - d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + clean_history(i); + if (d_gnss_synchro_history->size(i) < 2) + { + valid_channels[i] = false; + } } } - catch (const std::ifstream::failure &e) + + // Check if there is any valid channel after computing the time distance between the Gnss_Synchro data and the receiver time + d_num_valid_channels = valid_channels.count(); + double T_rx_s_out = T_rx_s - d_latency; + if ((d_num_valid_channels == 0) or (T_rx_s_out < 0.0)) { - LOG(WARNING) << "Exception writing observables dump file " << e.what(); - d_dump = false; + consume(d_nchannels, epoch + 1); + return returned_elements; } + + std::vector epoch_data; + for (i = 0; i < d_nchannels; i++) + { + if (valid_channels[i]) + { + Gnss_Synchro interpolated_gnss_synchro = d_gnss_synchro_history->back(i); + if (interpolate_data(interpolated_gnss_synchro, i, T_rx_s_out)) + { + epoch_data.push_back(interpolated_gnss_synchro); + } + else + { + valid_channels[i] = false; + } + } + } + d_num_valid_channels = valid_channels.count(); + if (d_num_valid_channels == 0) + { + consume(d_nchannels, epoch + 1); + return returned_elements; + } + correct_TOW_and_compute_prange(epoch_data); + std::vector::iterator it = epoch_data.begin(); + for (i = 0; i < d_nchannels; i++) + { + if (valid_channels[i]) + { + out[i][0] = (*it); + out[i][0].Flag_valid_pseudorange = true; + it++; + } + else + { + out[i][0] = Gnss_Synchro(); + out[i][0].Flag_valid_pseudorange = false; + } + } + if (d_dump) + { + // MULTIPLEXED FILE RECORDING - Record results to file + try + { + double tmp_double; + for (i = 0; i < d_nchannels; i++) + { + tmp_double = out[i][0].RX_time; + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = out[i][0].TOW_at_current_symbol_s; + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = out[i][0].Carrier_Doppler_hz; + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = out[i][0].Carrier_phase_rads / GPS_TWO_PI; + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = out[i][0].Pseudorange_m; + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = static_cast(out[i][0].PRN); + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + tmp_double = static_cast(out[i][0].Flag_valid_pseudorange); + d_dump_file.write(reinterpret_cast(&tmp_double), sizeof(double)); + } + } + catch (const std::ifstream::failure &e) + { + LOG(WARNING) << "Exception writing observables dump file " << e.what(); + d_dump = false; + } + } + returned_elements++; } - return 1; + consume(d_nchannels, ninput_items[d_nchannels]); + return returned_elements; }