Update branch python

This commit is contained in:
Perrielornitorrinco 2023-10-16 11:38:35 +02:00 committed by Carles Fernandez
parent 4a8f137dd9
commit a1a683fc7b
17 changed files with 2622 additions and 0 deletions

View File

@ -0,0 +1,110 @@
"""
dll_pll_veml_plot_sample.py
Reads GNSS-SDR Tracking dump binary file using the provided function and
plots some internal variables
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
plot_last_outputs - If 0 -> process everything / number of items processed
channels - Number of channels
first_channel - Number of the first channel
doppler_opt - = 1 -> Plot // = 0 -> No plot
path - Path to folder which contains raw files
fig_path - Path where doppler plots will be save
'trk_dump_ch' - Fixed part of the tracking dump files names
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import os
import numpy as np
import matplotlib.pyplot as plt
from lib.dll_pll_veml_read_tracking_dump import dll_pll_veml_read_tracking_dump
from lib.plotVEMLTracking import plotVEMLTracking
trackResults = []
settings = {}
GNSS_tracking = []
# ---------- CHANGE HERE:
sampling_freq = 3000000
plot_last_outputs = 0
channels = 5
first_channel = 0
doppler_opt = 1
settings['numberOfChannels'] = channels
path = '/home/labnav/Desktop/TEST_IRENE/tracking'
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/Doppler'
for N in range(1, channels+1):
tracking_log_path = os.path.join(path,
f'trk_dump_ch{N-1+first_channel}.dat')
GNSS_tracking.append(dll_pll_veml_read_tracking_dump(tracking_log_path))
# GNSS-SDR format conversion to Python GPS receiver
for N in range (1, channels+1):
if 0 < plot_last_outputs < len(GNSS_tracking[N - 1].get("code_freq_hz")):
start_sample = (len(GNSS_tracking[N-1].get("code_freq_hz")) -
plot_last_outputs)
else:
start_sample = 0
trackResult = {
'status': 'T', # fake track
'codeFreq': np.copy(GNSS_tracking[N-1]["code_freq_hz"][start_sample:]),
'carrFreq': np.copy(GNSS_tracking[N-1]["carrier_doppler_hz"][start_sample:]),
'dllDiscr': np.copy(GNSS_tracking[N-1]["code_error"][start_sample:]),
'dllDiscrFilt': np.copy(GNSS_tracking[N-1]["code_nco"][start_sample:]),
'pllDiscr': np.copy(GNSS_tracking[N-1]["carr_error"][start_sample:]),
'pllDiscrFilt': np.copy(GNSS_tracking[N-1]["carr_nco"][start_sample:]),
'I_P': np.copy(GNSS_tracking[N-1]["P"][start_sample:]),
'Q_P': np.zeros(len(GNSS_tracking[N-1]["P"][start_sample:])),
'I_VE': np.copy(GNSS_tracking[N-1]["VE"][start_sample:]),
'I_E': np.copy(GNSS_tracking[N-1]["E"][start_sample:]),
'I_L': np.copy(GNSS_tracking[N-1]["L"][start_sample:]),
'I_VL': np.copy(GNSS_tracking[N-1]["VL"][start_sample:]),
'Q_VE': np.zeros(len(GNSS_tracking[N-1]["VE"][start_sample:])),
'Q_E': np.zeros(len(GNSS_tracking[N-1]["E"][start_sample:])),
'Q_L': np.zeros(len(GNSS_tracking[N-1]["L"][start_sample:])),
'Q_VL': np.zeros(len(GNSS_tracking[N-1]["VL"][start_sample:])),
'data_I': np.copy(GNSS_tracking[N-1]["prompt_I"][start_sample:]),
'data_Q': np.copy(GNSS_tracking[N-1]["prompt_Q"][start_sample:]),
'PRN': np.copy(GNSS_tracking[N-1]["PRN"][start_sample:]),
'CNo': np.copy(GNSS_tracking[N-1]["CN0_SNV_dB_Hz"][start_sample:]),
'prn_start_time_s': np.copy(GNSS_tracking[N-1]["PRN_start_sample"]
[start_sample:]) / sampling_freq
}
trackResults.append(trackResult)
# Plot results:
plotVEMLTracking(N,trackResults,settings)
# Plot Doppler according to selected in doppler_opt variable:
if doppler_opt == 1:
if not os.path.exists(fig_path):
os.makedirs(fig_path)
plt.figure()
plt.plot(trackResults[N - 1]['prn_start_time_s'],
[x/1000 for x in GNSS_tracking[N - 1]['carrier_doppler_hz']
[start_sample:]])
plt.xlabel('Time(s)')
plt.ylabel('Doppler(KHz)')
plt.title('Doppler frequency channel ' + str(N))
plt.savefig(os.path.join(fig_path, f'Doppler_freq_ch_{N}.png'))
plt.show()

View File

@ -0,0 +1,98 @@
"""
gps_l1_ca_kf_plot_sample.py
Reads GNSS-SDR Tracking dump binary file using the provided
function and plots some internal variables
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
channels - Number of channels to check if they exist
first_channel - Number of the first channel
path - Path to folder which contains raw file
'trk_dump_ch' - Fixed part in tracking dump files names
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import os
import numpy as np
from lib.gps_l1_ca_kf_read_tracking_dump import gps_l1_ca_kf_read_tracking_dump
from lib.plotTracking import plotTracking
from lib.plotKalman import plotKalman
GNSS_tracking = []
trackResults = []
kalmanResults = []
# ---------- CHANGE HERE:
# Signal parameters:
samplingFreq = 6625000
channels = 5
first_channel = 0
code_period = 0.001
path = '/home/labnav/Desktop/TEST_IRENE/tracking'
for N in range(1, channels + 1):
tracking_log_path = os.path.join(path,
f'trk_dump_ch{N-1+first_channel}.dat')
GNSS_tracking.append(gps_l1_ca_kf_read_tracking_dump(tracking_log_path))
# todo lee lo mismo que dll_pll_velm_plot_sample y guarda diferente v13 y v14
# GNSS-SDR format conversion to Python GPS receiver
for N in range(1, channels + 1):
trackResult= {
'status': 'T', # fake track
'codeFreq': np.copy(GNSS_tracking[N - 1]["code_freq_hz"]),
'carrFreq': np.copy(GNSS_tracking[N - 1]["carrier_doppler_hz"]),
'carrFreqRate':
np.copy(GNSS_tracking[N - 1]["carrier_doppler_rate_hz2"]),#todo no se usa en dll, carrier_doppler_rate_hz_s segun dll
'dllDiscr': np.copy(GNSS_tracking[N - 1]["code_error"]),
'dllDiscrFilt': np.copy(GNSS_tracking[N - 1]["code_nco"]),
'pllDiscr': np.copy(GNSS_tracking[N - 1]["carr_error"]),#todo code_freq_rate_hz_s segun dll
'pllDiscrFilt': np.copy(GNSS_tracking[N - 1]["carr_nco"]),
'I_P': np.copy(GNSS_tracking[N - 1]["prompt_I"]),#todo distinto de dll
'Q_P': np.copy(GNSS_tracking[N - 1]["prompt_Q"]),#todo distinto de dll
'I_E': np.copy(GNSS_tracking[N - 1]["E"]),
'I_L': np.copy(GNSS_tracking[N - 1]["L"]),
'Q_E': np.zeros(len(GNSS_tracking[N - 1]["E"])),
'Q_L': np.zeros(len(GNSS_tracking[N - 1]["L"])),
'PRN': np.copy(GNSS_tracking[N - 1]["PRN"]),
'CNo': np.copy(GNSS_tracking[N - 1]["CN0_SNV_dB_Hz"])
}
kalmanResult= {
'PRN': np.copy(GNSS_tracking[N - 1]["PRN"]),
'innovation': np.copy(GNSS_tracking[N - 1]["carr_error"]),#todo code_freq_rate_hz_s segun dll
'state1': np.copy(GNSS_tracking[N - 1]["carr_nco"]),
'state2': np.copy(GNSS_tracking[N - 1]["carrier_doppler_hz"]),
'state3': GNSS_tracking[N - 1]["carrier_doppler_rate_hz2"],#todo segun el dll es carrier_doppler_rate_hz_s
'r_noise_cov': np.copy(GNSS_tracking[N - 1]["carr_noise_sigma2"]),#todo carr_error segun dll
'CNo': np.copy(GNSS_tracking[N - 1]["CN0_SNV_dB_Hz"])
}
trackResults.append(trackResult)
kalmanResults.append(kalmanResult)
settings = {
'numberOfChannels': channels,
'msToProcess': len(GNSS_tracking[N-1]['E']),
'codePeriod': code_period,
'timeStartInSeconds': 20
}
# Create and save graphics as PNG
plotTracking(N, trackResults, settings)
plotKalman(N, kalmanResults, settings)

