1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-11-23 02:24:53 +00:00

Remove build and data folders, move tests and utils to the base of the source tree

This commit is contained in:
Carles Fernandez
2024-10-04 11:55:09 +02:00
parent 5be2971c9b
commit 825037592a
251 changed files with 154 additions and 179 deletions

View File

@@ -0,0 +1,222 @@
"""
dll_pll_veml_read_tracking_dump.py
dll_pll_veml_read_tracking_dump (filename)
Read GNSS-SDR Tracking dump binary file into Python.
Opens GNSS-SDR tracking binary log file .dat and returns the contents
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename: path to file .dat with the raw data
Return:
GNSS_tracking: A dictionary with the processed data in lists
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
import sys
def dll_pll_veml_read_tracking_dump (filename):
v1 = []
v2 = []
v3 = []
v4 = []
v5 = []
v6 = []
v7 = []
v8 = []
v9 = []
v10= []
v11 = []
v12 = []
v13 = []
v14 = []
v15 = []
v16 = []
v17 = []
v18 = []
v19 = []
v20 = []
v21 = []
v22 = []
GNSS_tracking = {}
bytes_shift = 0
if sys.maxsize > 2 ** 36: # 64 bits computer
float_size_bytes = 4
unsigned_long_int_size_bytes = 8
double_size_bytes = 8
unsigned_int_size_bytes = 4
else: # 32 bits
float_size_bytes = 4
unsigned_long_int_size_bytes = 4
double_size_bytes = 8
unsigned_int_size_bytes = 4
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
# VE -> Magnitude of the Very Early correlator.
v1.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# E -> Magnitude of the Early correlator.
v2.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# P -> Magnitude of the Prompt correlator.
v3.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# L -> Magnitude of the Late correlator.
v4.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# VL -> Magnitude of the Very Late correlator.
v5.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_I -> Value of the Prompt correlator in the In-phase
# component.
v6.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_Q -> Value of the Prompt correlator in the Quadrature
# component.
v7.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# PRN_start_sample -> Sample counter from tracking start.
if unsigned_long_int_size_bytes == 8:
v8.append(struct.unpack(
'Q', f.read(unsigned_long_int_size_bytes))[0])
bytes_shift += unsigned_long_int_size_bytes
else:
v8.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# acc_carrier_phase_rad - > Accumulated carrier phase, in rad.
v9.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler hz -> Doppler shift, in Hz.
v10.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler rate hz s -> Doppler rate, in Hz/s.
v11.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code freq hz -> Code frequency, in chips/s.
v12.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code_freq_rate_hz_s -> Code frequency rate, in chips/s².
v13.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_error -> Raw carrier error (unfiltered) at the PLL
# output, in Hz.
v14.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_nco -> Carrier error at the output of the PLL
# filter, in Hz.
v15.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code error -> Raw code error (unfiltered) at the DLL
# output, in chips.
v16.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code nco -> Code error at the output of the DLL
# filter, in chips.
v17.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz.
v18.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier lock test -> Output of the carrier lock test.
v19.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 1 -> not used ?
v20.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 2 -> not used ?
v21.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# PRN -> Satellite ID.
v22.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
f.close()
GNSS_tracking['VE'] = v1
GNSS_tracking['E'] = v2
GNSS_tracking['P'] = v3
GNSS_tracking['L'] = v4
GNSS_tracking['VL'] = v5
GNSS_tracking['prompt_I'] = v6
GNSS_tracking['prompt_Q'] = v7
GNSS_tracking['PRN_start_sample'] = v8
GNSS_tracking['acc_carrier_phase_rad'] = v9
GNSS_tracking['carrier_doppler_hz'] = v10
GNSS_tracking['carrier_doppler_rate_hz_s'] = v11
GNSS_tracking['code_freq_hz'] = v12
GNSS_tracking['code_freq_rate_hz_s'] = v13
GNSS_tracking['carr_error'] = v14
GNSS_tracking['carr_nco'] = v15
GNSS_tracking['code_error'] = v16
GNSS_tracking['code_nco'] = v17
GNSS_tracking['CN0_SNV_dB_Hz'] = v18
GNSS_tracking['carrier_lock_test'] = v19
GNSS_tracking['var1'] = v20
GNSS_tracking['var2'] = v21
GNSS_tracking['PRN'] = v22
return GNSS_tracking

View File

@@ -0,0 +1,225 @@
"""
gps_l1_ca_kf_read_tracking_dump.py
Read GNSS-SDR Tracking dump binary file into Python.
Opens GNSS-SDR tracking binary log file .dat and returns the contents in a dictionary
gps_l1_ca_kf_read_tracking_dump(filename)
Args:
filename - Path to file .dat with the raw data
Return:
GNSS_tracking - A dictionary with the processed data in lists
Irene Pérez Riega, 2023. iperrie@inta.es
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
import sys
def gps_l1_ca_kf_read_tracking_dump(filename):
bytes_shift = 0
GNSS_tracking = {}
v1 = []
v2 = []
v3 = []
v4 = []
v5 = []
v6 = []
v7 = []
v8 = []
v9 = []
v10= []
v11 = []
v12 = []
v13 = []
v14 = []
v15 = []
v16 = []
v17 = []
v18 = []
v19 = []
v20 = []
v21 = []
v22 = []
if sys.maxsize > 2 ** 36: # 64 bits computer
float_size_bytes = 4
unsigned_long_int_size_bytes = 8
double_size_bytes = 8
unsigned_int_size_bytes = 4
else: # 32 bits
float_size_bytes = 4
unsigned_long_int_size_bytes = 4
double_size_bytes = 8
unsigned_int_size_bytes = 4
f = open(filename, 'rb')
if f is None:
help(gps_l1_ca_kf_read_tracking_dump)
return None
else:
while True:
f.seek(bytes_shift, 0)
# VE -> Magnitude of the Very Early correlator.
v1.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# E -> Magnitude of the Early correlator.
v2.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# P -> Magnitude of the Prompt correlator.
v3.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# L -> Magnitude of the Late correlator.
v4.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# VL -> Magnitude of the Very Late correlator.
v5.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_I -> Value of the Prompt correlator in the
# In-phase component.
v6.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# promp_Q -> Value of the Prompt correlator in the
# Quadrature component.
v7.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# PRN_start_sample -> Sample counter from tracking start.
if unsigned_long_int_size_bytes == 8:
v8.append(struct.unpack(
'Q', f.read(unsigned_long_int_size_bytes))[0])
bytes_shift += unsigned_long_int_size_bytes
else:
v8.append(struct.unpack(
'I', f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
# acc_carrier_phase_rad - > Accumulated carrier phase, in rad.
v9.append(struct.unpack('f', f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler hz -> Doppler shift, in Hz.
v10.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier doppler rate hz s -> Doppler rate, in Hz/s.
v11.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code freq hz -> Code frequency, in chips/s.
v12.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code_freq_rate_hz_s -> Code frequency rate, in chips/s².
#todo carr_error in gps_l1_ca_kf_read_tracking_dump.m
v13.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_error -> Raw carrier error (unfiltered) at the PLL
# output, in Hz.
#todo carr_noise_sigma2 in gps_l1_ca_kf_read_tracking_dump.m
v14.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carr_nco -> Carrier error at the output of the PLL
# filter, in Hz.
v15.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code error -> Raw code error (unfiltered) at the DLL
# output, in chips.
v16.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# code nco -> Code error at the output of the DLL
# filter, in chips.
v17.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz.
v18.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# carrier lock test -> Output of the carrier lock test.
v19.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 1 -> not used ?
v20.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# var 2 -> not used ?
v21.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# PRN -> Satellite ID.
v22.append(struct.unpack('I',
f.read(unsigned_int_size_bytes))[0])
bytes_shift += unsigned_int_size_bytes
f.seek(bytes_shift, 0)
linea = f.readline()
if not linea:
break
f.close()
GNSS_tracking['VE'] = v1
GNSS_tracking['E'] = v2
GNSS_tracking['P'] = v3
GNSS_tracking['L'] = v4
GNSS_tracking['VL'] = v5
GNSS_tracking['prompt_I'] = v6
GNSS_tracking['prompt_Q'] = v7
GNSS_tracking['PRN_start_sample'] = v8
GNSS_tracking['acc_carrier_phase_rad'] = v9
GNSS_tracking['carrier_doppler_hz'] = v10
GNSS_tracking['carrier_doppler_rate_hz2'] = v11 #todo segun el dll es carrier_doppler_rate_hz_s
GNSS_tracking['code_freq_hz'] = v12
GNSS_tracking['carr_error'] = v13 #todo code_freq_rate_hz_s segun dll
GNSS_tracking['carr_noise_sigma2'] = v14 #todo carr_error segun dll
GNSS_tracking['carr_nco'] = v15
GNSS_tracking['code_error'] = v16
GNSS_tracking['code_nco'] = v17
GNSS_tracking['CN0_SNV_dB_Hz'] = v18
GNSS_tracking['carrier_lock_test'] = v19
GNSS_tracking['var1'] = v20
GNSS_tracking['var2'] = v21
GNSS_tracking['PRN'] = v22
return GNSS_tracking

View File

@@ -0,0 +1,249 @@
"""
gps_l1_ca_read_pvt_dump.py
gps_l1_ca_read_pvt_dump (filename)
Open and read GNSS-SDR PVT binary log file (.dat) into Python, and
return the contents.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename: path to file PVT.dat with the raw data
Return:
nav_solutions: A dictionary with the processed data in lists
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import math
import struct
import numpy as np
def gps_l1_ca_read_pvt_dump(filename):
uint8_size_bytes = 1
uint32_size_bytes = 4
double_size_bytes = 8
float_size_bytes = 4
bytes_shift = 0
TOW = []
WEEK =[]
PVT_GPS_time = []
Clock_Offset = []
ECEF_X_POS = []
ECEF_Y_POS = []
ECEF_Z_POS = []
ECEF_X_VEL = []
ECEF_Y_VEL = []
ECEF_Z_VEL = []
C_XX = []
C_YY = []
C_ZZ = []
C_XY = []
C_YZ = []
C_ZX = []
Lat = []
Long = []
Height = []
num_valid_sats = []
RTKLIB_status = []
RTKLIB_type = []
AR_factor = []
AR_threshold = []
GDOP = []
PDOP = []
HDOP = []
VDOP = []
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
# TOW -> (Time Of Week) [usually sec] uint32
TOW.append(struct.unpack('I',
f.read(uint32_size_bytes))[0])
bytes_shift += uint32_size_bytes
f.seek(bytes_shift, 0)
# WEEK -> uint32
WEEK.append(struct.unpack('I',
f.read(uint32_size_bytes))[0])
bytes_shift += uint32_size_bytes
f.seek(bytes_shift, 0)
# PVT_GPS_time -> double
PVT_GPS_time.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# User_clock_offset -> [s] double
Clock_Offset.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# ##### ECEF POS X,Y,X [m] + ECEF VEL X,Y,X [m/s] (6 x double) ######
ECEF_X_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Y_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Z_POS.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_X_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Y_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
ECEF_Z_VEL.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# #### Position variance/covariance [m²]
# {c_xx,c_yy,c_zz,c_xy,c_yz,c_zx} (6 x double) ######
C_XX.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_YY.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_ZZ.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_XY.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_YZ.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
C_ZX.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Latitude -> [deg] double
Lat.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Longitude -> [deg] double
Long.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# GEO user position Height -> [m] double
Height.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# NUMBER OF VALID SATS -> uint8
num_valid_sats.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# RTKLIB solution status (Real-Time Kinematic) -> uint8
RTKLIB_status.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# RTKLIB solution type (0:xyz-ecef,1:enu-baseline) -> uint8
RTKLIB_type.append(struct.unpack('B',
f.read(uint8_size_bytes))[0])
bytes_shift += uint8_size_bytes
f.seek(bytes_shift, 0)
# AR ratio factor for validation -> float
AR_factor.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# AR ratio threshold for validation -> float
AR_threshold.append(struct.unpack('f',
f.read(float_size_bytes))[0])
bytes_shift += float_size_bytes
f.seek(bytes_shift, 0)
# ##### GDOP / PDOP / HDOP / VDOP (4 * double) #####
GDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
PDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
HDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
VDOP.append(struct.unpack('d',
f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
f.close()
# Creating a list with total velocities [m/s]
vel = []
for i in range(len(ECEF_X_VEL)):
vel.append(math.sqrt(ECEF_X_VEL[i]**2 + ECEF_Y_VEL[i]**2 +
ECEF_Z_VEL[i]**2))
navSolutions = {
'TOW': TOW,
'WEEK': WEEK,
'TransmitTime': PVT_GPS_time,
'dt': Clock_Offset,
'X': ECEF_X_POS,
'Y': ECEF_Y_POS,
'Z': ECEF_Z_POS,
'X_vel': ECEF_X_VEL,
'Y_vel': ECEF_Y_VEL,
'Z_vel': ECEF_Z_VEL,
'Tot_Vel': vel,
'C_XX': C_XX,
'C_YY': C_YY,
'C_ZZ': C_ZZ,
'C_XY': C_XY,
'C_YZ': C_YZ,
'C_ZX': C_ZX,
'latitude': Lat,
'longitude': Long,
'height': Height,
'SATS': num_valid_sats,
'RTK_status': RTKLIB_status,
'RTK_type': RTKLIB_type,
'AR_factor': AR_factor,
'AR_threshold': AR_threshold,
'GDOP': np.array(GDOP),
'PDOP': np.array(PDOP),
'HDOP': np.array(HDOP),
'VDOP': np.array(VDOP)
}
return navSolutions

View File

@@ -0,0 +1,86 @@
"""
gps_l1_ca_read_telemetry_dump.py
gps_l1_ca_read_telemetry_dump (filename)
Open and read GNSS-SDR telemetry binary log files (.dat) into Python, and
return the contents.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
filename - Path to file telemetry[N].dat with the raw data
Return:
telemetry - A dictionary with the processed data
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def gps_l1_ca_read_telemetry_dump(filename):
double_size_bytes = 8
int_size_bytes = 4
bytes_shift = 0
tow_current_symbol_ms = []
tracking_sample_counter = []
tow = []
nav_simbols = []
prn = []
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
tow_current_symbol_ms.append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
tracking_sample_counter.append(struct.unpack(
'Q', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
tow.append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
nav_simbols.append(struct.unpack(
'I', f.read(int_size_bytes))[0])
bytes_shift += int_size_bytes
f.seek(bytes_shift, 0)
prn.append(struct.unpack('I', f.read(int_size_bytes))[0])
bytes_shift += int_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
telemetry = {
'tow_current_symbol_ms': tow_current_symbol_ms,
'tracking_sample_counter': tracking_sample_counter,
'tow': tow,
'nav_simbols': nav_simbols,
'prn': prn
}
return telemetry

View File

@@ -0,0 +1,140 @@
"""
plotKalman.py
plotKalman (channelNr, trackResults, settings)
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where doppler plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import matplotlib.pyplot as plt
import numpy as np
import os
def plotKalman(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotKalman'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
channelNr = np.intersect1d(channelNr,
np.arange(1, settings['numberOfChannels'] + 1))
for channelNr in channelNr:
time_start = settings['timeStartInSeconds']
time_axis_in_seconds = np.arange(1, settings['msToProcess']+1)/1000
# Plot all figures
plt.figure(figsize=(1920 / 100, 1080 / 100))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{str(trackResults[channelNr-1]["PRN"][-2])}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
plt.tight_layout()
# Row 1
# ----- CNo for signal -----------------------------------------------
# Measure of the ratio between carrier signal power and noise power
plt.subplot(4, 2, 1)
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['CNo'][:settings['msToProcess']],
'b')
plt.grid()
plt.axis('tight')
plt.xlabel('Time (s)')
plt.ylabel('CNo (dB-Hz)')
plt.title('Carrier to Noise Ratio', fontweight='bold')
# ----- PLL discriminator filtered -----------------------------------
plt.subplot(4, 2, 2)
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['state1']
[:settings['msToProcess']], 'b')
plt.grid()
plt.axis('tight')
plt.xlim([time_start, time_axis_in_seconds[-1]])
plt.xlabel('Time (s)')
plt.ylabel('Phase Amplitude')
plt.title('Filtered Carrier Phase', fontweight='bold')
# Row 2
# ----- Carrier Frequency --------------------------------------------
# Filtered carrier frequency of (transmitted by a satellite)
# for a specific channel
plt.subplot(4, 2, 3)
plt.plot(time_axis_in_seconds[1:],
trackResults[channelNr-1]['state2']
[1:settings['msToProcess']], color=[0.42, 0.25, 0.39])
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Filtered Carrier Frequency', fontweight='bold')
# ----- Carrier Frequency Rate ---------------------------------------
plt.subplot(4, 2, 4)
plt.plot(time_axis_in_seconds[1:],
trackResults[channelNr-1]['state3']
[1:settings['msToProcess']], color=[0.42, 0.25, 0.39])
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Filtered Carrier Frequency Rate', fontweight='bold')
# Row 3
# ----- PLL discriminator unfiltered----------------------------------
plt.subplot(4, 2, (5,6))
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['innovation'], 'r')
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator (Innovation)',fontweight='bold')
# Row 4
# ----- PLL discriminator covariance ---------------------------------
plt.subplot(4, 2, (7,8))
plt.plot(time_axis_in_seconds,
trackResults[channelNr-1]['r_noise_cov'], 'r')
plt.grid()
plt.axis('auto')
plt.xlim(time_start, time_axis_in_seconds[-1])
plt.xlabel('Time (s)')
plt.ylabel('Variance')
plt.title('Estimated Noise Variance', fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(fig_path,
f'kalman_ch{channelNr}_PRN_'
f'{trackResults[channelNr - 1]["PRN"][-1]}'
f'.png'))
plt.show()

View File

@@ -0,0 +1,134 @@
"""
plotNavigation.py
Function plots variations of coordinates over time and a 3D position
plot. It plots receiver coordinates in UTM system or coordinate offsets if
the true UTM receiver coordinates are provided.
Irene Pérez Riega, 2023. iperrie@inta.es
plotNavigation(navSolutions, settings, plot_skyplot)
Args:
navSolutions - Results from navigation solution function. It
contains measured pseudoranges and receiver
coordinates.
settings - Receiver settings. The true receiver coordinates
are contained in this structure.
plot_skyplot - If == 1 then use satellite coordinates to plot the
satellite positions (not implemented yet TO DO)
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import numpy as np
import matplotlib.pyplot as plt
import os
def plotNavigation(navSolutions, settings, plot_skyplot=0):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotNavigation'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
if navSolutions:
if (np.isnan(settings['true_position']['E_UTM']) or
np.isnan(settings['true_position']['N_UTM']) or
np.isnan(settings['true_position']['U_UTM'])):
# Compute mean values
ref_coord = {
'E_UTM': np.nanmean(navSolutions['E_UTM']),
'N_UTM': np.nanmean(navSolutions['N_UTM']),
'U_UTM': np.nanmean(navSolutions['U_UTM'])
}
mean_latitude = np.nanmean(navSolutions['latitude'])
mean_longitude = np.nanmean(navSolutions['longitude'])
mean_height = np.nanmean(navSolutions['height'])
ref_point_lg_text = (f"Mean Position\nLat: {mean_latitude}º\n"
f"Long: {mean_longitude}º\n"
f"Hgt: {mean_height:+6.1f}")
else:
# Compute the mean error for static receiver
ref_coord = {
'E_UTM': settings.truePosition['E_UTM'],
'N_UTM': settings.truePosition['N_UTM'],
'U_UTM': settings.truePosition['U_UTM']
}
mean_position = {
'E_UTM': np.nanmean(navSolutions['E_UTM']),
'N_UTM': np.nanmean(navSolutions['N_UTM']),
'U_UTM': np.nanmean(navSolutions['U_UTM'])
}
error_meters = np.sqrt(
(mean_position['E_UTM'] - ref_coord['E_UTM']) ** 2 +
(mean_position['N_UTM'] - ref_coord['N_UTM']) ** 2 +
(mean_position['U_UTM'] - ref_coord['U_UTM']) ** 2)
ref_point_lg_text = (f"Reference Position, Mean 3D error = "
f"{error_meters} [m]")
#Create plot and subplots
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.title('Navigation solutions',fontweight='bold')
ax1 = plt.subplot(4, 2, (1, 4))
ax2 = plt.subplot(4, 2, (5, 7), projection='3d')
ax3 = plt.subplot(4, 2, (6, 8), projection='3d')
# (ax1) Coordinate differences in UTM system from reference point
ax1.plot(np.vstack([navSolutions['E_UTM'] - ref_coord['E_UTM'],
navSolutions['N_UTM'] - ref_coord['N_UTM'],
navSolutions['U_UTM'] - ref_coord['U_UTM']]).T)
ax1.set_title('Coordinates variations in UTM system', fontweight='bold')
ax1.legend(['E_UTM', 'N_UTM', 'U_UTM'])
ax1.set_xlabel(f"Measurement period: {settings['navSolPeriod']} ms")
ax1.set_ylabel('Variations (m)')
ax1.grid(True)
ax1.axis('tight')
# (ax2) Satellite sky plot
if plot_skyplot: #todo posicion de los satelites
skyPlot(ax2, navSolutions['channel']['az'],
navSolutions['channel']['el'],
navSolutions['channel']['PRN'][:, 0])
ax2.set_title(f'Sky plot (mean PDOP: '
f'{np.nanmean(navSolutions["DOP"][1, :]):.1f})',
fontweight='bold')
# (ax3) Position plot in UTM system
ax3.scatter(navSolutions['E_UTM'] - ref_coord['E_UTM'],
navSolutions['N_UTM'] - ref_coord['N_UTM'],
navSolutions['U_UTM'] - ref_coord['U_UTM'], marker='+')
ax3.scatter([0], [0], [0], color='r', marker='+', linewidth=1.5)
ax3.view_init(0, 90)
ax3.set_box_aspect([1, 1, 1])
ax3.grid(True, which='minor')
ax3.legend(['Measurements', ref_point_lg_text])
ax3.set_title('Positions in UTM system (3D plot)',fontweight='bold')
ax3.set_xlabel('East (m)')
ax3.set_ylabel('North (m)')
ax3.set_zlabel('Upping (m)')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, 'measures_UTM.png'))
plt.show()

View File

@@ -0,0 +1,208 @@
"""
plotPosition.py
plot_position(navSolutions)
Graph Latitude-Longitude and X-Y-X as a function of Transmit Time
Args:
navSolutions - A dictionary with the processed information in lists
plot_oneVStime(navSolutions, name)
Graph of a variable as a function of transmission time
Args:
navSolutions - A dictionary with the processed information in lists
name - navSolutions variable name that we want to plot
calcularCEFP(percentil, navSolutions, m_lat, m_long)
Calculate CEFP radio [m] for n percentil.
Args:
percentil - Number of measures that will be inside the circumference
navSolutions - A dictionary with the processed information in lists
m_lat - Mean latitude measures [º]
m_long - Mean longitude measures [º]
Modifiable in the file:
fig_path - Path where plots will be save
fig_path_maps - Path where the maps will be save
filename_map - Path where map will be save
filename_map_t - Path where terrain map will be save
Irene Pérez Riega, 2023. iperrie@inta.es
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import math
import os.path
import webbrowser
import numpy as np
import matplotlib.pyplot as plt
import folium
def plot_position(navSolutions):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotPosition/'
fig_path_maps = fig_path + 'maps/'
filename_map = 'mapPlotPosition.html'
filename_map_t = 'mapTerrainPotPosition.html'
if not os.path.exists(fig_path_maps):
os.mkdir(fig_path_maps)
# Statics Positions:
m_lat = sum(navSolutions['latitude']) / len(navSolutions['latitude'])
m_long = sum(navSolutions['longitude']) / len(navSolutions['longitude'])
# CEFP_n -> Include the n% of the dots in the circle
r_CEFP_95 = calcularCEFP(95, navSolutions, m_lat, m_long)
r_CEFP_50 = calcularCEFP(50, navSolutions, m_lat, m_long)
# Generate and save html with the positions
m = folium.Map(location=[navSolutions['latitude'][0],
navSolutions['longitude'][0]], zoom_start=100)
c_CEFP95 = folium.Circle(location=[m_lat, m_long],
radius=r_CEFP_95, color='green', fill=True,
fill_color='green', fill_opacity=0.5)
c_CEFP50 = folium.Circle(location=[m_lat, m_long], radius=r_CEFP_50,
color='red', fill=True, fill_color='red',
fill_opacity=0.5)
# POP-UPs
popup95 = folium.Popup("(Green)CEFP95 diameter: {} "
"metres".format(2 * r_CEFP_95))
popup95.add_to(c_CEFP95)
popup50 = folium.Popup("(Red)CEFP50 diameter: {} "
"metres".format(2 * r_CEFP_50))
popup50.add_to(c_CEFP50)
c_CEFP95.add_to(m)
c_CEFP50.add_to(m)
# Optional: Plot each point ->
"""
for i in range(len(navSolutions['latitude'])):
folium.Marker(location=[navSolutions['latitude'][i],
navSolutions['longitude'][i]],
icon=folium.Icon(color='red')).add_to(m)
"""
m.save(fig_path_maps + filename_map)
webbrowser.open(fig_path_maps + filename_map)
# Optional: with terrain ->
"""
n = folium.Map(location=[navSolutions['latitude'][0],
navSolutions['longitude'][0]], zoom_start=100,
tiles='Stamen Terrain')
c_CEFP95.add_to(n)
c_CEFP50.add_to(n)
n.save(fig_path_maps + filename_map_t)
webbrowser.open(fig_path_maps + filename_map_t)
"""
# Plot ->
time = []
for i in range(len(navSolutions['TransmitTime'])):
time.append(round(navSolutions['TransmitTime'][i] -
min(navSolutions['TransmitTime']), 3))
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.suptitle(f'Plot file PVT process data results')
# Latitude and Longitude
plt.subplot(1, 2, 1)
scatter = plt.scatter(navSolutions['latitude'], navSolutions['longitude'],
c=time, marker='.')
plt.grid()
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
plt.title('Positions latitud-longitud')
plt.xlabel('Latitude º')
plt.ylabel('Longitude º')
plt.axis('tight')
# Colors
cmap = plt.get_cmap('viridis')
norm = plt.Normalize(vmin=min(time), vmax=max(time))
scatter.set_cmap(cmap)
scatter.set_norm(norm)
colors = plt.colorbar(scatter)
colors.set_label('TransmitTime [s]')
# X, Y, Z
ax = plt.subplot(1, 2, 2, projection='3d')
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
ax.scatter(navSolutions['X'], navSolutions['Y'], navSolutions['Z'],
c=time, marker='.')
ax.set_xlabel('Eje X [m]')
ax.set_ylabel('Eje Y [m]')
ax.set_zlabel('Eje Z [m]')
ax.set_title('Positions x-y-z')
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'PVT_ProcessDataResults.png'))
plt.show()
def plot_oneVStime(navSolutions, name):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotPosition/'
if not os.path.exists(fig_path):
os.mkdir(fig_path)
time = []
for i in range(len(navSolutions['TransmitTime'])):
time.append(round(navSolutions['TransmitTime'][i] -
min(navSolutions['TransmitTime']), 3))
plt.clf()
plt.scatter(time, navSolutions[name], marker='.')
plt.grid()
plt.title(f'{name} vs Time')
plt.xlabel('Time [s]')
plt.ylabel(name)
plt.axis('tight')
plt.ticklabel_format(style='plain', axis='both', useOffset=False)
plt.tight_layout()
plt.savefig(os.path.join(fig_path, f'{name}VSTime.png'))
plt.show()
def calcularCEFP(percentil, navSolutions, m_lat, m_long):
r_earth = 6371000
lat = []
long = []
dlat = []
dlong = []
dist = []
m_lat = math.radians(m_lat)
m_long = math.radians(m_long)
for i in range(len(navSolutions['latitude'])):
lat.append(math.radians(navSolutions['latitude'][i]))
long.append(math.radians(navSolutions['longitude'][i]))
for i in range(len(lat)):
dlat.append(m_lat - lat[i])
dlong.append(m_long - long[i])
# Haversine:
a = (math.sin(dlat[i] / 2) ** 2 +
math.cos(lat[i]) * math.cos(m_lat) * math.sin(dlong[i] / 2) ** 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
dist.append(r_earth * c)
# Radio CEFP
radio_CEFP_p = np.percentile(dist, percentil)
return radio_CEFP_p

View File

@@ -0,0 +1,190 @@
"""
plotTracking.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
plotTracking(channelList, trackResults, settings)
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import numpy as np
import os
import matplotlib.pyplot as plt
def plotTracking(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotTracking'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
if channelNr in list(range(1,settings["numberOfChannels"]+1)):
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{trackResults[channelNr-1]["PRN"][0]}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
plt.tight_layout()
# Extract timeAxis and time_label
if 'prn_start_time_s' in trackResults[channelNr-1]:
timeAxis = trackResults[channelNr-1]['prn_start_time_s']
time_label = 'RX Time (s)'
else:
timeAxis = np.arange(1, len(trackResults[channelNr-1]['PRN']) + 1)
time_label = 'Epoch'
# Row 1 ==============================================================
# Discrete-Time Scatter Plot
plt.subplot(4, 3, 1)
plt.plot(trackResults[channelNr-1]['I_P'],
trackResults[channelNr-1]['Q_P'], marker='.', markersize=1,
linestyle=' ')
plt.grid()
plt.axis('equal')
plt.title('Discrete-Time Scatter Plot', fontweight='bold')
plt.xlabel('I prompt')
plt.ylabel('Q prompt')
# Nav bits
plt.subplot(4, 3, (2, 3))
plt.plot(timeAxis, trackResults[channelNr-1]['I_P'], linewidth=1)
plt.grid()
plt.title('Bits of the navigation message', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
# Row 2 ==============================================================
# Raw PLL discriminator unfiltered
plt.subplot(4, 3, 4)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscr'],
color='r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator', fontweight='bold')
# Correlation results
plt.subplot(4, 3, (5, 6))
corr_data = [
np.sqrt(trackResults[channelNr-1]['I_E'] ** 2 +
trackResults[channelNr-1]['Q_E'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_P'] ** 2 +
trackResults[channelNr-1]['Q_P'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_L'] ** 2 +
trackResults[channelNr - 1]['Q_L'] ** 2)
]
line = []
colors = ['b', '#FF6600', '#FFD700', 'purple', 'g']
for i, data in enumerate(corr_data):
line.append(plt.plot(timeAxis, data,
label=f'Data {i+1}', color=colors[i],
marker='*', linestyle=' ', linewidth=1))
plt.grid()
plt.title('Correlation results', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
plt.legend([r'$\sqrt{I_{VE}^2 + Q_{VE}^2}$',
r'$\sqrt{I_{E}^2 + Q_{E}^2}$',
r'$\sqrt{I_{P}^2 + Q_{P}^2}$',
r'$\sqrt{I_{L}^2 + Q_{L}^2}$',
r'$\sqrt{I_{VL}^2 + Q_{VL}^2}$'], loc='best')
# Row 3 ==============================================================
# Filtered PLL discriminator
plt.subplot(4, 3, 7)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered PLL discriminator', fontweight='bold')
# Raw DLL discriminator unfiltered
plt.subplot(4, 3, 8)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscr'],
'r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw DLL discriminator', fontweight='bold')
# Filtered DLL discriminator
plt.subplot(4, 3, 9)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered DLL discriminator', fontweight='bold')
# Row 4 ==============================================================
# CNo for signal
plt.subplot(4, 3, 10)
plt.plot(timeAxis, trackResults[channelNr-1]['CNo'], 'b',
linewidth=1)
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('CNo (dB-Hz)')
plt.title('Carrier to Noise Ratio', fontweight='bold')
# Carrier Frequency
plt.subplot(4, 3, 11)
plt.plot(timeAxis, trackResults[channelNr-1]['carrFreq'],
marker='.', markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('Freq (hz)')
plt.title('Carrier Frequency', fontweight='bold')
# Code Frequency
# Skip sample 0 to help with results display
plt.subplot(4, 3, 12)
plt.plot(timeAxis, trackResults[channelNr-1]['codeFreq'],
marker='.', markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.xlabel('Time (s)')
plt.ylabel('Freq (Hz)')
plt.title('Code Frequency',fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(fig_path,
f'trk_dump_ch{channelNr}_PRN_'
f'{trackResults[channelNr - 1]["PRN"][-1]}'
f'.png'))
plt.show()

View File

@@ -0,0 +1,171 @@
"""
plotVEMLTracking.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
plotVEMLTracking(channelNr, trackResults, settings)
Args:
channelList - list of channels to be plotted.
trackResults - tracking results from the tracking function.
settings - receiver settings.
Modifiable in the file:
fig_path - Path where plots will be save
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import matplotlib.pyplot as plt
import numpy as np
import os
def plotVEMLTracking(channelNr, trackResults, settings):
# ---------- CHANGE HERE:
fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/VEMLTracking'
if not os.path.exists(fig_path):
os.makedirs(fig_path)
# Protection - if the list contains incorrect channel numbers
if channelNr in list(range(1,settings["numberOfChannels"]+1)):
plt.figure(figsize=(1920 / 120, 1080 / 120))
plt.clf()
plt.gcf().canvas.set_window_title(
f'Channel {channelNr} (PRN '
f'{trackResults[channelNr-1]["PRN"][0]}) results')
plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1,
hspace=0.4, wspace=0.4)
# Extract timeAxis and time_label
if 'prn_start_time_s' in trackResults[channelNr-1]:
timeAxis = trackResults[channelNr-1]['prn_start_time_s']
time_label = 'RX Time (s)'
else:
timeAxis = np.arange(1, len(trackResults[channelNr-1]['PRN']) + 1)
time_label = 'Epoch'
len_dataI = len (trackResults[channelNr-1]["data_I"])
len_dataQ = len (trackResults[channelNr-1]["data_Q"])
if len_dataI < len_dataQ:
dif = len_dataQ - len_dataI
trackResults[channelNr-1]["data_I"] = np.pad(
trackResults[channelNr-1]["data_I"], pad_width=(0,dif),
mode="constant", constant_values=0)
elif len_dataQ < len_dataI:
dif = len_dataI - len_dataQ
trackResults[channelNr-1]["data_Q"] = np.pad(
trackResults[channelNr-1]["data_Q"], pad_width=(0,dif),
mode="constant", constant_values=0 )
# Discrete-Time Scatter Plot
plt.subplot(3, 3, 1)
plt.plot(trackResults[channelNr-1]['data_I'],
trackResults[channelNr-1]['data_Q'], marker='.',
markersize=1, linestyle=' ')
plt.grid()
plt.axis('equal')
plt.title('Discrete-Time Scatter Plot', fontweight='bold')
plt.xlabel('I prompt')
plt.ylabel('Q prompt')
# Nav bits
plt.subplot(3, 3, (2, 3))
plt.plot(timeAxis, trackResults[channelNr-1]['data_I'],
linewidth=1)
plt.grid()
plt.title('Bits of the navigation message', fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
# Raw PLL discriminator unfiltered
plt.subplot(3, 3, 4)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscr'],
color='r', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw PLL discriminator', fontweight='bold')
# Correlation results
plt.subplot(3, 3, (5, 6))
corr_data = [
np.sqrt(trackResults[channelNr-1]['I_VE'] ** 2 +
trackResults[channelNr-1]['Q_VE'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_E'] ** 2 +
trackResults[channelNr-1]['Q_E'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_P'] ** 2 +
trackResults[channelNr-1]['Q_P'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_L'] ** 2 +
trackResults[channelNr-1]['Q_L'] ** 2),
np.sqrt(trackResults[channelNr-1]['I_VL'] ** 2 +
trackResults[channelNr-1]['Q_VL'] ** 2)
]
line = []
colors = ['b','#FF6600','#FFD700','purple','g']
for i, data in enumerate(corr_data):
line.append(plt.plot(timeAxis, data, label=f'Data {i+1}',
color=colors[i], marker='*', linestyle=' ',
linewidth=1))
plt.grid()
plt.title('Correlation results',fontweight='bold')
plt.xlabel(time_label)
plt.axis('tight')
plt.legend([r'$\sqrt{I_{VE}^2 + Q_{VE}^2}$',
r'$\sqrt{I_{E}^2 + Q_{E}^2}$',
r'$\sqrt{I_{P}^2 + Q_{P}^2}$',
r'$\sqrt{I_{L}^2 + Q_{L}^2}$',
r'$\sqrt{I_{VL}^2 + Q_{VL}^2}$'], loc='best')
# Filtered PLL discriminator
plt.subplot(3, 3, 7)
plt.plot(timeAxis, trackResults[channelNr-1]['pllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered PLL discriminator', fontweight='bold')
# Raw DLL discriminator unfiltered
plt.subplot(3, 3, 8)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscr'], 'r',
linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Raw DLL discriminator',fontweight='bold')
# Filtered DLL discriminator
plt.subplot(3, 3, 9)
plt.plot(timeAxis, trackResults[channelNr-1]['dllDiscrFilt'],
'b', linewidth=1)
plt.grid()
plt.axis('tight')
plt.xlabel(time_label)
plt.ylabel('Amplitude')
plt.title('Filtered DLL discriminator',fontweight='bold')
plt.savefig(os.path.join(fig_path,
f'Ch{channelNr}_PRN'
f'{trackResults[channelNr-1]["PRN"][0]}'
f'_results'))
plt.show()

View File

@@ -0,0 +1,112 @@
"""
read_hybrid_observables_dump.py
This function plots the tracking results for the given channel list.
Irene Pérez Riega, 2023. iperrie@inta.es
read_hybrid_observables_dump(channels, filename)
Args:
channels - list of channels to be processed
filename - path to the observables file
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def read_hybrid_observables_dump(channels, filename):
double_size_bytes = 8
bytes_shift = 0
RX_time = [[] for _ in range(channels+1)]
d_TOW_at_current_symbol = [[] for _ in range(channels+1)]
Carrier_Doppler_hz = [[] for _ in range(channels+1)]
Carrier_phase_hz = [[] for _ in range(channels+1)]
Pseudorange_m = [[] for _ in range(channels+1)]
PRN = [[] for _ in range(channels+1)]
valid = [[] for _ in range(channels+1)]
f = open(filename, 'rb')
if f is None:
return None
else:
while True:
try:
# There is an empty channel at the end (Channel-6)
for N in range(0, channels+1):
f.seek(bytes_shift, 0)
RX_time[N].append(struct.unpack(
'd',f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
d_TOW_at_current_symbol[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_Doppler_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Carrier_phase_hz[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
Pseudorange_m[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
PRN[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
valid[N].append(struct.unpack(
'd', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
except:
break
# Delete last Channel:
RX_time = [row for i, row in enumerate(RX_time) if i != 5]
d_TOW_at_current_symbol = [row for i, row in enumerate(
d_TOW_at_current_symbol)if i != 5]
Carrier_Doppler_hz = [row for i, row in enumerate(
Carrier_Doppler_hz) if i != 5]
Carrier_phase_hz = [row for i, row in enumerate(
Carrier_phase_hz) if i != 5]
Pseudorange_m = [row for i, row in enumerate(Pseudorange_m) if i != 5]
PRN = [row for i, row in enumerate(PRN) if i != 5]
valid = [row for i, row in enumerate(valid) if i != 5]
observables = {
'RX_time': RX_time,
'd_TOW_at_current_symbol': d_TOW_at_current_symbol,
'Carrier_Doppler_hz': Carrier_Doppler_hz,
'Carrier_phase_hz': Carrier_phase_hz,
'Pseudorange_m': Pseudorange_m,
'PRN': PRN,
'valid':valid
}
f.close()
return observables