1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-09-11 23:36:03 +00:00

feat: add plot scripts for the VTL dump data

This commit is contained in:
pedromiguelcp
2025-08-27 18:56:19 +01:00
committed by Carles Fernandez
parent 0ce37a4cb9
commit 3c85f7445b
4 changed files with 775 additions and 10 deletions

View File

@@ -32,7 +32,7 @@ Vtl_Core::Vtl_Core(const Pvt_Conf &conf)
dump_filename(conf.vtl_dump_filename), // vtl dump file name
N_ch(conf.vtl_gps_channels + conf.vtl_gal_channels), // total allocated channels
N_st((conf.vtl_gal_channels > 0) ? 9 : 8), // x, vx, y, vy, z, vz, b (gps), b (gal), d
N_meas(N_ch * 2), // 3 measurements (pr, prr, pracc) per channel
N_meas(N_ch * 2), // 2 measurements (pr, prr) per channel
N_st_idx(arma::linspace<arma::uvec>(0, N_st - 1, N_st)),
i_clkb_E(i_clkb_G + 1),
i_clkd((conf.vtl_gal_channels > 0) ? (i_clkb_G + 2) : (i_clkb_G + 1))
@@ -582,27 +582,27 @@ void Vtl_Core::saveVTLdata(const Vtl_Data &rtk_data)
// ekf rx d
tmp_double = ekf_X(i_clkd);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
// rtk rx p
// rtklib rx p
tmp_double = rtk_data.rx_p(0);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
tmp_double = rtk_data.rx_p(1);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
tmp_double = rtk_data.rx_p(2);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
// rtk rx v
// rtklib rx v
tmp_double = rtk_data.rx_v(0);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
tmp_double = rtk_data.rx_v(1);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
tmp_double = rtk_data.rx_v(2);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
// rtk rx b gps
// rtklib rx b gps
tmp_double = rtk_data.rx_clk(0);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
// rtk rx b gal
// rtklib rx b gal
tmp_double = rtk_data.rx_clk(1);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
// rtk rx d
// rtklib rx d
tmp_double = rtk_data.rx_clk(2);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
@@ -640,25 +640,25 @@ void Vtl_Core::saveVTLdata(const Vtl_Data &rtk_data)
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
}
// rtk code pseudorange
// observed pseudorange
for (int i = 0; i < N_ch; i++)
{
tmp_double = rtk_data.obs_pr(i);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
}
// rtk code pseudorange rate
// observed pseudorange rate
for (int i = 0; i < N_ch; i++)
{
tmp_double = rtk_data.obs_prr(i);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
}
// vtl code pseudorange
// computed pseudorange
for (int i = 0; i < N_ch; i++)
{
tmp_double = ekf_comp_Z(i);
dump_file.write(reinterpret_cast<char *>(&tmp_double), sizeof(double));
}
// vtl code pseudorange rate
// computed pseudorange rate
for (int i = 0; i < N_ch; i++)
{
tmp_double = ekf_comp_Z(i + N_ch);

331
utils/python/lib/plotVTL.py Normal file
View File

@@ -0,0 +1,331 @@
"""
plotVTL.py
Plotting module for GNSS-SDR Vector Tracking Loop (VTL) dumps.
Pedro Pereira, 2025. pereirapedro@gmail.com
Expected GNSS_vtl keys (from vtl_read_dump.py):
nepoch, receiver_time_s, vtl_dt_s,
VTL_RX_P_*_ECEF_m, VTL_RX_V_*_ECEF_ms, VTL_RX_CLK_B_GPS_m, VTL_RX_CLK_B_GAL_m, VTL_RX_CLK_D_ms,
RTKL_RX_P_*_ECEF_m, RTKL_RX_V_*_ECEF_ms, RTKL_RX_CLK_B_GPS_m, RTKL_RX_CLK_B_GAL_m, RTKL_RX_CLK_D_ms,
VTL_active_channels,
EKF_prefit_PR_m, EKF_prefit_PRR_ms, EKF_postfit_PR_m, EKF_postfit_PRR_ms,
EKF_meascov_PR, EKF_meascov_PRR, EKF_process_noise,
RTKL_observed_PR_m, RTKL_observed_PRR_ms, VTL_computed_PR_m, VTL_computed_PRR_ms,
VTL_code_freq_hz,
RTKL_SV_P_*_ECEF_m, RTKL_SV_CLK_B_m, RTKL_SV_CLK_D_ms, RTKL_topo_delay_m, RTKL_iono_delay_m, RTKL_code_bias_m
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
from __future__ import annotations
import os
from typing import Iterable, Dict, Any
import numpy as np
import matplotlib.pyplot as plt
SKIP_EPOCHS_DEFAULT = 10 # number of epochs to skip in plots
def _ensure_dir(path: str) -> None:
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def _epochs(G: Dict[str, Any]) -> np.ndarray:
if "receiver_time_s" in G and len(G["receiver_time_s"]):
t = np.asarray(G["receiver_time_s"], dtype=float)
return t - t[0]
if "vtl_dt_s" in G and len(G["vtl_dt_s"]):
dt = float(np.asarray(G["vtl_dt_s"]).ravel()[0])
n = _infer_nepoch(G)
return np.arange(n) * dt
n = _infer_nepoch(G)
return np.arange(n)
def _infer_nepoch(G: Dict[str, Any]) -> int:
for k in ["receiver_time_s", "VTL_RX_P_X_ECEF_m", "RTKL_RX_P_X_ECEF_m"]:
if k in G and hasattr(G[k], "__len__"):
return len(G[k])
if "nepoch" in G:
try:
return int(G["nepoch"])
except Exception:
pass
return 0
def _reshape_ch(x, n_epoch: int, n_ch: int) -> np.ndarray:
a = np.asarray(x, dtype=float).ravel()
if a.size == n_epoch * n_ch:
return a.reshape(n_epoch, n_ch)
if n_ch > 0 and a.size % n_ch == 0:
return a.reshape(a.size // n_ch, n_ch)
if n_ch > 0 and a.size > 0:
pad = (-a.size) % n_ch
a = np.concatenate([a, np.full(pad, np.nan)])
return a.reshape(-1, n_ch)
return a.reshape(-1, 1)
def _active_mask(G: Dict[str, Any], n_epoch: int, n_ch: int) -> np.ndarray:
if "VTL_active_channels" not in G:
return np.ones((n_epoch, n_ch), dtype=bool)
act = _reshape_ch(G["VTL_active_channels"], n_epoch, n_ch)
return act.astype(bool)
def _sanitize(y: np.ndarray) -> np.ndarray:
y = np.asarray(y, dtype=np.float64, order='C')
y[~np.isfinite(y)] = np.nan
return y
def plotVTL(G: Dict[str, Any],
settings: Dict[str, Any],
groups: Iterable[str] = ("pos","vel","clock","residuals","meas","sv","activity"),
save: bool = True,
show: bool = False,
skip_epochs: int = SKIP_EPOCHS_DEFAULT) -> Dict[str, Any]:
groups = set(groups)
if "all" in groups:
groups = {"pos","vel","clock","residuals","meas","sv","activity"}
n_ch = int(settings.get("numberOfChannels", 0))
n_epoch = _infer_nepoch(G)
t_full = _epochs(G)
# Skip initial epochs for plotting
start = min(max(skip_epochs, 0), len(t_full)) if len(t_full) else 0
t = t_full[start:] if len(t_full) else t_full
outdir = settings.get("fig_path", "./figs")
if save:
_ensure_dir(outdir)
figs: Dict[str, Any] = {}
# Determine channels that were active at least once
active_mask = _active_mask(G, n_epoch, n_ch) if (n_ch > 0 and n_epoch > 0) else np.zeros((0,0), dtype=bool)
active_any = active_mask.any(axis=0) if active_mask.size else np.array([], dtype=bool)
def _slice_1d(y):
y = _sanitize(y)
return y[start:start+len(t)] if len(y) >= start else y*0
def _slice_2d(M):
M = _reshape_ch(M, n_epoch, n_ch)
M = _sanitize(M)
return M[start:start+len(t), :] if M.shape[0] >= start else np.empty((0, M.shape[1]))
def _maybe_close(fig, is_aggregate: bool):
if not is_aggregate or not show:
plt.close(fig)
# ---------- Position (ECEF) & Errors ----------
if "pos" in groups:
has_gt = all(k in settings for k in ["gt_rx_p_x","gt_rx_p_y","gt_rx_p_z"])
f, axs = plt.subplots(2 if has_gt else 1, 3, figsize=(13, 6 if has_gt else 4), constrained_layout=True)
axs = np.atleast_2d(axs)
pos_keys = ["VTL_RX_P_X_ECEF_m","VTL_RX_P_Y_ECEF_m","VTL_RX_P_Z_ECEF_m"]
labels = ["X","Y","Z"]
# Top row: positions for BOTH RTKL and VTL
for i, k in enumerate(pos_keys):
if f"RTKL_RX_P_{labels[i]}_ECEF_m" in G:
axs[0, i].plot(t, _slice_1d(G.get(f"RTKL_RX_P_{labels[i]}_ECEF_m", [])), linestyle="--", label=f"RTKL {labels[i]}")
axs[0, i].plot(t, _slice_1d(G.get(k, [])), label=f"VTL {labels[i]}")
axs[0, i].set_title(f"ECEF {labels[i]}"); axs[0, i].set_xlabel("Time [s]"); axs[0, i].set_ylabel("m"); axs[0, i].grid(True, alpha=0.3)
axs[0, i].legend(loc="best")
# Bottom row: errors for BOTH RTKL and VTL
if has_gt:
gt = np.array([settings["gt_rx_p_x"], settings["gt_rx_p_y"], settings["gt_rx_p_z"]], dtype=float)
vtl_xyz = np.vstack([_slice_1d(G.get(k, [])) for k in pos_keys]).T
rtk_xyz = np.vstack([_slice_1d(G.get(f"RTKL_RX_P_{lab}_ECEF_m", [])) for lab in labels]).T \
if f"RTKL_RX_P_{labels[0]}_ECEF_m" in G else None
if vtl_xyz.shape[0] == len(t):
vtl_err = vtl_xyz - gt
for i, lab in enumerate(labels):
if rtk_xyz is not None and rtk_xyz.shape[0] == len(t):
rtk_err = rtk_xyz - gt
axs[1, i].plot(t, rtk_err[:, i], linestyle="--", label=f"RTKL-gt {lab}", zorder=2)
axs[1, i].plot(t, vtl_err[:, i], label=f"VTL-gt {lab}", zorder=3)
axs[1, i].set_title(f"Error {lab}"); axs[1, i].set_xlabel("Time [s]"); axs[1, i].set_ylabel("m"); axs[1, i].grid(True, alpha=0.3)
axs[1, i].legend(loc="best")
figs["pos"] = f
if save: f.savefig(os.path.join(outdir, "pos_ecef_and_errors.png"), dpi=150)
_maybe_close(f, is_aggregate=True)
# ---------- Velocity (ECEF) ----------
if "vel" in groups:
f, axs = plt.subplots(1, 3, figsize=(13, 4), constrained_layout=True)
vel_keys = ["VTL_RX_V_X_ECEF_ms","VTL_RX_V_Y_ECEF_ms","VTL_RX_V_Z_ECEF_ms"]
labels = ["X","Y","Z"]
for i, k in enumerate(vel_keys):
if f"RTKL_RX_V_{labels[i]}_ECEF_ms" in G:
axs[i].plot(t, _slice_1d(G.get(f"RTKL_RX_V_{labels[i]}_ECEF_ms", [])), linestyle="--", label=f"RTKL {labels[i]}")
axs[i].plot(t, _slice_1d(G.get(k, [])), label=f"VTL {labels[i]}")
axs[i].set_title(f"ECEF Velocity {labels[i]}"); axs[i].set_xlabel("Time [s]"); axs[i].set_ylabel("m/s")
axs[i].grid(True, alpha=0.3); axs[i].legend(loc="best")
figs["vel"] = f
if save: f.savefig(os.path.join(outdir, "vel_ecef.png"), dpi=150)
_maybe_close(f, is_aggregate=True)
# ---------- Clock (bias & drift) ----------
if "clock" in groups:
f, axs = plt.subplots(1, 3, figsize=(13, 4), constrained_layout=True)
if "RTKL_RX_CLK_B_GPS_m" in G:
axs[0].plot(t, _slice_1d(G.get("RTKL_RX_CLK_B_GPS_m", [])), linestyle="--", label="RTKL GPS bias")
axs[0].plot(t, _slice_1d(G.get("VTL_RX_CLK_B_GPS_m", [])), label="VTL GPS bias")
axs[0].set_title("Clock Bias (GPS) [m]"); axs[0].grid(True, alpha=0.3); axs[0].set_xlabel("Time [s]"); axs[0].legend(loc="best")
if "VTL_RX_CLK_B_GAL_m" in G and len(G["VTL_RX_CLK_B_GAL_m"]) == len(t_full):
if "RTKL_RX_CLK_B_GAL_m" in G:
axs[1].plot(t, _slice_1d(G.get("RTKL_RX_CLK_B_GAL_m", [])), linestyle="--", label="RTKL GAL bias")
axs[1].plot(t, _slice_1d(G.get("VTL_RX_CLK_B_GAL_m", [])), label="VTL GAL bias")
axs[1].set_title("Clock Bias (GAL) [m]"); axs[1].grid(True, alpha=0.3); axs[1].set_xlabel("Time [s]"); axs[1].legend(loc="best")
else:
axs[1].axis("off")
if "RTKL_RX_CLK_D_ms" in G:
axs[2].plot(t, _slice_1d(G.get("RTKL_RX_CLK_D_ms", [])), linestyle="--", label="RTKL drift")
axs[2].plot(t, _slice_1d(G.get("VTL_RX_CLK_D_ms", [])), label="VTL drift")
axs[2].set_title("Clock Drift [m/s]"); axs[2].grid(True, alpha=0.3); axs[2].set_xlabel("Time [s]"); axs[2].legend(loc="best")
figs["clock"] = f
if save: f.savefig(os.path.join(outdir, "clock.png"), dpi=150)
_maybe_close(f, is_aggregate=True)
# ---------- Satellite-related per-channel (active only) ----------
if "sv" in groups and n_ch > 0 and len(t) > 0:
svx = _slice_2d(G.get("RTKL_SV_P_X_ECEF_m", []))
svy = _slice_2d(G.get("RTKL_SV_P_Y_ECEF_m", []))
svz = _slice_2d(G.get("RTKL_SV_P_Z_ECEF_m", []))
sv_cb = _slice_2d(G.get("RTKL_SV_CLK_B_m", []))
cd_all = _slice_2d(G.get("RTKL_SV_CLK_D_ms", []))
topo = _slice_2d(G.get("RTKL_topo_delay_m", []))
iono = _slice_2d(G.get("RTKL_iono_delay_m", []))
cbias = _slice_2d(G.get("RTKL_code_bias_m", []))
sat_root = os.path.join(outdir, "satellites"); _ensure_dir(sat_root) if save else None
for ch in range(n_ch):
if ch >= len(active_any) or not active_any[ch]:
continue
# Coords
fx, axx = plt.subplots(1, 3, figsize=(13,4), constrained_layout=True)
for i, (arr, lab) in enumerate([(svx, "X"), (svy, "Y"), (svz, "Z")]):
y = arr[:, ch] if arr.shape[1] > ch else np.array([])
axx[i].plot(t, y); axx[i].set_title(f"SV {lab} (CH{ch:02d})")
axx[i].set_xlabel("Time [s]"); axx[i].set_ylabel("m"); axx[i].grid(True, alpha=0.3)
if save: fx.savefig(os.path.join(sat_root, f"sv_coords_CH{ch:02d}.png"), dpi=150)
plt.close(fx)
# Clock
fclk, axc = plt.subplots(1, 2, figsize=(10,4), constrained_layout=True)
cb = sv_cb[:, ch] if sv_cb.shape[1] > ch else np.array([])
cd = cd_all[:, ch] if cd_all.shape[1] > ch else np.array([])
axc[0].plot(t, cb, label="Bias"); axc[0].set_title(f"Clock bias (CH{ch:02d})"); axc[0].set_xlabel("Time [s]"); axc[0].set_ylabel("m"); axc[0].grid(True, alpha=0.3)
axc[1].plot(t, cd, label="Drift"); axc[1].set_title(f"Clock drift (CH{ch:02d})"); axc[1].set_xlabel("Time [s]"); axc[1].set_ylabel("m/s"); axc[1].grid(True, alpha=0.3)
if save: fclk.savefig(os.path.join(sat_root, f"sv_clock_CH{ch:02d}.png"), dpi=150)
plt.close(fclk)
# bias
fe, axe = plt.subplots(1, 1, figsize=(10,4), constrained_layout=True)
y_topo = topo[:, ch] if topo.shape[1] > ch else np.array([])
y_iono = iono[:, ch] if iono.shape[1] > ch else np.array([])
y_cb = cbias[:, ch] if cbias.shape[1] > ch else np.array([])
axe.plot(t, y_topo, label="Topo")
axe.plot(t, y_iono, label="Iono")
axe.plot(t, y_cb, label="Code bias", linestyle="--")
axe.set_title(f"Propagation bias (CH{ch:02d})"); axe.set_xlabel("Time [s]"); axe.set_ylabel("m"); axe.grid(True, alpha=0.3); axe.legend(loc="best")
if save: fe.savefig(os.path.join(sat_root, f"sv_bias_CH{ch:02d}.png"), dpi=150)
plt.close(fe)
# ---------- Channel activity ----------
if "activity" in groups and n_ch > 0 and len(t) > 0:
act_full = _reshape_ch(G.get("VTL_active_channels", []), n_epoch, n_ch).astype(int)
act = act_full[start:start+len(t), :]
f, ax = plt.subplots(figsize=(12, 6), constrained_layout=True)
extent = (t[0], t[-1] if len(t) else 1, -0.5, n_ch - 0.5)
ax.imshow(act.T, aspect="auto", interpolation="nearest", origin="lower", extent=extent)
ax.set_xlabel("Time [s]"); ax.set_ylabel("Channel")
ax.set_title("Channel Activity (1=active, 0=inactive)")
ax.set_yticks(range(n_ch)); ax.grid(False)
figs["activity"] = f
if save: f.savefig(os.path.join(outdir, "channel_activity.png"), dpi=150)
if not show: plt.close(f)
# ---------- Per-channel measurements (active channels only) ----------
if ("meas" in groups or "residuals" in groups) and n_ch > 0 and len(t) > 0:
meas_root = os.path.join(outdir, "measurements")
res_root = os.path.join(meas_root, "residuals")
if save:
_ensure_dir(meas_root); _ensure_dir(res_root)
PR_obs = _slice_2d(G.get("RTKL_observed_PR_m", []))
PR_comp = _slice_2d(G.get("VTL_computed_PR_m", []))
PRR_obs = _slice_2d(G.get("RTKL_observed_PRR_ms", []))
PRR_comp = _slice_2d(G.get("VTL_computed_PRR_ms", []))
PREF_PR = _slice_2d(G.get("EKF_prefit_PR_m", []))
POSTF_PR = _slice_2d(G.get("EKF_postfit_PR_m", []))
PREF_PRR = _slice_2d(G.get("EKF_prefit_PRR_ms", []))
POSTF_PRR = _slice_2d(G.get("EKF_postfit_PRR_ms", []))
for ch in range(n_ch):
if ch >= len(active_any) or not active_any[ch]:
continue
# observed vs computed
fmc, axmc = plt.subplots(1, 2, figsize=(12,4), constrained_layout=True)
y1 = PR_obs[:, ch] if PR_obs.shape[1] > ch else np.array([])
y2 = PR_comp[:, ch] if PR_comp.shape[1] > ch else np.array([])
axmc[0].plot(t, y1, label="Observed")
axmc[0].plot(t, y2, linestyle="--", label="Computed")
axmc[0].set_title(f"PR Observed vs Computed (CH{ch:02d})"); axmc[0].set_xlabel("Time [s]"); axmc[0].set_ylabel("m"); axmc[0].grid(True, alpha=0.3)
y1r = PRR_obs[:, ch] if PRR_obs.shape[1] > ch else np.array([])
y2r = PRR_comp[:, ch] if PRR_comp.shape[1] > ch else np.array([])
axmc[1].plot(t, y1r, label="Observed")
axmc[1].plot(t, y2r, linestyle="--", label="Computed")
axmc[1].set_title(f"PRR Observed vs Computed (CH{ch:02d})"); axmc[1].set_xlabel("Time [s]"); axmc[1].set_ylabel("m/s"); axmc[1].grid(True, alpha=0.3)
axmc[0].legend(loc="best")
if save: fmc.savefig(os.path.join(meas_root, f"observed_vs_computed_CH{ch:02d}.png"), dpi=150)
plt.close(fmc)
# residuals
fr, axr = plt.subplots(1, 2, figsize=(12,4), constrained_layout=True)
yp1 = PREF_PR[:, ch] if PREF_PR.shape[1] > ch else np.array([])
yp2 = POSTF_PR[:, ch] if POSTF_PR.shape[1] > ch else np.array([])
axr[0].plot(t, yp1, label="Prefit")
axr[0].plot(t, yp2, label="Postfit")
axr[0].set_title(f"PR Residuals (CH{ch:02d})"); axr[0].set_xlabel("Time [s]"); axr[0].set_ylabel("m"); axr[0].grid(True, alpha=0.3)
yrr1 = PREF_PRR[:, ch] if PREF_PRR.shape[1] > ch else np.array([])
yrr2 = POSTF_PRR[:, ch] if POSTF_PRR.shape[1] > ch else np.array([])
axr[1].plot(t, yrr1, label="Prefit")
axr[1].plot(t, yrr2, label="Postfit")
axr[1].set_title(f"PRR Residuals (CH{ch:02d})"); axr[1].set_xlabel("Time [s]"); axr[1].set_ylabel("m/s"); axr[1].grid(True, alpha=0.3)
axr[0].legend(loc="best")
if save: fr.savefig(os.path.join(res_root, f"residuals_CH{ch:02d}.png"), dpi=150)
plt.close(fr)
if show:
plt.show()
return figs

View File

@@ -0,0 +1,360 @@
"""
vtl_read_dump.py
vtl_read_dump (filename)
Read GNSS-SDR Vector Tracking dump binary file into Python.
Opens GNSS-SDR vector tracking binary log file .dat and returns the contents
Pedro Pereira, 2025. pereirapedro@gmail.com
Args:
settings: receiver settings.
Return:
GNSS_vtl: A dictionary with the processed data in lists
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import struct
def vtl_read_dump(settings):
v1 = []
v2 = []
v3 = []
v4 = []
v5 = []
v6 = []
v7 = []
v8 = []
v9 = []
v10= []
v11 = []
v12 = []
v13 = []
v14 = []
v15 = []
v16 = []
v17 = []
v18 = []
v19 = []
v20 = []
v21 = []
v22 = []
v23 = []
v24 = []
v25 = []
v26 = []
v27 = []
v28 = []
v29 = []
v30 = []
v31 = []
v32 = []
v33 = []
v34 = []
v35 = []
v36 = []
v37 = []
v38 = []
v39 = []
v40 = []
v41 = []
v42 = []
GNSS_vtl = {}
bytes_shift = 0
uint32_size_bytes = 4
double_size_bytes = 8
f = open(settings["dump_path"], 'rb')
if f is None:
return None
else:
while True:
f.seek(bytes_shift, 0)
# epoch number
v1.append(struct.unpack('I', f.read(uint32_size_bytes))[0])
bytes_shift += uint32_size_bytes
f.seek(bytes_shift, 0)
# receiver time
v2.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# time difference between vtl epochs
v3.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver position x-axis
v4.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver position y-axis
v5.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver position z-axis
v6.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver velocity x-axis
v7.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver velocity y-axis
v8.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver velocity z-axis
v9.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver clock bias (GPS)
v10.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver clock bias (GAL)
if settings["GAL_en"] == 1:
v11.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# VTL receiver clock drift
v12.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver position x-axis
v13.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver position y-axis
v14.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver position z-axis
v15.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver velocity x-axis
v16.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver velocity y-axis
v17.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver velocity z-axis
v18.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver clock bias (GPS)
v19.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver clock bias (GAL)
v20.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# RTKL receiver clock drift
v21.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# active channels
for _ in range(settings["numberOfChannels"]):
v22.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF pre-fit (pseudorange)
for _ in range(settings["numberOfChannels"]):
v23.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF pre-fit (pseudorange rate)
for _ in range(settings["numberOfChannels"]):
v24.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF post-fit (pseudorange)
for _ in range(settings["numberOfChannels"]):
v25.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF post-fit (pseudorange rate)
for _ in range(settings["numberOfChannels"]):
v26.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF measurement covariance (pseudorange)
for _ in range(settings["numberOfChannels"]):
v27.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF measurement covariance (pseudorange rate)
for _ in range(settings["numberOfChannels"]):
v28.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# EKF process noise
for i in range(settings["numberOfStates"]):
v29.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# observed pseudorange
for _ in range(settings["numberOfChannels"]):
v30.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# observed pseudorange rate
for _ in range(settings["numberOfChannels"]):
v31.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# computed pseudorange
for _ in range(settings["numberOfChannels"]):
v32.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# computed pseudorange rate
for _ in range(settings["numberOfChannels"]):
v33.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# vector tracking code frequency feedback
for _ in range(settings["numberOfChannels"]):
v34.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite position in x-axis
for _ in range(settings["numberOfChannels"]):
v35.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite position in y-axis
for _ in range(settings["numberOfChannels"]):
v36.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite position in z-axis
for _ in range(settings["numberOfChannels"]):
v37.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite clock bias
for _ in range(settings["numberOfChannels"]):
v38.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite clock drift
for _ in range(settings["numberOfChannels"]):
v39.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# troposphere bias
for _ in range(settings["numberOfChannels"]):
v40.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# ionosphere bias
for _ in range(settings["numberOfChannels"]):
v41.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# satellite code bias
for _ in range(settings["numberOfChannels"]):
v42.append(struct.unpack('d', f.read(double_size_bytes))[0])
bytes_shift += double_size_bytes
f.seek(bytes_shift, 0)
# Check file
linea = f.readline()
if not linea:
break
f.close()
GNSS_vtl['nepoch'] = v1
GNSS_vtl['receiver_time_s'] = v2
GNSS_vtl['vtl_dt_s'] = v3
GNSS_vtl['VTL_RX_P_X_ECEF_m'] = v4
GNSS_vtl['VTL_RX_P_Y_ECEF_m'] = v5
GNSS_vtl['VTL_RX_P_Z_ECEF_m'] = v6
GNSS_vtl['VTL_RX_V_X_ECEF_ms'] = v7
GNSS_vtl['VTL_RX_V_Y_ECEF_ms'] = v8
GNSS_vtl['VTL_RX_V_Z_ECEF_ms'] = v9
GNSS_vtl['VTL_RX_CLK_B_GPS_m'] = v10
GNSS_vtl['VTL_RX_CLK_B_GAL_m'] = v11
GNSS_vtl['VTL_RX_CLK_D_ms'] = v12
GNSS_vtl['RTKL_RX_P_X_ECEF_m'] = v13
GNSS_vtl['RTKL_RX_P_Y_ECEF_m'] = v14
GNSS_vtl['RTKL_RX_P_Z_ECEF_m'] = v15
GNSS_vtl['RTKL_RX_V_X_ECEF_ms'] = v16
GNSS_vtl['RTKL_RX_V_Y_ECEF_ms'] = v17
GNSS_vtl['RTKL_RX_V_Z_ECEF_ms'] = v18
GNSS_vtl['RTKL_RX_CLK_B_GPS_m'] = v19
GNSS_vtl['RTKL_RX_CLK_B_GAL_m'] = v20
GNSS_vtl['RTKL_RX_CLK_D_ms'] = v21
GNSS_vtl['VTL_active_channels'] = v22
GNSS_vtl['EKF_prefit_PR_m'] = v23
GNSS_vtl['EKF_prefit_PRR_ms'] = v24
GNSS_vtl['EKF_postfit_PR_m'] = v25
GNSS_vtl['EKF_postfit_PRR_ms'] = v26
GNSS_vtl['EKF_meascov_PR'] = v27
GNSS_vtl['EKF_meascov_PRR'] = v28
GNSS_vtl['EKF_process_noise'] = v29
GNSS_vtl['RTKL_observed_PR_m'] = v30
GNSS_vtl['RTKL_observed_PRR_ms'] = v31
GNSS_vtl['VTL_computed_PR_m'] = v32
GNSS_vtl['VTL_computed_PRR_ms'] = v33
GNSS_vtl['VTL_code_freq_hz'] = v34
GNSS_vtl['RTKL_SV_P_X_ECEF_m'] = v35
GNSS_vtl['RTKL_SV_P_Y_ECEF_m'] = v36
GNSS_vtl['RTKL_SV_P_Z_ECEF_m'] = v37
GNSS_vtl['RTKL_SV_CLK_B_m'] = v38
GNSS_vtl['RTKL_SV_CLK_D_ms'] = v39
GNSS_vtl['RTKL_topo_delay_m'] = v40
GNSS_vtl['RTKL_iono_delay_m'] = v41
GNSS_vtl['RTKL_code_bias_m'] = v42
return GNSS_vtl

View File

@@ -0,0 +1,74 @@
"""
vtl_plot_data.py
CLI entry-point to read GNSS-SDR Tracking dump binary file using the provided function and
plot internal variables
Pedro Pereira, 2025. pereirapedro@gmail.com
Example
--------
python vtl_plot_data.py --dump /path/to/vtl_dump.dat --channels 20 \
--galileo 1 --gt 4729369.07 -705574.36 4207012.59 \
--fig-dir ./PLOTS --groups pos vel clock residuals meas sv activity \
--interactive --no-show
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
Copyright (C) 2022 (see AUTHORS file for a list of contributors)
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import argparse
from typing import Dict, Any
from lib.vtl_read_dump import vtl_read_dump
from lib.plotVTL import plotVTL
def parse_args():
p = argparse.ArgumentParser(description="Plot GNSS-SDR VTL dump.")
p.add_argument("--dump", required=True, help="Path to vtl_dump.dat")
p.add_argument("--channels", type=int, required=True, help="Number of channels in the dump")
p.add_argument("--galileo", type=int, default=1, choices=[0,1], help="GAL enabled flag recorded in dump")
p.add_argument("--fig-dir", default="./PLOTS", help="Directory to save figures")
p.add_argument("--gt", type=float, nargs=3, metavar=("X","Y","Z"),
help="Ground-truth ECEF (m): X Y Z")
p.add_argument("--groups", nargs="+",
default=["pos","vel","clock","residuals","meas","sv","activity"],
help="Which groups to plot. Use any of: pos vel clock residuals meas sv activity all")
p.add_argument("--no-save", action="store_true", help="Do not save figures")
p.add_argument("--show", action="store_true", help="Show aggregate matplotlib figures")
return p.parse_args()
def main():
args = parse_args()
settings: Dict[str, Any] = {
"numberOfChannels": args.channels,
"numberOfStates": 8 + (1 if args.galileo == 1 else 0),
"GAL_en": args.galileo,
"dump_path": args.dump,
"fig_path": args.fig_dir,
}
if args.gt is not None:
settings["gt_rx_p_x"], settings["gt_rx_p_y"], settings["gt_rx_p_z"] = args.gt
# Read dump
G = vtl_read_dump(settings)
# Plot
plotVTL(G, settings,
groups=args.groups,
save=not args.no_save,
show=args.show)
if __name__ == "__main__":
main()