View File

@ -0,0 +1,101 @@
"""
gps_l1_ca_pvt_raw_plot_sample.py
Reads GNSS-SDR PVT raw dump binary file using the provided function and plots
some internal variables
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
channels - Number of channels to check if they exist
path - Path to folder which contains raw file
pvt_raw_log_path - Completed path to PVT raw data file
nav_sol_period - Measurement period [ms]
plot_skyplot - = 1 -> Sky Plot (TO DO) // = 0 -> No Sky Plot
true_position - In settings, If not known enter all NaN's and mean
position will be used as a reference in UTM
coordinate system
plot_position - Optional function at the end
plot_oneVStime - Optional function at the end, select variable to plot
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import utm
import numpy as np
from lib.gps_l1_ca_read_pvt_dump import gps_l1_ca_read_pvt_dump
from lib.plotNavigation import plotNavigation
import pyproj
from lib.plotPosition import plot_position, plot_oneVStime
settings = {}
utm_e = []
utm_n = []
E_UTM = []
N_UTM = []
utm_zone = []
# ---------- CHANGE HERE:
samplingFreq = 64e6 / 16
channels = 5
path = '/home/labnav/Desktop/TEST_IRENE/'
pvt_raw_log_path = path + 'PVT.dat'
nav_sol_period = 10
plot_skyplot = 0
settings['true_position'] = {'E_UTM':np.nan,'N_UTM':np.nan,'U_UTM':np.nan}
settings['navSolPeriod'] = nav_sol_period
navSolutions = gps_l1_ca_read_pvt_dump(pvt_raw_log_path)
X, Y, Z = navSolutions['X'], navSolutions['Y'], navSolutions['Z']
utm_coords = []
for i in range(len(navSolutions['longitude'])):
utm_coords.append(utm.from_latlon(navSolutions['latitude'][i],
navSolutions['longitude'][i]))
for i in range(len(utm_coords)):
utm_e.append(utm_coords[i][0])
utm_n.append(utm_coords[i][1])
utm_zone.append(utm_coords[i][2])
# Transform from Lat Long degrees to UTM coordinate system
# It's supposed utm_zone and letter will not change during tracking
input_projection = pyproj.CRS.from_string("+proj=longlat "
"+datum=WGS84 +no_defs")
utm_e = []
utm_n = []
for i in range(len(navSolutions['longitude'])):
output_projection = pyproj.CRS (f"+proj=utm +zone={utm_zone[i]} "
f"+datum=WGS84 +units=m +no_defs")
transformer = pyproj.Transformer.from_crs(input_projection,
output_projection)
utm_e, utm_n = transformer.transform(navSolutions['longitude'][i],
navSolutions['latitude'][i])
E_UTM.append(utm_e)
N_UTM.append(utm_n)
navSolutions['E_UTM'] = E_UTM
navSolutions['N_UTM'] = N_UTM
navSolutions['U_UTM'] = navSolutions['Z']
plotNavigation(navSolutions,settings,plot_skyplot)
# OPTIONAL: Other plots ->
plot_position(navSolutions)
plot_oneVStime(navSolutions, 'X_vel')
plot_oneVStime(navSolutions, 'Tot_Vel')

View File

@ -0,0 +1,99 @@
"""
gps_l1_ca_telemetry_plot_sample.py
Reads GNSS-SDR Tracking dump binary file using the provided function and
plots some internal variables
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
channels - Number of channels to check if they exist
doppler_opt - = 1 -> Plot // = 0 -> No plot
path - Path to folder which contains raw file
fig_path - Path where plots will be save
chn_num_a / b - Channel which will be plotted
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import os
import matplotlib.pyplot as plt
from lib.gps_l1_ca_read_telemetry_dump import gps_l1_ca_read_telemetry_dump
GNSS_telemetry = []
# ---------- CHANGE HERE:
sampling_freq = 2000000
channels = list(range(18))
path = '/home/labnav/Desktop/TEST_IRENE/'
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/Telemetry'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
i = 0
for N in channels:
try:
telemetry_log_path = os.path.join(path, f'telemetry{N}.dat')
telemetry_data = gps_l1_ca_read_telemetry_dump(telemetry_log_path)
GNSS_telemetry.append(telemetry_data)
i += 1
except:
pass
# ---------- CHANGE HERE:
chn_num_a = 0
chn_num_b = 5
# Plotting values
if chn_num_a in range(i) and chn_num_b in range(i):
# First Plot:
plt.figure()
plt.gcf().canvas.manager.set_window_title(f'Telem_Current_Simbol_TOW_'
f'{chn_num_a}_{chn_num_b}.png')
plt.plot(GNSS_telemetry[chn_num_a]['tracking_sample_counter'],
[x / 1000 for x in GNSS_telemetry[chn_num_a]
['tow_current_symbol_ms']], 'b')
plt.plot(GNSS_telemetry[chn_num_b]['tracking_sample_counter'],
GNSS_telemetry[chn_num_b]['tow_current_symbol_ms'], 'r')
plt.grid(True)
plt.xlabel('TRK Sampling Counter')
plt.ylabel('Current Symbol TOW')
plt.legend([f'CHN-{chn_num_a-1}', f'CHN-{chn_num_b-1}'])
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'Telem_Current_Simbol_TOW_{chn_num_a}'
f'_{chn_num_b}.png'))
plt.show()
# Second Plot:
plt.figure()
plt.gcf().canvas.manager.set_window_title(f'Telem_TRK_Sampling_Counter_'
f'{chn_num_a}_{chn_num_b}.png')
plt.plot(GNSS_telemetry[chn_num_a]['tracking_sample_counter'],
GNSS_telemetry[chn_num_a]['tow'], 'b')
plt.plot(GNSS_telemetry[chn_num_b]['tracking_sample_counter'],
GNSS_telemetry[chn_num_b]['tow'], 'r')
plt.grid(True)
plt.xlabel('TRK Sampling Counter')
plt.ylabel('Decoded Nav TOW')
plt.legend([f'CHN-{chn_num_a-1}', f'CHN-{chn_num_b-1}'])
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'Telem_TRK_Sampling_Counter_'
f'{chn_num_a}_{chn_num_b}.png'))
plt.show()

View File

