1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-11-12 05:13:04 +00:00

skyplot: add reading of RINEX v2 navigation files

Recover Python 3.6 compatibility (use old-style type hints for improved compatibility)
This commit is contained in:
Carles Fernandez
2025-08-24 13:42:36 +02:00
parent 9cf944b8f8
commit e256aaf451

View File

@@ -31,6 +31,7 @@ import sys
from datetime import datetime, timedelta
from math import atan2, cos, sin, sqrt
from pathlib import Path
from typing import Tuple, Optional
try:
import matplotlib.pyplot as plt
@@ -42,7 +43,7 @@ except ImportError:
__version__ = "1.0.0"
def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | None]:
def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
Return (start_time, end_time) from a RINEX observation file (v2/3/4)
by scanning epoch lines. If parsing fails or the file is not OBS, return (None, None).
@@ -106,7 +107,7 @@ def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | Non
return (None, None)
def find_obs_for_nav(nav_file: str) -> str | None:
def find_obs_for_nav(nav_file: str) -> Optional[str]:
"""Find corresponding RINEX OBS file for a given NAV file (v2/v3/v4), covering all standard extensions."""
nav_path = Path(nav_file)
tried = []
@@ -114,7 +115,7 @@ def find_obs_for_nav(nav_file: str) -> str | None:
stem = nav_path.stem
suffix = nav_path.suffix
# --- RINEX v2: replace last letter of extension with 'O' or 'o'
# --- RINEX v2 names: replace last letter of extension with 'O' or 'o'
if suffix and suffix[-1].isalpha():
for o_type in ('O', 'o'):
candidate = nav_path.with_suffix(suffix[:-1] + o_type)
@@ -122,7 +123,7 @@ def find_obs_for_nav(nav_file: str) -> str | None:
if candidate.exists():
return str(candidate)
# --- RINEX v3/v4: handle standard extensions and common modifiers
# --- RINEX v3/v4 names: handle standard extensions and common modifiers
gnss_patterns = [
# Mixed constellations
("_MN", "_MO"), ("_mn", "_mo"),
@@ -213,52 +214,232 @@ def parse_rinex_float(s: str) -> float:
return 0.0 # Default if parsing fails
def read_rinex_header(filename: str) -> Tuple[str, str]:
"""Return (version_str, file_type_char) from the 'RINEX VERSION / TYPE' header line."""
with open(filename, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
if "RINEX VERSION / TYPE" in line:
version = line[0:9].strip() # F9.2 in v2/v3/v4
ftype = line[20:21].upper() # 'N' (GPS nav v2), 'G' (GLO nav v2), 'H' (GEO/SBAS v2), 'N' in v3/4 too
return version, ftype
if "END OF HEADER" in line:
break
return "", ""
def read_rinex_nav(filename):
"""Read RINEX v3/4 navigation file into a dictionary of satellites."""
"""
Read RINEX v2/v3/v4 navigation file into a dict { 'Gxx': [eph...], 'Rxx': [...], 'Sxxx': [...] }.
"""
version_str, ftype = read_rinex_header(filename)
is_v2 = version_str.startswith('2')
satellites = {}
line_number = 0
with open(filename, 'r', encoding='utf-8') as f:
with open(filename, 'r', encoding='utf-8', errors='ignore') as f:
# Skip header
while True:
line = f.readline()
line_number += 1
if not line:
return satellites # Empty file
return satellites
if "END OF HEADER" in line:
break
# Read ephemeris data
current_line = f.readline()
line_number += 1
# ----------------------------
# RINEX 2.10 / 2.11 parsing
# ----------------------------
if is_v2:
# File type: 'N' (GPS), 'G' (GLONASS), 'H' (GEO/SBAS)
v2_system = ftype # keep original char
while current_line:
# Skip empties
if not current_line.strip():
current_line = f.readline()
line_number += 1
continue
try:
# --- First record line: PRN/EPOCH/CLOCK --------------------
# Formats per RINEX 2.11 Table A4 (GPS) and Table A11 (GLONASS).
# PRN I2 in cols 1-2. Then yy, mm, dd, hh, mi (I2 with 1X between), ss F5.1,
# then 3D19.12 (clock bias, drift, drift rate).
prn_num = int(current_line[0:2])
yy = int(current_line[3:5])
mm = int(current_line[6:8])
dd = int(current_line[9:11])
hh = int(current_line[12:14])
mi = int(current_line[15:17])
ss = float(current_line[18:23])
# Year mapping: 8099 => 19801999, 0079 => 20002079
yyyy = 1900 + yy if yy >= 80 else 2000 + yy
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6))
# Map PRN to 'Gxx' / 'Rxx' / 'Sxxx'
if v2_system == 'N': # GPS nav
prn = f"G{prn_num:02d}"
elif v2_system == 'G': # GLONASS nav
prn = f"R{prn_num:02d}"
elif v2_system == 'H': # GEO/SBAS nav (PRN-100 in file)
prn = f"S{prn_num + 100:03d}"
else:
# Unknown v2 type; skip
current_line = f.readline()
line_number += 1
continue
# Collect the lines of this ephemeris block:
lines = [current_line]
if v2_system == 'G' or v2_system == 'H':
# GLONASS & GEO blocks: 3 more lines (Tables A11/A16) -> total 4 records
needed = 3
else:
# GPS v2 block: 7 more lines (Table A4) -> total 8 records
needed = 7
for _ in range(needed):
next_line = f.readline()
line_number += 1
if not next_line:
break
lines.append(next_line)
if len(lines) < needed + 1:
current_line = f.readline()
line_number += 1
continue
if v2_system == 'N':
# GPS
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:41]),
'sv_clock_drift': parse_rinex_float(lines[0][41:61]),
'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
'iode': parse_rinex_float(lines[1][4:22]),
'crs': parse_rinex_float(lines[1][22:41]),
'delta_n': parse_rinex_float(lines[1][41:60]),
'm0': parse_rinex_float(lines[1][61:80]),
'cuc': parse_rinex_float(lines[2][4:22]),
'ecc': parse_rinex_float(lines[2][22:41]),
'cus': parse_rinex_float(lines[2][41:60]),
'sqrt_a': parse_rinex_float(lines[2][60:80]),
'toe': parse_rinex_float(lines[3][4:22]),
'cic': parse_rinex_float(lines[3][22:41]),
'omega0': parse_rinex_float(lines[3][41:60]),
'cis': parse_rinex_float(lines[3][60:80]),
'i0': parse_rinex_float(lines[4][4:22]),
'crc': parse_rinex_float(lines[4][22:41]),
'omega': parse_rinex_float(lines[4][41:60]),
'omega_dot': parse_rinex_float(lines[4][60:80]),
'idot': parse_rinex_float(lines[5][4:22]),
'codes_l2': parse_rinex_float(lines[5][22:41]),
'gps_week': parse_rinex_float(lines[5][41:61]),
'l2p_flag': parse_rinex_float(lines[5][61:80]),
'sv_accuracy': parse_rinex_float(lines[6][4:22]),
'sv_health': parse_rinex_float(lines[6][22:41]),
'tgd': parse_rinex_float(lines[6][41:61]),
'iodc': parse_rinex_float(lines[6][61:80]),
'transmission_time': parse_rinex_float(lines[7][4:22]) if len(lines) > 7 else None,
'fit_interval': parse_rinex_float(lines[7][22:41]) if len(lines) > 7 else None,
'extra': lines[8:]
}
elif v2_system == 'H':
# GEO/SBAS (Table A16)
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:41]),
'sv_clock_drift': parse_rinex_float(lines[0][41:61]),
'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
'x': parse_rinex_float(lines[1][4:22]),
'x_vel': parse_rinex_float(lines[1][22:41]),
'x_acc': parse_rinex_float(lines[1][41:60]),
'health': parse_rinex_float(lines[1][60:80]),
'y': parse_rinex_float(lines[2][4:22]),
'y_vel': parse_rinex_float(lines[2][22:41]),
'y_acc': parse_rinex_float(lines[2][41:61]),
'z': parse_rinex_float(lines[3][4:22]),
'z_vel': parse_rinex_float(lines[3][21:41]),
'z_acc': parse_rinex_float(lines[3][41:61]),
'extra': lines[4:]
}
elif v2_system == 'G':
# GLONASS
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]),
'message_frame_time': parse_rinex_float(lines[0][61:80]),
'x': parse_rinex_float(lines[1][4:22]),
'x_vel': parse_rinex_float(lines[1][22:41]),
'x_acc': parse_rinex_float(lines[1][41:60]),
'health': parse_rinex_float(lines[1][60:80]),
'y': parse_rinex_float(lines[2][4:22]),
'y_vel': parse_rinex_float(lines[2][22:41]),
'y_acc': parse_rinex_float(lines[2][41:60]),
'freq_num': parse_rinex_float(lines[2][60:80]),
'z': parse_rinex_float(lines[3][4:22]),
'z_vel': parse_rinex_float(lines[3][22:41]),
'z_acc': parse_rinex_float(lines[3][41:60]),
'age': parse_rinex_float(lines[3][60:80]),
'extra': lines[4:]
}
else:
ephemeris = None
if ephemeris:
satellites.setdefault(prn, []).append(ephemeris)
except (ValueError, IndexError) as e:
# Skip malformed block; advance
current_line = f.readline()
line_number += 1
continue
current_line = f.readline()
line_number += 1
return satellites # done with v2
# ----------------------------
# RINEX 3 / 4 parsing
# ----------------------------
while current_line:
# Skip short/noise lines
if len(current_line) < 23:
current_line = f.readline()
line_number += 1
continue
prn = current_line[:3].strip()
if not prn:
# Parse the epoch line
parts = current_line.split()
if len(parts) < 8:
current_line = f.readline()
line_number += 1
continue
system = prn[0] # G, R, E, etc.
prn = parts[0].strip()
system = prn[0]
try:
# Parse epoch fields
year = int(current_line[4:8])
month = int(current_line[9:11])
day = int(current_line[12:14])
hour = int(current_line[15:17])
minute = int(current_line[18:20])
second = int(float(current_line[21:23]))
year = int(parts[1])
month = int(parts[2])
day = int(parts[3])
hour = int(parts[4])
minute = int(parts[5])
second = float(parts[6])
epoch = datetime(year, month, day, hour, minute, int(second), int((second % 1) * 1e6))
year += 2000 if year < 80 else 0
epoch = datetime(year, month, day, hour, minute, second)
# Read the next lines
lines = [current_line]
line_count = 4 if system == 'R' else 7
line_count = 4 if system == 'R' or system == 'S' else 7
for _ in range(line_count):
next_line = f.readline()
line_number += 1
@@ -271,36 +452,34 @@ def read_rinex_nav(filename):
line_number += 1
continue
# Build ephemeris dictionary
if system == 'R': # GLONASS
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_clock_bias': parse_rinex_float(lines[0][23:41]),
'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]),
'message_frame_time': parse_rinex_float(lines[0][61:80]),
'x': parse_rinex_float(lines[1][4:23]), # Position (km)
'x_vel': parse_rinex_float(lines[1][23:42]), # Velocity (km/s)
'x': parse_rinex_float(lines[1][4:23]),
'x_vel': parse_rinex_float(lines[1][23:41]),
'x_acc': parse_rinex_float(lines[1][42:61]),
'health': parse_rinex_float(lines[1][61:80]),
'y': parse_rinex_float(lines[2][4:23]),
'y_vel': parse_rinex_float(lines[2][23:42]),
'y_vel': parse_rinex_float(lines[2][23:41]),
'y_acc': parse_rinex_float(lines[2][42:61]),
'freq_num': parse_rinex_float(lines[2][61:80]),
'z': parse_rinex_float(lines[3][4:23]),
'z_vel': parse_rinex_float(lines[3][23:42]),
'z_vel': parse_rinex_float(lines[3][23:41]),
'z_acc': parse_rinex_float(lines[3][42:61]),
'age': parse_rinex_float(lines[3][61:80]),
'extra': lines[4:] # Keep any extra RINEX v4 lines
'extra': lines[4:]
}
elif system == 'S': # SBAS (RINEX v4)
elif system == 'S': # SBAS (RINEX v4 short form)
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_clock_drift': parse_rinex_float(lines[0][42:61]),
'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
# SBAS messages are shorter: 4 lines instead of 8
'x': parse_rinex_float(lines[1][4:23]) if len(lines) > 1 else None,
'x_vel': parse_rinex_float(lines[1][23:42]) if len(lines) > 1 else None,
'x_acc': parse_rinex_float(lines[1][42:61]) if len(lines) > 1 else None,
@@ -311,9 +490,9 @@ def read_rinex_nav(filename):
'z': parse_rinex_float(lines[3][4:23]) if len(lines) > 3 else None,
'z_vel': parse_rinex_float(lines[3][23:42]) if len(lines) > 3 else None,
'z_acc': parse_rinex_float(lines[3][42:61]) if len(lines) > 3 else None,
'extra': lines[4:] # capture anything beyond
'extra': lines[4:]
}
else: # GPS, Galileo, BeiDou, QZSS, IRNSS, etc.
else:
ephemeris = {
'prn': prn,
'epoch': epoch,
@@ -346,25 +525,14 @@ def read_rinex_nav(filename):
'iodc': parse_rinex_float(lines[6][61:80]),
'transmission_time': parse_rinex_float(lines[7][4:23]) if len(lines) > 7 else None,
'fit_interval': parse_rinex_float(lines[7][23:42]) if len(lines) > 7 else None,
'extra': lines[8:] # Keep any extra RINEX v4 lines
'extra': lines[8:]
}
if prn not in satellites:
satellites[prn] = []
satellites[prn].append(ephemeris)
satellites.setdefault(prn, []).append(ephemeris)
except (ValueError, IndexError) as e:
print(f"\nError in file {filename} at line {line_number}:")
print(f" PRN: {prn}")
print(f" Line content: {current_line.strip()}")
print(f" Error type: {type(e).__name__}")
print(f" Error details: {str(e)}")
print("Skipping to next satellite block ...\n")
# Skip to next block by reading until next PRN
while current_line and not current_line.startswith(prn[0]):
current_line = f.readline()
line_number += 1
continue
except (ValueError, IndexError):
# Skip to next line
pass
current_line = f.readline()
line_number += 1
@@ -758,7 +926,7 @@ def main():
try:
satellites = read_rinex_nav(filename)
except FileNotFoundError:
print(f"Error: File '{filename}' not found.")
print(f"Error: File {filename} not found.")
return 1
if not satellites: