From 43bceb52fffacceac365837dd2b72e45a1a80e1b Mon Sep 17 00:00:00 2001 From: minhaj Date: Fri, 19 Jun 2026 23:01:57 -0500 Subject: [PATCH] Fix and refactor GPS L1 C/A KF tracking dump reader and plotter. The KF tracking dump reader now follows the parameters dumped by kf_tracking.cc lib/gps_l1_ca_kf_read_tracking_dump.py: - Rewrite using the same _RECORD_FORMAT / _FIELD_NAMES architecture as dll_pll_veml_read_tracking_dump.py: one struct.unpack per record and EOF handling, dropping the v1..v22 / bytes_shift / seek. gps_l1_ca_kf_plot_sample.py: - map innovation -> carr_error; drop r_noise_cov lib/plotKalman.py, lib/plotTracking.py: - Skip the "Estimated Noise Variance" panel when r_noise_cov is absent. - Replace fig.canvas.set_window_title() with fig.canvas.manager.set_window_title(). The canvas method was deprecated in matplotlib 3.4 and is gone in current matplotlib (3.6.3 here), where it raised AttributeError. Ref: https://github.com/raysect/source/issues/383 Signed-off-by: minhaj --- utils/python/gps_l1_ca_kf_plot_sample.py | 85 +++--- .../lib/gps_l1_ca_kf_read_tracking_dump.py | 258 ++++-------------- utils/python/lib/plotKalman.py | 31 ++- utils/python/lib/plotTracking.py | 10 +- 4 files changed, 130 insertions(+), 254 deletions(-) diff --git a/utils/python/gps_l1_ca_kf_plot_sample.py b/utils/python/gps_l1_ca_kf_plot_sample.py index 26e0f3287..9f310f1da 100644 --- a/utils/python/gps_l1_ca_kf_plot_sample.py +++ b/utils/python/gps_l1_ca_kf_plot_sample.py @@ -1,17 +1,21 @@ """ gps_l1_ca_kf_plot_sample.py - Reads GNSS-SDR Tracking dump binary file using the provided - function and plots some internal variables + Reads a GPS L1 C/A Kalman-filter tracking dump binary file + (GPS_L1_CA_KF_Tracking) and plots some internal tracking and Kalman-filter + variables. Irene Pérez Riega, 2023. iperrie@inta.es + Minhaj Uddin Ahmad, 2026. mahmad12@crimson.ua.edu Modifiable in the file: - sampling_freq - Sampling frequency [Hz] - channels - Number of channels to check if they exist + samplingFreq - Sampling frequency [Hz] + channels - Number of channels first_channel - Number of the first channel - path - Path to folder which contains raw file - 'trk_dump_ch' - Fixed part in tracking dump files names + code_period - Code period [s] + path - Path to folder which contains the tracking dump files + fig_path - Path where the plots will be saved + file_prefix - Fixed part of the tracking dump file names ----------------------------------------------------------------------------- @@ -35,52 +39,55 @@ trackResults = [] kalmanResults = [] # ---------- CHANGE HERE: -# Signal parameters: -samplingFreq = 6625000 +samplingFreq = 4000000 # must match SignalSource.sampling_frequency in the .conf channels = 5 first_channel = 0 code_period = 0.001 - -path = '/home/labnav/Desktop/TEST_IRENE/tracking' +path = '../../../out/' +fig_path = '../../../PLOTS/KF_Tracking' +file_prefix = 'track_ch' for N in range(1, channels + 1): tracking_log_path = os.path.join(path, - f'trk_dump_ch{N-1+first_channel}.dat') + f'{file_prefix}{N-1+first_channel}.dat') GNSS_tracking.append(gps_l1_ca_kf_read_tracking_dump(tracking_log_path)) -# todo lee lo mismo que dll_pll_velm_plot_sample y guarda diferente v13 y v14 -# GNSS-SDR format conversion to Python GPS receiver for N in range(1, channels + 1): - trackResult= { + trackResult = { 'status': 'T', # fake track - 'codeFreq': np.copy(GNSS_tracking[N - 1]["code_freq_hz"]), - 'carrFreq': np.copy(GNSS_tracking[N - 1]["carrier_doppler_hz"]), + 'codeFreq': np.copy(GNSS_tracking[N-1]["code_freq_hz"]), + 'carrFreq': np.copy(GNSS_tracking[N-1]["carrier_doppler_hz"]), 'carrFreqRate': - np.copy(GNSS_tracking[N - 1]["carrier_doppler_rate_hz2"]),#todo no se usa en dll, carrier_doppler_rate_hz_s segun dll - 'dllDiscr': np.copy(GNSS_tracking[N - 1]["code_error"]), - 'dllDiscrFilt': np.copy(GNSS_tracking[N - 1]["code_nco"]), - 'pllDiscr': np.copy(GNSS_tracking[N - 1]["carr_error"]),#todo code_freq_rate_hz_s segun dll - 'pllDiscrFilt': np.copy(GNSS_tracking[N - 1]["carr_nco"]), + np.copy(GNSS_tracking[N-1]["carrier_doppler_rate_hz2"]), + 'dllDiscr': np.copy(GNSS_tracking[N-1]["code_error"]), + 'dllDiscrFilt': np.copy(GNSS_tracking[N-1]["code_nco"]), + 'pllDiscr': np.copy(GNSS_tracking[N-1]["carr_error"]), + 'pllDiscrFilt': np.copy(GNSS_tracking[N-1]["carr_nco"]), - 'I_P': np.copy(GNSS_tracking[N - 1]["prompt_I"]),#todo distinto de dll - 'Q_P': np.copy(GNSS_tracking[N - 1]["prompt_Q"]),#todo distinto de dll + 'I_P': np.copy(GNSS_tracking[N-1]["prompt_I"]), + 'Q_P': np.copy(GNSS_tracking[N-1]["prompt_Q"]), - 'I_E': np.copy(GNSS_tracking[N - 1]["E"]), - 'I_L': np.copy(GNSS_tracking[N - 1]["L"]), - 'Q_E': np.zeros(len(GNSS_tracking[N - 1]["E"])), - 'Q_L': np.zeros(len(GNSS_tracking[N - 1]["L"])), - 'PRN': np.copy(GNSS_tracking[N - 1]["PRN"]), - 'CNo': np.copy(GNSS_tracking[N - 1]["CN0_SNV_dB_Hz"]) + 'I_E': np.copy(GNSS_tracking[N-1]["E"]), + 'I_L': np.copy(GNSS_tracking[N-1]["L"]), + 'Q_E': np.zeros(len(GNSS_tracking[N-1]["E"])), + 'Q_L': np.zeros(len(GNSS_tracking[N-1]["L"])), + 'PRN': np.copy(GNSS_tracking[N-1]["PRN"]), + 'CNo': np.copy(GNSS_tracking[N-1]["CN0_SNV_dB_Hz"]), + 'prn_start_time_s': + np.copy(GNSS_tracking[N-1]["PRN_start_sample"]) / samplingFreq } - kalmanResult= { - 'PRN': np.copy(GNSS_tracking[N - 1]["PRN"]), - 'innovation': np.copy(GNSS_tracking[N - 1]["carr_error"]),#todo code_freq_rate_hz_s segun dll - 'state1': np.copy(GNSS_tracking[N - 1]["carr_nco"]), - 'state2': np.copy(GNSS_tracking[N - 1]["carrier_doppler_hz"]), - 'state3': GNSS_tracking[N - 1]["carrier_doppler_rate_hz2"],#todo segun el dll es carrier_doppler_rate_hz_s - 'r_noise_cov': np.copy(GNSS_tracking[N - 1]["carr_noise_sigma2"]),#todo carr_error segun dll - 'CNo': np.copy(GNSS_tracking[N - 1]["CN0_SNV_dB_Hz"]) + # Kalman-filter internals. The KF dump stores the carrier discriminator + # (carr_error -> innovation) and the filter states, but no measurement + # noise covariance, so 'r_noise_cov' is intentionally left out (plotKalman + # skips that panel when it is absent). + kalmanResult = { + 'PRN': np.copy(GNSS_tracking[N-1]["PRN"]), + 'innovation': np.copy(GNSS_tracking[N-1]["carr_error"]), + 'state1': np.copy(GNSS_tracking[N-1]["carr_nco"]), + 'state2': np.copy(GNSS_tracking[N-1]["carrier_doppler_hz"]), + 'state3': np.copy(GNSS_tracking[N-1]["carrier_doppler_rate_hz2"]), + 'CNo': np.copy(GNSS_tracking[N-1]["CN0_SNV_dB_Hz"]) } trackResults.append(trackResult) @@ -90,7 +97,9 @@ for N in range(1, channels + 1): 'numberOfChannels': channels, 'msToProcess': len(GNSS_tracking[N-1]['E']), 'codePeriod': code_period, - 'timeStartInSeconds': 20 + 'timeStartInSeconds': 0, + 'fig_path': fig_path, + 'show': False # set True to display the figures interactively } # Create and save graphics as PNG diff --git a/utils/python/lib/gps_l1_ca_kf_read_tracking_dump.py b/utils/python/lib/gps_l1_ca_kf_read_tracking_dump.py index 3d809d889..534a4b074 100644 --- a/utils/python/lib/gps_l1_ca_kf_read_tracking_dump.py +++ b/utils/python/lib/gps_l1_ca_kf_read_tracking_dump.py @@ -1,18 +1,18 @@ """ gps_l1_ca_kf_read_tracking_dump.py + gps_l1_ca_kf_read_tracking_dump (filename) Read GNSS-SDR Tracking dump binary file into Python. - Opens GNSS-SDR tracking binary log file .dat and returns the contents in a dictionary - - gps_l1_ca_kf_read_tracking_dump(filename) - - Args: - filename - Path to file .dat with the raw data - - Return: - GNSS_tracking - A dictionary with the processed data in lists + Opens GNSS-SDR tracking binary log file .dat and returns the contents Irene Pérez Riega, 2023. iperrie@inta.es + Minhaj Uddin Ahmad, 2026. mahmad12@crimson.ua.edu + + Args: + filename: path to file .dat with the raw data + + Return: + GNSS_tracking: A dictionary with the processed data in lists ----------------------------------------------------------------------------- @@ -26,200 +26,60 @@ """ import struct -import sys + +# Binary layout of one GPS_L1_CA_KF_Tracking dump record, matching the writer +# in src/algorithms/tracking/gnuradio_blocks/kf_tracking.cc +# +# Fields are written back-to-back. Unlike the DLL/PLL VEML dump, the KF dump +# ends at PRN (no TOW_ms / WN fields), so a record is 22 fields / 96 bytes. +# '<' selects little-endian / standard sizes / no alignment padding. +_RECORD_FORMAT = '<' + ( + 'f' # VE -> Magnitude of the Very Early correlator. + 'f' # E -> Magnitude of the Early correlator. + 'f' # P -> Magnitude of the Prompt correlator. + 'f' # L -> Magnitude of the Late correlator. + 'f' # VL -> Magnitude of the Very Late correlator. + 'f' # prompt_I -> Prompt correlator, In-phase component. + 'f' # prompt_Q -> Prompt correlator, Quadrature component. + 'Q' # PRN_start_sample -> Sample counter from tracking start. + 'f' # acc_carrier_phase_rad -> Accumulated carrier phase, in rad. + 'f' # carrier_doppler_hz -> KF carrier Doppler estimate, in Hz. + 'f' # carrier_doppler_rate_hz2 -> KF carrier Doppler rate, in Hz/s. + 'f' # code_freq_hz -> KF code frequency, in chips/s. + 'f' # code_freq_rate_hz_s -> Code phase rate, in chips/s^2. + 'f' # carr_error -> Carrier phase discriminator output, in Hz. + 'f' # carr_nco -> KF carrier state estimate. + 'f' # code_error -> Code discriminator output, in chips. + 'f' # code_nco -> KF code error estimate, in chips. + 'f' # CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz. + 'f' # carrier_lock_test -> Output of the carrier lock test. + 'f' # var1 -> aux variable. + 'd' # var2 -> aux variable. + 'I' # PRN -> Satellite ID. +) + +_FIELD_NAMES = [ + 'VE', 'E', 'P', 'L', 'VL', 'prompt_I', 'prompt_Q', 'PRN_start_sample', + 'acc_carrier_phase_rad', 'carrier_doppler_hz', 'carrier_doppler_rate_hz2', + 'code_freq_hz', 'code_freq_rate_hz_s', 'carr_error', 'carr_nco', + 'code_error', 'code_nco', 'CN0_SNV_dB_Hz', 'carrier_lock_test', + 'var1', 'var2', 'PRN', +] -def gps_l1_ca_kf_read_tracking_dump(filename): +def gps_l1_ca_kf_read_tracking_dump (filename): - bytes_shift = 0 + record_size = struct.calcsize(_RECORD_FORMAT) + columns = [[] for _ in _FIELD_NAMES] - GNSS_tracking = {} - - v1 = [] - v2 = [] - v3 = [] - v4 = [] - v5 = [] - v6 = [] - v7 = [] - v8 = [] - v9 = [] - v10= [] - v11 = [] - v12 = [] - v13 = [] - v14 = [] - v15 = [] - v16 = [] - v17 = [] - v18 = [] - v19 = [] - v20 = [] - v21 = [] - v22 = [] - - if sys.maxsize > 2 ** 36: # 64 bits computer - float_size_bytes = 4 - unsigned_long_int_size_bytes = 8 - double_size_bytes = 8 - unsigned_int_size_bytes = 4 - - else: # 32 bits - float_size_bytes = 4 - unsigned_long_int_size_bytes = 4 - double_size_bytes = 8 - unsigned_int_size_bytes = 4 - - f = open(filename, 'rb') - if f is None: - help(gps_l1_ca_kf_read_tracking_dump) - return None - - else: + with open(filename, 'rb') as f: while True: - f.seek(bytes_shift, 0) - # VE -> Magnitude of the Very Early correlator. - v1.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # E -> Magnitude of the Early correlator. - v2.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # P -> Magnitude of the Prompt correlator. - v3.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # L -> Magnitude of the Late correlator. - v4.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # VL -> Magnitude of the Very Late correlator. - v5.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # promp_I -> Value of the Prompt correlator in the - # In-phase component. - v6.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # promp_Q -> Value of the Prompt correlator in the - # Quadrature component. - v7.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # PRN_start_sample -> Sample counter from tracking start. - if unsigned_long_int_size_bytes == 8: - v8.append(struct.unpack( - 'Q', f.read(unsigned_long_int_size_bytes))[0]) - bytes_shift += unsigned_long_int_size_bytes - else: - v8.append(struct.unpack( - 'I', f.read(unsigned_int_size_bytes))[0]) - bytes_shift += unsigned_int_size_bytes - f.seek(bytes_shift, 0) - # acc_carrier_phase_rad - > Accumulated carrier phase, in rad. - v9.append(struct.unpack('f', f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # carrier doppler hz -> Doppler shift, in Hz. - v10.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # carrier doppler rate hz s -> Doppler rate, in Hz/s. - v11.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # code freq hz -> Code frequency, in chips/s. - v12.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # code_freq_rate_hz_s -> Code frequency rate, in chips/s². - #todo carr_error in gps_l1_ca_kf_read_tracking_dump.m - v13.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # carr_error -> Raw carrier error (unfiltered) at the PLL - # output, in Hz. - #todo carr_noise_sigma2 in gps_l1_ca_kf_read_tracking_dump.m - v14.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # carr_nco -> Carrier error at the output of the PLL - # filter, in Hz. - v15.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # code error -> Raw code error (unfiltered) at the DLL - # output, in chips. - v16.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # code nco -> Code error at the output of the DLL - # filter, in chips. - v17.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # CN0_SNV_dB_Hz -> C/N0 estimation, in dB-Hz. - v18.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # carrier lock test -> Output of the carrier lock test. - v19.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # var 1 -> not used ? - v20.append(struct.unpack('f', - f.read(float_size_bytes))[0]) - bytes_shift += float_size_bytes - f.seek(bytes_shift, 0) - # var 2 -> not used ? - v21.append(struct.unpack('d', - f.read(double_size_bytes))[0]) - bytes_shift += double_size_bytes - f.seek(bytes_shift, 0) - # PRN -> Satellite ID. - v22.append(struct.unpack('I', - f.read(unsigned_int_size_bytes))[0]) - bytes_shift += unsigned_int_size_bytes - f.seek(bytes_shift, 0) - - linea = f.readline() - if not linea: + record = f.read(record_size) + if len(record) < record_size: + # Clean end of file (or a trailing partial record). break - f.close() + for column, value in zip(columns, struct.unpack(_RECORD_FORMAT, + record)): + column.append(value) - GNSS_tracking['VE'] = v1 - GNSS_tracking['E'] = v2 - GNSS_tracking['P'] = v3 - GNSS_tracking['L'] = v4 - GNSS_tracking['VL'] = v5 - GNSS_tracking['prompt_I'] = v6 - GNSS_tracking['prompt_Q'] = v7 - GNSS_tracking['PRN_start_sample'] = v8 - GNSS_tracking['acc_carrier_phase_rad'] = v9 - GNSS_tracking['carrier_doppler_hz'] = v10 - GNSS_tracking['carrier_doppler_rate_hz2'] = v11 #todo segun el dll es carrier_doppler_rate_hz_s - GNSS_tracking['code_freq_hz'] = v12 - GNSS_tracking['carr_error'] = v13 #todo code_freq_rate_hz_s segun dll - GNSS_tracking['carr_noise_sigma2'] = v14 #todo carr_error segun dll - GNSS_tracking['carr_nco'] = v15 - GNSS_tracking['code_error'] = v16 - GNSS_tracking['code_nco'] = v17 - GNSS_tracking['CN0_SNV_dB_Hz'] = v18 - GNSS_tracking['carrier_lock_test'] = v19 - GNSS_tracking['var1'] = v20 - GNSS_tracking['var2'] = v21 - GNSS_tracking['PRN'] = v22 - - return GNSS_tracking + return dict(zip(_FIELD_NAMES, columns)) diff --git a/utils/python/lib/plotKalman.py b/utils/python/lib/plotKalman.py index 38001af45..f2dffe245 100644 --- a/utils/python/lib/plotKalman.py +++ b/utils/python/lib/plotKalman.py @@ -32,8 +32,9 @@ import os def plotKalman(channelNr, trackResults, settings): - # ---------- CHANGE HERE: - fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotKalman' + # ---------- CHANGE HERE (or pass 'fig_path' in settings): + fig_path = settings.get('fig_path', + '../../../PLOTS/PlotKalman') if not os.path.exists(fig_path): os.makedirs(fig_path) @@ -49,7 +50,7 @@ def plotKalman(channelNr, trackResults, settings): # Plot all figures plt.figure(figsize=(1920 / 100, 1080 / 100)) plt.clf() - plt.gcf().canvas.set_window_title( + plt.gcf().canvas.manager.set_window_title( f'Channel {channelNr} (PRN ' f'{str(trackResults[channelNr-1]["PRN"][-2])}) results') plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1, @@ -122,19 +123,23 @@ def plotKalman(channelNr, trackResults, settings): # Row 4 # ----- PLL discriminator covariance --------------------------------- - plt.subplot(4, 2, (7,8)) - plt.plot(time_axis_in_seconds, - trackResults[channelNr-1]['r_noise_cov'], 'r') - plt.grid() - plt.axis('auto') - plt.xlim(time_start, time_axis_in_seconds[-1]) - plt.xlabel('Time (s)') - plt.ylabel('Variance') - plt.title('Estimated Noise Variance', fontweight='bold') + # Only plotted if the dump provides a noise covariance. The GPS L1 C/A + # KF tracking dump does not store one, so this panel is skipped there. + if 'r_noise_cov' in trackResults[channelNr-1]: + plt.subplot(4, 2, (7,8)) + plt.plot(time_axis_in_seconds, + trackResults[channelNr-1]['r_noise_cov'], 'r') + plt.grid() + plt.axis('auto') + plt.xlim(time_start, time_axis_in_seconds[-1]) + plt.xlabel('Time (s)') + plt.ylabel('Variance') + plt.title('Estimated Noise Variance', fontweight='bold') plt.tight_layout() plt.savefig(os.path.join(fig_path, f'kalman_ch{channelNr}_PRN_' f'{trackResults[channelNr - 1]["PRN"][-1]}' f'.png')) - plt.show() + if settings.get('show', True): + plt.show() diff --git a/utils/python/lib/plotTracking.py b/utils/python/lib/plotTracking.py index 248091d12..01f949b28 100644 --- a/utils/python/lib/plotTracking.py +++ b/utils/python/lib/plotTracking.py @@ -32,8 +32,9 @@ import matplotlib.pyplot as plt def plotTracking(channelNr, trackResults, settings): - # ---------- CHANGE HERE: - fig_path = '/home/labnav/Desktop/TEST_IRENE/PLOTS/PlotTracking' + # ---------- CHANGE HERE (or pass 'fig_path' in settings): + fig_path = settings.get('fig_path', + '../../../PLOTS/PlotTracking') if not os.path.exists(fig_path): os.makedirs(fig_path) @@ -43,7 +44,7 @@ def plotTracking(channelNr, trackResults, settings): plt.figure(figsize=(1920 / 120, 1080 / 120)) plt.clf() - plt.gcf().canvas.set_window_title( + plt.gcf().canvas.manager.set_window_title( f'Channel {channelNr} (PRN ' f'{trackResults[channelNr-1]["PRN"][0]}) results') plt.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.1, @@ -187,4 +188,5 @@ def plotTracking(channelNr, trackResults, settings): f'trk_dump_ch{channelNr}_PRN_' f'{trackResults[channelNr - 1]["PRN"][-1]}' f'.png')) - plt.show() + if settings.get('show', True): + plt.show()