@ -0,0 +1,139 @@
"""
hybrid_observables_plot_sample.py
Reads GNSS-SDR observables raw dump binary file using the provided function
and plots some internal variables
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
channels - Number of channels to check if they exist
path - Path to folder which contains raw file
fig_path - Path where plots will be save
observables_log_path - Completed path to observables raw data file
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import numpy as np
import matplotlib.pyplot as plt
from lib.read_hybrid_observables_dump import read_hybrid_observables_dump
import os
observables = {}
double_size_bytes = 8
bytes_shift = 0
# ---------- CHANGE HERE:
samplingFreq = 2000000
channels = 5
path = '/home/labnav/Desktop/TEST_IRENE/'
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/HybridObservables/'
observables_log_path = path + 'observables.dat'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
GNSS_observables = read_hybrid_observables_dump(channels,observables_log_path)
# Plot data
# --- optional: plot since it detect the first satellite connected
min_tow_idx = 1
obs_idx = 1
for n in range(0, channels):
idx = np.where(np.array(GNSS_observables['valid'][n])>0)[0][0]
# Find the index from the first satellite connected
if n == 0:
min_tow_idx = idx
if min_tow_idx > idx:
min_tow_idx = idx
obs_idx = n
# Plot observables from that index
# First plot
plt.figure()
plt.title('Pseudorange')
for i in range(channels):
plt.scatter(GNSS_observables['RX_time'][i][min_tow_idx:],
GNSS_observables['Pseudorange_m'][i][min_tow_idx:],s=1,
label=f'Channel {i}')
plt.xlim(GNSS_observables['RX_time'][obs_idx][min_tow_idx]-100,
GNSS_observables['RX_time'][obs_idx][-1]+100)
plt.grid(True)
plt.xlabel('TOW [s]')
plt.ylabel('Pseudorange [m]')
plt.legend()
plt.gcf().canvas.manager.set_window_title('Pseudorange.png')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'Pseudorange.png'))
plt.show()
# Second plot
plt.figure()
plt.title('Carrier Phase')
for i in range(channels):
plt.scatter(GNSS_observables['RX_time'][i][min_tow_idx:],
GNSS_observables['Carrier_phase_hz'][i][min_tow_idx:],s=1,
label=f'Channel {i}')
plt.xlim(GNSS_observables['RX_time'][obs_idx][min_tow_idx]-100,
GNSS_observables['RX_time'][obs_idx][-1]+100)
plt.xlabel('TOW [s]')
plt.ylabel('Accumulated Carrier Phase [cycles]')
plt.grid(True)
plt.legend()
plt.gcf().canvas.manager.set_window_title('AccumulatedCarrierPhase.png')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'AccumulatedCarrierPhase.png'))
plt.show()
# Third plot
plt.figure()
plt.title('Doppler Effect')
for i in range(channels):
plt.scatter(GNSS_observables['RX_time'][i][min_tow_idx:],
GNSS_observables['Carrier_Doppler_hz'][i][min_tow_idx:],s=1,
label=f'Channel {i}')
plt.xlim(GNSS_observables['RX_time'][obs_idx][min_tow_idx]-100,
GNSS_observables['RX_time'][obs_idx][-1]+100)
plt.xlabel('TOW [s]')
plt.ylabel('Doppler Frequency [Hz]')
plt.grid(True)
plt.legend()
plt.gcf().canvas.manager.set_window_title('DopplerFrequency.png')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'DopplerFrequency.png'))
plt.show()
# Fourth plot
plt.figure()
plt.title('GNSS Channels captured')
for i in range(channels):
lab = 0
a = 0
while lab == 0:
lab = int(GNSS_observables["PRN"][i][min_tow_idx+a])
a += 1
plt.scatter(GNSS_observables['RX_time'][i][min_tow_idx:],
GNSS_observables['PRN'][i][min_tow_idx:], s=1,
label=f'PRN {i} = {lab}')
plt.xlim(GNSS_observables['RX_time'][obs_idx][min_tow_idx]-100,
GNSS_observables['RX_time'][obs_idx][-1]+100)
plt.xlabel('TOW [s]')
plt.ylabel('PRN')
plt.grid(True)
plt.legend()
plt.gcf().canvas.manager.set_window_title('PRNs.png')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'PRNs.png'))
plt.show()

View File

@ -0,0 +1,222 @@
"""
dll_pll_veml_read_tracking_dump.py
dll_pll_veml_read_tracking_dump (filename)
Read GNSS-SDR Tracking dump binary file into Python.
Opens GNSS-SDR tracking binary log file .dat and returns the contents
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename: path to file .dat with the raw data
Return:
GNSS_tracking: A dictionary with the processed data in lists
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
import sys
def dll_pll_veml_read_tracking_dump (filename):
v1 = []
v2 = []
v3 = []
v4 = []
v5 = []
v6 = []
v7 = []
v8 = []
v9 = []
v10= []
v11 = []
v12 = []
v13 = []
v14 = []
v15 = []
v16 = []
v17 = []
v18 = []
v19 = []
v20 = []
v21 = []
v22 = []
GNSS_tracking = {}
bytes_shift = 0
if sys.maxsize > 2 ** 36: # 64 bits computer
float_size_bytes = 4
unsigned_long_int_size_bytes = 8
double_size_bytes = 8
unsigned_int_size_bytes = 4
else: # 32 bits
float_size_bytes = 4
unsigned_long_int_size_bytes = 4
double_size_bytes = 8
unsigned_int_size_bytes = 4
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
# VE -> Magnitude of the Very Early correlator.
v1.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# E -> Magnitude of the Early correlator.
v2.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# P -> Magnitude of the Prompt correlator.
v3.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# L -> Magnitude of the Late correlator.
v4.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# VL -> Magnitude of the Very Late correlator.
v5.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_I -> Value of the Prompt correlator in the In-phase
# component.
v6.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_Q -> Value of the Prompt correlator in the Quadrature
# component.
v7.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# PRN_start_sample -> Sample counter from tracking start.
if unsigned_long_int_size_bytes == 8:
v8.append(struct.unpack(
'Q', f.read(unsigned_long_int_size_bytes))[0])
bytes_shift += unsigned_long_int_size_bytes
else:
v8.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# acc_carrier_phase_rad - > Accumulated carrier phase, in rad.
v9.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler hz -> Doppler shift, in Hz.
v10.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler rate hz s -> Doppler rate, in Hz/s.
v11.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code freq hz -> Code frequency, in chips/s.
v12.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code_freq_rate_hz_s -> Code frequency rate, in chips/s².
v13.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_error -> Raw carrier error (unfiltered) at the PLL
# output, in Hz.
v14.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_nco -> Carrier error at the output of the PLL
# filter, in Hz.
v15.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code error -> Raw code error (unfiltered) at the DLL
# output, in chips.
v16.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code nco -> Code error at the output of the DLL
# filter, in chips.
v17.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz.
v18.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier lock test -> Output of the carrier lock test.
v19.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 1 -> not used ?
v20.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 2 -> not used ?
v21.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# PRN -> Satellite ID.
v22.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
f.close()
GNSS_tracking['VE'] = v1
GNSS_tracking['E'] = v2
GNSS_tracking['P'] = v3
GNSS_tracking['L'] = v4
GNSS_tracking['VL'] = v5
GNSS_tracking['prompt_I'] = v6
GNSS_tracking['prompt_Q'] = v7
GNSS_tracking['PRN_start_sample'] = v8
GNSS_tracking['acc_carrier_phase_rad'] = v9
GNSS_tracking['carrier_doppler_hz'] = v10
GNSS_tracking['carrier_doppler_rate_hz_s'] = v11
GNSS_tracking['code_freq_hz'] = v12
GNSS_tracking['code_freq_rate_hz_s'] = v13
GNSS_tracking['carr_error'] = v14
GNSS_tracking['carr_nco'] = v15
GNSS_tracking['code_error'] = v16
GNSS_tracking['code_nco'] = v17
GNSS_tracking['CN0_SNV_dB_Hz'] = v18
GNSS_tracking['carrier_lock_test'] = v19
GNSS_tracking['var1'] = v20
GNSS_tracking['var2'] = v21
GNSS_tracking['PRN'] = v22
return GNSS_tracking

View File

@ -0,0 +1,225 @@
"""
gps_l1_ca_kf_read_tracking_dump.py
Read GNSS-SDR Tracking dump binary file into Python.
Opens GNSS-SDR tracking binary log file .dat and returns the contents in a dictionary
gps_l1_ca_kf_read_tracking_dump(filename)
Args:
filename - Path to file .dat with the raw data
Return:
GNSS_tracking - A dictionary with the processed data in lists
Irene Pérez Riega, 2023. iperrie@inta.es
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
import sys
def gps_l1_ca_kf_read_tracking_dump(filename):
bytes_shift = 0
GNSS_tracking = {}
v1 = []
v2 = []
v3 = []
v4 = []
v5 = []
v6 = []
v7 = []
v8 = []
v9 = []
v10= []
v11 = []
v12 = []
v13 = []
v14 = []
v15 = []
v16 = []
v17 = []
v18 = []
v19 = []
v20 = []
v21 = []
v22 = []
if sys.maxsize > 2 ** 36: # 64 bits computer
float_size_bytes = 4
unsigned_long_int_size_bytes = 8
double_size_bytes = 8
unsigned_int_size_bytes = 4
else: # 32 bits
float_size_bytes = 4
unsigned_long_int_size_bytes = 4
double_size_bytes = 8
unsigned_int_size_bytes = 4
f = open(filename, 'rb')
if f is None:
help(gps_l1_ca_kf_read_tracking_dump)
return None
else:
while True:
f.seek(bytes_shift, 0)
# VE -> Magnitude of the Very Early correlator.
v1.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# E -> Magnitude of the Early correlator.
v2.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# P -> Magnitude of the Prompt correlator.
v3.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# L -> Magnitude of the Late correlator.
v4.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# VL -> Magnitude of the Very Late correlator.
v5.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_I -> Value of the Prompt correlator in the
# In-phase component.
v6.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_Q -> Value of the Prompt correlator in the
# Quadrature component.
v7.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# PRN_start_sample -> Sample counter from tracking start.
if unsigned_long_int_size_bytes == 8:
v8.append(struct.unpack(
'Q', f.read(unsigned_long_int_size_bytes))[0])
bytes_shift += unsigned_long_int_size_bytes
else:
v8.append(struct.unpack(
'I', f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# acc_carrier_phase_rad - > Accumulated carrier phase, in rad.
v9.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler hz -> Doppler shift, in Hz.
v10.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler rate hz s -> Doppler rate, in Hz/s.
v11.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code freq hz -> Code frequency, in chips/s.
v12.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code_freq_rate_hz_s -> Code frequency rate, in chips/s².
#todo carr_error in gps_l1_ca_kf_read_tracking_dump.m
v13.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_error -> Raw carrier error (unfiltered) at the PLL
# output, in Hz.
#todo carr_noise_sigma2 in gps_l1_ca_kf_read_tracking_dump.m
v14.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_nco -> Carrier error at the output of the PLL
# filter, in Hz.
v15.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code error -> Raw code error (unfiltered) at the DLL
# output, in chips.
v16.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code nco -> Code error at the output of the DLL
# filter, in chips.
v17.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz.
v18.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier lock test -> Output of the carrier lock test.
v19.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 1 -> not used ?
v20.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 2 -> not used ?
v21.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# PRN -> Satellite ID.
v22.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
linea = f.readline()
if not linea:
break
f.close()
GNSS_tracking['VE'] = v1
GNSS_tracking['E'] = v2
GNSS_tracking['P'] = v3
GNSS_tracking['L'] = v4
GNSS_tracking['VL'] = v5
GNSS_tracking['prompt_I'] = v6
GNSS_tracking['prompt_Q'] = v7
GNSS_tracking['PRN_start_sample'] = v8
GNSS_tracking['acc_carrier_phase_rad'] = v9
GNSS_tracking['carrier_doppler_hz'] = v10
GNSS_tracking['carrier_doppler_rate_hz2'] = v11 #todo segun el dll es carrier_doppler_rate_hz_s
GNSS_tracking['code_freq_hz'] = v12
GNSS_tracking['carr_error'] = v13 #todo code_freq_rate_hz_s segun dll
GNSS_tracking['carr_noise_sigma2'] = v14 #todo carr_error segun dll
GNSS_tracking['carr_nco'] = v15
GNSS_tracking['code_error'] = v16
GNSS_tracking['code_nco'] = v17
GNSS_tracking['CN0_SNV_dB_Hz'] = v18
GNSS_tracking['carrier_lock_test'] = v19
GNSS_tracking['var1'] = v20
GNSS_tracking['var2'] = v21
GNSS_tracking['PRN'] = v22
return GNSS_tracking

View File

@ -0,0 +1,249 @@
"""
gps_l1_ca_read_pvt_dump.py
gps_l1_ca_read_pvt_dump (filename)
Open and read GNSS-SDR PVT binary log file (.dat) into Python, and
return the contents.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename: path to file PVT.dat with the raw data
Return:
nav_solutions: A dictionary with the processed data in lists
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import math
import struct
import numpy as np
def gps_l1_ca_read_pvt_dump(filename):
uint8_size_bytes = 1
uint32_size_bytes = 4
double_size_bytes = 8
float_size_bytes = 4
bytes_shift = 0
TOW = []
WEEK =[]
PVT_GPS_time = []
Clock_Offset = []
ECEF_X_POS = []
ECEF_Y_POS = []
ECEF_Z_POS = []
ECEF_X_VEL = []
ECEF_Y_VEL = []
ECEF_Z_VEL = []
C_XX = []
C_YY = []
C_ZZ = []
C_XY = []
C_YZ = []
C_ZX = []
Lat = []
Long = []
Height = []
num_valid_sats = []
RTKLIB_status = []
RTKLIB_type = []
AR_factor = []
AR_threshold = []
GDOP = []
PDOP = []
HDOP = []
VDOP = []
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
# TOW -> (Time Of Week) [usually sec] uint32
TOW.append(struct.unpack('I',
f.read(uint32_size_bytes))[0])
bytes_shift += uint32_size_bytes
f.seek(bytes_shift, 0)
# WEEK -> uint32
WEEK.append(struct.unpack('I',
f.read(uint32_size_bytes))[0])
bytes_shift += uint32_size_bytes
f.seek(bytes_shift, 0)
# PVT_GPS_time -> double
PVT_GPS_time.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# User_clock_offset -> [s] double
Clock_Offset.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# ##### ECEF POS X,Y,X [m] + ECEF VEL X,Y,X [m/s] (6 x double) ######
ECEF_X_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Y_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Z_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_X_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Y_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Z_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# #### Position variance/covariance [m²]
# {c_xx,c_yy,c_zz,c_xy,c_yz,c_zx} (6 x double) ######
C_XX.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_YY.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_ZZ.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_XY.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_YZ.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_ZX.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Latitude -> [deg] double
Lat.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Longitude -> [deg] double
Long.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Height -> [m] double
Height.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# NUMBER OF VALID SATS -> uint8
num_valid_sats.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# RTKLIB solution status (Real-Time Kinematic) -> uint8
RTKLIB_status.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# RTKLIB solution type (0:xyz-ecef,1:enu-baseline) -> uint8
RTKLIB_type.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# AR ratio factor for validation -> float
AR_factor.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# AR ratio threshold for validation -> float
AR_threshold.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# ##### GDOP / PDOP / HDOP / VDOP (4 * double) #####
GDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
PDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
HDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
VDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
f.close()
# Creating a list with total velocities [m/s]
vel = []
for i in range(len(ECEF_X_VEL)):
vel.append(math.sqrt(ECEF_X_VEL[i]**2 + ECEF_Y_VEL[i]**2 +
ECEF_Z_VEL[i]**2))
navSolutions = {
'TOW': TOW,
'WEEK': WEEK,
'TransmitTime': PVT_GPS_time,
'dt': Clock_Offset,
'X': ECEF_X_POS,
'Y': ECEF_Y_POS,
'Z': ECEF_Z_POS,
'X_vel': ECEF_X_VEL,
'Y_vel': ECEF_Y_VEL,
'Z_vel': ECEF_Z_VEL,
'Tot_Vel': vel,
'C_XX': C_XX,
'C_YY': C_YY,
'C_ZZ': C_ZZ,
'C_XY': C_XY,
'C_YZ': C_YZ,
'C_ZX': C_ZX,
'latitude': Lat,
'longitude': Long,
'height': Height,
'SATS': num_valid_sats,
'RTK_status': RTKLIB_status,
'RTK_type': RTKLIB_type,
'AR_factor': AR_factor,
'AR_threshold': AR_threshold,
'GDOP': np.array(GDOP),
'PDOP': np.array(PDOP),
'HDOP': np.array(HDOP),
'VDOP': np.array(VDOP)
}
return navSolutions

View File

@ -0,0 +1,86 @@
"""
gps_l1_ca_read_telemetry_dump.py
gps_l1_ca_read_telemetry_dump (filename)
Open and read GNSS-SDR telemetry binary log files (.dat) into Python, and
return the contents.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename - Path to file telemetry[N].dat with the raw data
Return:
telemetry - A dictionary with the processed data
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def gps_l1_ca_read_telemetry_dump(filename):
double_size_bytes = 8
int_size_bytes = 4
bytes_shift = 0
tow_current_symbol_ms = []
tracking_sample_counter = []
tow = []
nav_simbols = []
prn = []
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
tow_current_symbol_ms.append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
tracking_sample_counter.append(struct.unpack(
'Q', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
tow.append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
nav_simbols.append(struct.unpack(
'I', f.read(int_size_bytes))[0])
bytes_shift += int_size_bytes
f.seek(bytes_shift, 0)
prn.append(struct.unpack('I', f.read(int_size_bytes))[0])
bytes_shift += int_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
telemetry = {
'tow_current_symbol_ms': tow_current_symbol_ms,
'tracking_sample_counter': tracking_sample_counter,
'tow': tow,
'nav_simbols': nav_simbols,
'prn': prn
}
return telemetry

View File

@ -0,0 +1,140 @@
"""
plotKalman.py
plotKalman (channelNr, trackResults, settings)
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where doppler plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import matplotlib.pyplot as plt
import numpy as np
import os
def plotKalman(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotKalman'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
channelNr = np.intersect1d(channelNr,
np.arange(1, settings['numberOfChannels'] + 1))
for channelNr in channelNr:
time_start = settings['timeStartInSeconds']
time_axis_in_seconds = np.arange(1, settings['msToProcess']+1)/1000
# Plot all figures
plt.figure(figsize=(1920 / 100, 1080 / 100))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{str(trackResults[channelNr-1]["PRN"][-2])}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
plt.tight_layout()
# Row 1
# ----- CNo for signal -----------------------------------------------
# Measure of the ratio between carrier signal power and noise power
plt.subplot(4, 2, 1)
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['CNo'][:settings['msToProcess']],
'b')
plt.grid()
plt.axis('tight')
plt.xlabel('Time (s)')
plt.ylabel('CNo (dB-Hz)')
plt.title('Carrier to Noise Ratio', fontweight='bold')
# ----- PLL discriminator filtered -----------------------------------
plt.subplot(4, 2, 2)
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['state1']
[:settings['msToProcess']], 'b')
plt.grid()
plt.axis('tight')
plt.xlim([time_start, time_axis_in_seconds[-1]])
plt.xlabel('Time (s)')
plt.ylabel('Phase Amplitude')
plt.title('Filtered Carrier Phase', fontweight='bold')
# Row 2
# ----- Carrier Frequency --------------------------------------------
# Filtered carrier frequency of (transmitted by a satellite)
# for a specific channel
plt.subplot(4, 2, 3)
plt.plot(time_axis_in_seconds[1:],
trackResults[channelNr-1]['state2']
[1:settings['msToProcess']], color=[0.42, 0.25, 0.39])
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Filtered Carrier Frequency', fontweight='bold')
# ----- Carrier Frequency Rate ---------------------------------------
plt.subplot(4, 2, 4)
plt.plot(time_axis_in_seconds[1:],
trackResults[channelNr-1]['state3']
[1:settings['msToProcess']], color=[0.42, 0.25, 0.39])
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Filtered Carrier Frequency Rate', fontweight='bold')
# Row 3
# ----- PLL discriminator unfiltered----------------------------------
plt.subplot(4, 2, (5,6))
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['innovation'], 'r')
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator (Innovation)',fontweight='bold')
# Row 4
# ----- PLL discriminator covariance ---------------------------------
plt.subplot(4, 2, (7,8))
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['r_noise_cov'], 'r')
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Variance')
plt.title('Estimated Noise Variance', fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(fig_path,
f'kalman_ch{channelNr}_PRN_'
f'{trackResults[channelNr - 1]["PRN"][-1]}'
f'.png'))
plt.show()

View File

@ -0,0 +1,134 @@
"""
plotNavigation.py
Function plots variations of coordinates over time and a 3D position
plot. It plots receiver coordinates in UTM system or coordinate offsets if
the true UTM receiver coordinates are provided.
Irene Pérez Riega, 2023. iperrie@inta.es
plotNavigation(navSolutions, settings, plot_skyplot)
Args:
navSolutions - Results from navigation solution function. It
contains measured pseudoranges and receiver
coordinates.
settings - Receiver settings. The true receiver coordinates
are contained in this structure.
plot_skyplot - If == 1 then use satellite coordinates to plot the
satellite positions (not implemented yet TO DO)
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import numpy as np
import matplotlib.pyplot as plt
import os
def plotNavigation(navSolutions, settings, plot_skyplot=0):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotNavigation'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
if navSolutions:
if (np.isnan(settings['true_position']['E_UTM']) or
np.isnan(settings['true_position']['N_UTM']) or
np.isnan(settings['true_position']['U_UTM'])):
# Compute mean values
ref_coord = {
'E_UTM': np.nanmean(navSolutions['E_UTM']),
'N_UTM': np.nanmean(navSolutions['N_UTM']),
'U_UTM': np.nanmean(navSolutions['U_UTM'])
}
mean_latitude = np.nanmean(navSolutions['latitude'])
mean_longitude = np.nanmean(navSolutions['longitude'])
mean_height = np.nanmean(navSolutions['height'])
ref_point_lg_text = (f"Mean Position\nLat: {mean_latitude}º\n"
f"Long: {mean_longitude}º\n"
f"Hgt: {mean_height:+6.1f}")
else:
# Compute the mean error for static receiver
ref_coord = {
'E_UTM': settings.truePosition['E_UTM'],
'N_UTM': settings.truePosition['N_UTM'],
'U_UTM': settings.truePosition['U_UTM']
}
mean_position = {
'E_UTM': np.nanmean(navSolutions['E_UTM']),
'N_UTM': np.nanmean(navSolutions['N_UTM']),
'U_UTM': np.nanmean(navSolutions['U_UTM'])
}
error_meters = np.sqrt(
(mean_position['E_UTM'] - ref_coord['E_UTM']) ** 2 +
(mean_position['N_UTM'] - ref_coord['N_UTM']) ** 2 +
(mean_position['U_UTM'] - ref_coord['U_UTM']) ** 2)
ref_point_lg_text = (f"Reference Position, Mean 3D error = "
f"{error_meters} [m]")
#Create plot and subplots
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.title('Navigation solutions',fontweight='bold')
ax1 = plt.subplot(4, 2, (1, 4))
ax2 = plt.subplot(4, 2, (5, 7), projection='3d')
ax3 = plt.subplot(4, 2, (6, 8), projection='3d')
# (ax1) Coordinate differences in UTM system from reference point
ax1.plot(np.vstack([navSolutions['E_UTM'] - ref_coord['E_UTM'],
navSolutions['N_UTM'] - ref_coord['N_UTM'],
navSolutions['U_UTM'] - ref_coord['U_UTM']]).T)
ax1.set_title('Coordinates variations in UTM system', fontweight='bold')
ax1.legend(['E_UTM', 'N_UTM', 'U_UTM'])
ax1.set_xlabel(f"Measurement period: {settings['navSolPeriod']} ms")
ax1.set_ylabel('Variations (m)')
ax1.grid(True)
ax1.axis('tight')
# (ax2) Satellite sky plot
if plot_skyplot: #todo posicion de los satelites
skyPlot(ax2, navSolutions['channel']['az'],
navSolutions['channel']['el'],
navSolutions['channel']['PRN'][:, 0])
ax2.set_title(f'Sky plot (mean PDOP: '
f'{np.nanmean(navSolutions["DOP"][1, :]):.1f})',
fontweight='bold')
# (ax3) Position plot in UTM system
ax3.scatter(navSolutions['E_UTM'] - ref_coord['E_UTM'],
navSolutions['N_UTM'] - ref_coord['N_UTM'],
navSolutions['U_UTM'] - ref_coord['U_UTM'], marker='+')
ax3.scatter([0], [0], [0], color='r', marker='+', linewidth=1.5)
ax3.view_init(0, 90)
ax3.set_box_aspect([1, 1, 1])
ax3.grid(True, which='minor')
ax3.legend(['Measurements', ref_point_lg_text])
ax3.set_title('Positions in UTM system (3D plot)',fontweight='bold')
ax3.set_xlabel('East (m)')
ax3.set_ylabel('North (m)')
ax3.set_zlabel('Upping (m)')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'measures_UTM.png'))
plt.show()

View File

@ -0,0 +1,208 @@
"""
plotPosition.py
plot_position(navSolutions)
Graph Latitude-Longitude and X-Y-X as a function of Transmit Time
Args:
navSolutions - A dictionary with the processed information in lists
plot_oneVStime(navSolutions, name)
Graph of a variable as a function of transmission time
Args:
navSolutions - A dictionary with the processed information in lists
name - navSolutions variable name that we want to plot
calcularCEFP(percentil, navSolutions, m_lat, m_long)
Calculate CEFP radio [m] for n percentil.
Args:
percentil - Number of measures that will be inside the circumference
navSolutions - A dictionary with the processed information in lists
m_lat - Mean latitude measures [º]
m_long - Mean longitude measures [º]
Modifiable in the file:
fig_path - Path where plots will be save
fig_path_maps - Path where the maps will be save
filename_map - Path where map will be save
filename_map_t - Path where terrain map will be save
Irene Pérez Riega, 2023. iperrie@inta.es
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import math
import os.path
import webbrowser
import numpy as np
import matplotlib.pyplot as plt
import folium
def plot_position(navSolutions):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotPosition/'
fig_path_maps = fig_path + 'maps/'
filename_map = 'mapPlotPosition.html'
filename_map_t = 'mapTerrainPotPosition.html'
if not os.path.exists(fig_path_maps):
os.mkdir(fig_path_maps)
# Statics Positions:
m_lat = sum(navSolutions['latitude']) / len(navSolutions['latitude'])
m_long = sum(navSolutions['longitude']) / len(navSolutions['longitude'])
# CEFP_n -> Include the n% of the dots in the circle
r_CEFP_95 = calcularCEFP(95, navSolutions, m_lat, m_long)
r_CEFP_50 = calcularCEFP(50, navSolutions, m_lat, m_long)
# Generate and save html with the positions
m = folium.Map(location=[navSolutions['latitude'][0],
navSolutions['longitude'][0]], zoom_start=100)
c_CEFP95 = folium.Circle(location=[m_lat, m_long],
radius=r_CEFP_95, color='green', fill=True,
fill_color='green', fill_opacity=0.5)
c_CEFP50 = folium.Circle(location=[m_lat, m_long], radius=r_CEFP_50,
color='red', fill=True, fill_color='red',
fill_opacity=0.5)
# POP-UPs
popup95 = folium.Popup("(Green)CEFP95 diameter: {} "
"metres".format(2 * r_CEFP_95))
popup95.add_to(c_CEFP95)
popup50 = folium.Popup("(Red)CEFP50 diameter: {} "
"metres".format(2 * r_CEFP_50))
popup50.add_to(c_CEFP50)
c_CEFP95.add_to(m)
c_CEFP50.add_to(m)
# Optional: Plot each point ->
"""
for i in range(len(navSolutions['latitude'])):
folium.Marker(location=[navSolutions['latitude'][i],
navSolutions['longitude'][i]],
icon=folium.Icon(color='red')).add_to(m)
"""
m.save(fig_path_maps + filename_map)
webbrowser.open(fig_path_maps + filename_map)
# Optional: with terrain ->
"""
n = folium.Map(location=[navSolutions['latitude'][0],
navSolutions['longitude'][0]], zoom_start=100,
tiles='Stamen Terrain')
c_CEFP95.add_to(n)
c_CEFP50.add_to(n)
n.save(fig_path_maps + filename_map_t)
webbrowser.open(fig_path_maps + filename_map_t)
"""
# Plot ->
time = []
for i in range(len(navSolutions['TransmitTime'])):
time.append(round(navSolutions['TransmitTime'][i] -
min(navSolutions['TransmitTime']), 3))
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.suptitle(f'Plot file PVT process data results')
# Latitude and Longitude
plt.subplot(1, 2, 1)
scatter = plt.scatter(navSolutions['latitude'], navSolutions['longitude'],
c=time, marker='.')
plt.grid()
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
plt.title('Positions latitud-longitud')
plt.xlabel('Latitude º')
plt.ylabel('Longitude º')
plt.axis('tight')
# Colors
cmap = plt.get_cmap('viridis')
norm = plt.Normalize(vmin=min(time), vmax=max(time))
scatter.set_cmap(cmap)
scatter.set_norm(norm)
colors = plt.colorbar(scatter)
colors.set_label('TransmitTime [s]')
# X, Y, Z
ax = plt.subplot(1, 2, 2, projection='3d')
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
ax.scatter(navSolutions['X'], navSolutions['Y'], navSolutions['Z'],
c=time, marker='.')
ax.set_xlabel('Eje X [m]')
ax.set_ylabel('Eje Y [m]')
ax.set_zlabel('Eje Z [m]')
ax.set_title('Positions x-y-z')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'PVT_ProcessDataResults.png'))
plt.show()
def plot_oneVStime(navSolutions, name):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotPosition/'
if not os.path.exists(fig_path):
os.mkdir(fig_path)
time = []
for i in range(len(navSolutions['TransmitTime'])):
time.append(round(navSolutions['TransmitTime'][i] -
min(navSolutions['TransmitTime']), 3))
plt.clf()
plt.scatter(time, navSolutions[name], marker='.')
plt.grid()
plt.title(f'{name} vs Time')
plt.xlabel('Time [s]')
plt.ylabel(name)
plt.axis('tight')
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'{name}VSTime.png'))
plt.show()
def calcularCEFP(percentil, navSolutions, m_lat, m_long):
r_earth = 6371000
lat = []
long = []
dlat = []
dlong = []
dist = []
m_lat = math.radians(m_lat)
m_long = math.radians(m_long)
for i in range(len(navSolutions['latitude'])):
lat.append(math.radians(navSolutions['latitude'][i]))
long.append(math.radians(navSolutions['longitude'][i]))
for i in range(len(lat)):
dlat.append(m_lat - lat[i])
dlong.append(m_long - long[i])
# Haversine:
a = (math.sin(dlat[i] / 2) ** 2 +
math.cos(lat[i]) * math.cos(m_lat) * math.sin(dlong[i] / 2) ** 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
dist.append(r_earth * c)
# Radio CEFP
radio_CEFP_p = np.percentile(dist, percentil)
return radio_CEFP_p

View File

@ -0,0 +1,190 @@
"""
plotTracking.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
plotTracking(channelList, trackResults, settings)
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import numpy as np
import os
import matplotlib.pyplot as plt
def plotTracking(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotTracking'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
if channelNr in list(range(1,settings["numberOfChannels"]+1)):
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{trackResults[channelNr-1]["PRN"][0]}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
plt.tight_layout()
# Extract timeAxis and time_label
if 'prn_start_time_s' in trackResults[channelNr-1]:
timeAxis = trackResults[channelNr-1]['prn_start_time_s']
time_label = 'RX Time (s)'
else:
timeAxis = np.arange(1, len(trackResults[channelNr-1]['PRN']) + 1)
time_label = 'Epoch'
# Row 1 ==============================================================
# Discrete-Time Scatter Plot
plt.subplot(4, 3, 1)
plt.plot(trackResults[channelNr-1]['I_P'],
trackResults[channelNr-1]['Q_P'], marker='.', markersize=1,
linestyle=' ')
plt.grid()
plt.axis('equal')
plt.title('Discrete-Time Scatter Plot', fontweight='bold')
plt.xlabel('I prompt')
plt.ylabel('Q prompt')
# Nav bits
plt.subplot(4, 3, (2, 3))
plt.plot(timeAxis, trackResults[channelNr-1]['I_P'], linewidth=1)
plt.grid()
plt.title('Bits of the navigation message', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
# Row 2 ==============================================================
# Raw PLL discriminator unfiltered
plt.subplot(4, 3, 4)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscr'],
color='r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator', fontweight='bold')
# Correlation results
plt.subplot(4, 3, (5, 6))
corr_data = [
np.sqrt(trackResults[channelNr-1]['I_E'] ** 2 +
trackResults[channelNr-1]['Q_E'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_P'] ** 2 +
trackResults[channelNr-1]['Q_P'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_L'] ** 2 +
trackResults[channelNr - 1]['Q_L'] ** 2)
]
line = []
colors = ['b', '#FF6600', '#FFD700', 'purple', 'g']
for i, data in enumerate(corr_data):
line.append(plt.plot(timeAxis, data,
label=f'Data {i+1}', color=colors[i],
marker='*', linestyle=' ', linewidth=1))
plt.grid()
plt.title('Correlation results', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
plt.legend([r'$\sqrt{I_{VE}^2 + Q_{VE}^2}$',
r'$\sqrt{I_{E}^2 + Q_{E}^2}$',
r'$\sqrt{I_{P}^2 + Q_{P}^2}$',
r'$\sqrt{I_{L}^2 + Q_{L}^2}$',
r'$\sqrt{I_{VL}^2 + Q_{VL}^2}$'], loc='best')
# Row 3 ==============================================================
# Filtered PLL discriminator
plt.subplot(4, 3, 7)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered PLL discriminator', fontweight='bold')
# Raw DLL discriminator unfiltered
plt.subplot(4, 3, 8)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscr'],
'r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw DLL discriminator', fontweight='bold')
# Filtered DLL discriminator
plt.subplot(4, 3, 9)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered DLL discriminator', fontweight='bold')
# Row 4 ==============================================================
# CNo for signal
plt.subplot(4, 3, 10)
plt.plot(timeAxis, trackResults[channelNr-1]['CNo'], 'b',
linewidth=1)
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('CNo (dB-Hz)')
plt.title('Carrier to Noise Ratio', fontweight='bold')
# Carrier Frequency
plt.subplot(4, 3, 11)
plt.plot(timeAxis, trackResults[channelNr-1]['carrFreq'],
marker='.', markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('Freq (hz)')
plt.title('Carrier Frequency', fontweight='bold')
# Code Frequency
# Skip sample 0 to help with results display
plt.subplot(4, 3, 12)
plt.plot(timeAxis, trackResults[channelNr-1]['codeFreq'],
marker='.', markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Code Frequency',fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(fig_path,
f'trk_dump_ch{channelNr}_PRN_'
f'{trackResults[channelNr - 1]["PRN"][-1]}'
f'.png'))
plt.show()

View File

@ -0,0 +1,171 @@
"""
plotVEMLTracking.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
plotVEMLTracking(channelNr, trackResults, settings)
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import matplotlib.pyplot as plt
import numpy as np
import os
def plotVEMLTracking(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/VEMLTracking'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
if channelNr in list(range(1,settings["numberOfChannels"]+1)):
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{trackResults[channelNr-1]["PRN"][0]}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
# Extract timeAxis and time_label
if 'prn_start_time_s' in trackResults[channelNr-1]:
timeAxis = trackResults[channelNr-1]['prn_start_time_s']
time_label = 'RX Time (s)'
else:
timeAxis = np.arange(1, len(trackResults[channelNr-1]['PRN']) + 1)
time_label = 'Epoch'
len_dataI = len (trackResults[channelNr-1]["data_I"])
len_dataQ = len (trackResults[channelNr-1]["data_Q"])
if len_dataI < len_dataQ:
dif = len_dataQ - len_dataI
trackResults[channelNr-1]["data_I"] = np.pad(
trackResults[channelNr-1]["data_I"], pad_width=(0,dif),
mode="constant", constant_values=0)
elif len_dataQ < len_dataI:
dif = len_dataI - len_dataQ
trackResults[channelNr-1]["data_Q"] = np.pad(
trackResults[channelNr-1]["data_Q"], pad_width=(0,dif),
mode="constant", constant_values=0 )
# Discrete-Time Scatter Plot
plt.subplot(3, 3, 1)
plt.plot(trackResults[channelNr-1]['data_I'],
trackResults[channelNr-1]['data_Q'], marker='.',
markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.title('Discrete-Time Scatter Plot', fontweight='bold')
plt.xlabel('I prompt')
plt.ylabel('Q prompt')
# Nav bits
plt.subplot(3, 3, (2, 3))
plt.plot(timeAxis, trackResults[channelNr-1]['data_I'],
linewidth=1)
plt.grid()
plt.title('Bits of the navigation message', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
# Raw PLL discriminator unfiltered
plt.subplot(3, 3, 4)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscr'],
color='r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator', fontweight='bold')
# Correlation results
plt.subplot(3, 3, (5, 6))
corr_data = [
np.sqrt(trackResults[channelNr-1]['I_VE'] ** 2 +
trackResults[channelNr-1]['Q_VE'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_E'] ** 2 +
trackResults[channelNr-1]['Q_E'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_P'] ** 2 +
trackResults[channelNr-1]['Q_P'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_L'] ** 2 +
trackResults[channelNr-1]['Q_L'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_VL'] ** 2 +
trackResults[channelNr-1]['Q_VL'] ** 2)
]
line = []
colors = ['b','#FF6600','#FFD700','purple','g']
for i, data in enumerate(corr_data):
line.append(plt.plot(timeAxis, data, label=f'Data {i+1}',
color=colors[i], marker='*', linestyle=' ',
linewidth=1))
plt.grid()
plt.title('Correlation results',fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
plt.legend([r'$\sqrt{I_{VE}^2 + Q_{VE}^2}$',
r'$\sqrt{I_{E}^2 + Q_{E}^2}$',
r'$\sqrt{I_{P}^2 + Q_{P}^2}$',
r'$\sqrt{I_{L}^2 + Q_{L}^2}$',
r'$\sqrt{I_{VL}^2 + Q_{VL}^2}$'], loc='best')
# Filtered PLL discriminator
plt.subplot(3, 3, 7)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered PLL discriminator', fontweight='bold')
# Raw DLL discriminator unfiltered
plt.subplot(3, 3, 8)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscr'], 'r',
linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw DLL discriminator',fontweight='bold')
# Filtered DLL discriminator
plt.subplot(3, 3, 9)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered DLL discriminator',fontweight='bold')
plt.savefig(os.path.join(fig_path,
f'Ch{channelNr}_PRN'
f'{trackResults[channelNr-1]["PRN"][0]}'
f'_results'))
plt.show()

View File

@ -0,0 +1,112 @@
"""
read_hybrid_observables_dump.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
read_hybrid_observables_dump(channels, filename)
Args:
channels - list of channels to be processed
filename - path to the observables file
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def read_hybrid_observables_dump(channels, filename):
double_size_bytes = 8
bytes_shift = 0
RX_time = [[] for _ in range(channels+1)]
d_TOW_at_current_symbol = [[] for _ in range(channels+1)]
Carrier_Doppler_hz = [[] for _ in range(channels+1)]
Carrier_phase_hz = [[] for _ in range(channels+1)]
Pseudorange_m = [[] for _ in range(channels+1)]
PRN = [[] for _ in range(channels+1)]
valid = [[] for _ in range(channels+1)]
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
try:
# There is an empty channel at the end (Channel-6)
for N in range(0, channels+1):
f.seek(bytes_shift, 0)
RX_time[N].append(struct.unpack(
'd',f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
d_TOW_at_current_symbol[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_Doppler_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_phase_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Pseudorange_m[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
PRN[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
valid[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
except:
break
# Delete last Channel:
RX_time = [row for i, row in enumerate(RX_time) if i != 5]
d_TOW_at_current_symbol = [row for i, row in enumerate(
d_TOW_at_current_symbol)if i != 5]
Carrier_Doppler_hz = [row for i, row in enumerate(
Carrier_Doppler_hz) if i != 5]
Carrier_phase_hz = [row for i, row in enumerate(
Carrier_phase_hz) if i != 5]
Pseudorange_m = [row for i, row in enumerate(Pseudorange_m) if i != 5]
PRN = [row for i, row in enumerate(PRN) if i != 5]
valid = [row for i, row in enumerate(valid) if i != 5]
observables = {
'RX_time': RX_time,
'd_TOW_at_current_symbol': d_TOW_at_current_symbol,
'Carrier_Doppler_hz': Carrier_Doppler_hz,
'Carrier_phase_hz': Carrier_phase_hz,
'Pseudorange_m': Pseudorange_m,
'PRN': PRN,
'valid':valid
}
f.close()
return observables

View File

@ -0,0 +1,262 @@
"""
plot_acq_grid.py
Reads GNSS-SDR Acquisition dump .mat file using the provided function and
plots acquisition grid of acquisition statistic of PRN sat
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
sampling_freq - Sampling frequency [Hz]
channels - Number of channels to check if they exist
path - Path to folder which contains raw file
fig_path - Path where plots will be save
plot_all_files - Plot all the files in a folder (True/False)
----
file - Fixed part in files names. In our case: acq_dump
sat - Satellite. In our case: 1
channel - Channel. In our case: 1
execution - In our case: 0
signal_type - In our case: 1
----
lite_view - True for light grid representation
File format:
{path}/{file}_ch_{system}_{signal}_ch_{channel}_{execution}_sat_{sat}.mat
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import os
import sys
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
import h5py
# ---------- CHANGE HERE:
path = '/home/labnav/Desktop/TEST_IRENE/acquisition/'
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/Acquisition/'
plot_all_files = False
if not os.path.exists(fig_path):
os.makedirs(fig_path)
if not plot_all_files:
# ---------- CHANGE HERE:
file = 'acq_dump'
sat = 1
channel = 0
execution = 1
signal_type = 1
lite_view = True
# If lite_view -> sets the number of samples per chip in the graphical
# representation
n_samples_per_chip = 3
d_samples_per_code = 25000
signal_types = {
1: ('G', '1C', 1023), # GPS L1
2: ('G', '2S', 10230), # GPS L2M
3: ('G', 'L5', 10230), # GPS L5
4: ('E', '1B', 4092), # Galileo E1B
5: ('E', '5X', 10230), # Galileo E5
6: ('R', '1G', 511), # Glonass 1G
7: ('R', '2G', 511), # Glonass 2G
8: ('C', 'B1', 2048), # Beidou B1
9: ('C', 'B3', 10230), # Beidou B3
10: ('C', '5C', 10230) # Beidou B2a
}
system, signal, n_chips = signal_types.get(signal_type)
# Load data
filename = (f'{path}{file}_ch_{system}_{signal}_ch_{channel}_{execution}'
f'_sat_{sat}.mat')
img_name_root = (f'{fig_path}{file}_ch_{system}_{signal}_ch_{channel}_'
f'{execution}_sat_{sat}')
with h5py.File(filename, 'r') as data:
acq_grid = data['acq_grid'][:]
n_fft, n_dop_bins = acq_grid.shape
d_max, f_max = np.unravel_index(np.argmax(acq_grid), acq_grid.shape)
doppler_step = data['doppler_step'][0]
doppler_max = data['doppler_max'][0]
freq = np.arange(n_dop_bins) * doppler_step - doppler_max
delay = np.arange(n_fft) / n_fft * n_chips
# Plot data
# --- Acquisition grid (3D)
fig = plt.figure()
plt.gcf().canvas.manager.set_window_title(filename)
if not lite_view:
ax = fig.add_subplot(111, projection='3d')
X, Y = np.meshgrid(freq, delay)
ax.plot_surface(X, Y, acq_grid, cmap='viridis')
ax.set_ylim([min(delay), max(delay)])
else:
delay_interp = (np.arange(n_samples_per_chip * n_chips)
/ n_samples_per_chip)
spline = CubicSpline(delay, acq_grid)
grid_interp = spline(delay_interp)
ax = fig.add_subplot(111, projection='3d')
X, Y = np.meshgrid(freq, delay_interp)
ax.plot_surface(X, Y, grid_interp, cmap='inferno')
ax.set_ylim([min(delay_interp), max(delay_interp)])
ax.set_xlabel('Doppler shift (Hz)')
ax.set_xlim([min(freq), max(freq)])
ax.set_ylabel('Code delay (chips)')
ax.set_zlabel('Test Statistics')
plt.tight_layout()
plt.savefig(img_name_root + '_sample_3D.png')
plt.show()
# --- Acquisition grid (2D)
input_power = 100 # Change Test statistics in Doppler wipe-off plot
fig2, axes = plt.subplots(2, 1, figsize=(8, 6))
plt.gcf().canvas.manager.set_window_title(filename)
axes[0].plot(freq, acq_grid[d_max, :])
axes[0].set_xlim([min(freq), max(freq)])
axes[0].set_xlabel('Doppler shift (Hz)')
axes[0].set_ylabel('Test statistics')
axes[0].set_title(f'Fixed code delay to {(d_max - 1) / n_fft * n_chips} '
f'chips')
normalization = (d_samples_per_code**4) * input_power
axes[1].plot(delay, acq_grid[:, f_max] / normalization)
axes[1].set_xlim([min(delay), max(delay)])
axes[1].set_xlabel('Code delay (chips)')
axes[1].set_ylabel('Test statistics')
axes[1].set_title(f'Doppler wipe-off = '
f'{str((f_max-1) * doppler_step - doppler_max)} Hz')
plt.tight_layout()
plt.savefig(img_name_root + '_sample_2D.png')
plt.show()
else:
# ---------- CHANGE HERE:
lite_view = True
# If lite_view -> sets the number of samples per chip in the graphical
# representation
n_samples_per_chip = 3
d_samples_per_code = 25000
filenames = os.listdir(path)
for filename in filenames:
sat = 1
channel = 0
execution = 1
system = filename[12]
signal = filename[14:16]
if system == "G":
if signal == "1C":
n_chips = 1023
elif signal == "2S" or "L5":
n_chips = 10230
else:
print("Incorrect files format. Change the code or the "
"filenames.")
sys.exit()
elif system == "E":
if signal == "1B":
n_chips = 4092
elif signal == "5X":
n_chips = 10230
else:
print("Incorrect files format. Change the code or the "
"filenames.")
sys.exit()
elif system == "R":
if signal == "1G" or "2G":
n_chips = 511
else:
print("Incorrect files format. Change the code or the "
"filenames.")
sys.exit()
elif system == "C":
if signal == "B1":
n_chips = 2048
elif signal == "B3" or "5C":
n_chips = 10230
else:
print("Incorrect files format. Change the code or the "
"filenames.")
sys.exit()
complete_path = path + filename
with h5py.File(complete_path, 'r') as data:
acq_grid = data['acq_grid'][:]
n_fft, n_dop_bins = acq_grid.shape
d_max, f_max = np.unravel_index(np.argmax(acq_grid),
acq_grid.shape)
doppler_step = data['doppler_step'][0]
doppler_max = data['doppler_max'][0]
freq = np.arange(n_dop_bins) * doppler_step - doppler_max
delay = np.arange(n_fft) / n_fft * n_chips
# Plot data
# --- Acquisition grid (3D)
fig = plt.figure()
plt.gcf().canvas.manager.set_window_title(filename)
if not lite_view:
ax = fig.add_subplot(111, projection='3d')
X, Y = np.meshgrid(freq, delay)
ax.plot_surface(X, Y, acq_grid, cmap='viridis')
ax.set_ylim([min(delay), max(delay)])
else:
delay_interp = (np.arange(n_samples_per_chip * n_chips)
/ n_samples_per_chip)
spline = CubicSpline(delay, acq_grid)
grid_interp = spline(delay_interp)
ax = fig.add_subplot(111, projection='3d')
X, Y = np.meshgrid(freq, delay_interp)
ax.plot_surface(X, Y, grid_interp, cmap='inferno')
ax.set_ylim([min(delay_interp), max(delay_interp)])
ax.set_xlabel('Doppler shift (Hz)')
ax.set_xlim([min(freq), max(freq)])
ax.set_ylabel('Code delay (chips)')
ax.set_zlabel('Test Statistics')
plt.savefig(os.path.join(fig_path, filename[:-4]) + '_3D.png')
plt.close()
# --- Acquisition grid (2D)
input_power = 100 # Change Test statistics in Doppler wipe-off plot
fig2, axes = plt.subplots(2, 1, figsize=(8, 6))
plt.gcf().canvas.manager.set_window_title(filename)
axes[0].plot(freq, acq_grid[d_max, :])
axes[0].set_xlim([min(freq), max(freq)])
axes[0].set_xlabel('Doppler shift (Hz)')
axes[0].set_ylabel('Test statistics')
axes[0].set_title(f'Fixed code delay to '
f'{(d_max - 1) / n_fft * n_chips} chips')
normalization = (d_samples_per_code ** 4) * input_power
axes[1].plot(delay, acq_grid[:, f_max] / normalization)
axes[1].set_xlim([min(delay), max(delay)])
axes[1].set_xlabel('Code delay (chips)')
axes[1].set_ylabel('Test statistics')
axes[1].set_title(f'Doppler wipe-off = '
f'{str((f_max - 1) * doppler_step - doppler_max)} '
f'Hz')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, filename[:-4]) + '_2D.png')
# plt.show()
plt.close()

View File

@ -0,0 +1,76 @@
"""
plot_tracking_quality_indicators.py
Irene Pérez Riega, 2023. iperrie@inta.es
Modifiable in the file:
channels - Number of channels
firs_channel - Number of the first channel
path - Path to folder which contains raw files
fig_path - Path where doppler plots will be save
'trk_dump_ch' - Fixed part of the tracking dump files names
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import matplotlib.pyplot as plt
import numpy as np
import os
from lib.dll_pll_veml_read_tracking_dump import dll_pll_veml_read_tracking_dump
GNSS_tracking = []
plot_names = []
# ---------- CHANGE HERE:
channels = 5
first_channel = 0
path = '/home/labnav/Desktop/TEST_IRENE/tracking'
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/TrackingQualityIndicator'
for N in range(1, channels + 1):
tracking_log_path = os.path.join(path,
f'trk_dump_ch{N-1+first_channel}.dat')
GNSS_tracking.append(dll_pll_veml_read_tracking_dump(tracking_log_path))
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Plot tracking quality indicators
# First plot
plt.figure()
plt.gcf().canvas.manager.set_window_title('Carrier lock test output for all '
'the channels')
plt.title('Carrier lock test output for all the channels')
for n in range(len(GNSS_tracking)):
plt.plot(GNSS_tracking[n]['carrier_lock_test'])
plot_names.append(f'SV {str(round(np.mean(GNSS_tracking[n]["PRN"])))}')
plt.legend(plot_names)
plt.savefig(os.path.join(fig_path,
f'carrier_lock_test '
f'{str(round(np.mean(GNSS_tracking[n]["PRN"])))}'))
plt.show()
# Second plot
plt.figure()
plt.gcf().canvas.manager.set_window_title('Carrier CN0 output for all the '
'channels')
plt.title('Carrier CN0 output for all the channels')
for n in range(len(GNSS_tracking)):
plt.plot(GNSS_tracking[n]['CN0_SNV_dB_Hz'])
plot_names.append(f'CN0_SNV_dB_Hz '
f'{str(round(np.mean(GNSS_tracking[n]["PRN"])))}')
plt.legend(plot_names)
plt.savefig(os.path.join(
fig_path, f'SV {str(round(np.mean(GNSS_tracking[n]["PRN"])))}'))
plt.show()