1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-09-02 02:48:00 +00:00

skyplot: more improvements:

Add reading of RINEX 4 navigation files
More robust finding of RINEX obs files when --use-obs is set
Add type hints to some functions for improved documentation and readability
Improve reading of RINEX obs files
Make labels more transparent and colorful
Update documentation
This commit is contained in:
Carles Fernandez
2025-08-23 15:53:28 +02:00
parent 1f4f6b333a
commit 9cf944b8f8
2 changed files with 198 additions and 78 deletions

View File

@@ -20,15 +20,16 @@ showing satellite visibility over time.
- Plots satellite tracks in azimuth-elevation coordinates.
- Customizable observer location.
- Color-codes satellites by constellation (GPS, Galileo, GLONASS, BeiDou).
- Elevation mask set to 5°, configurable via the `--elev-mask` option.
- Elevation mask set to 5°, configurable via the `--elev-mask` optional
argument.
- Outputs high-quality image in PDF format. EPS, PNG, and SVG formats are also
available via the `--format` option.
- Non-interactive mode for CI jobs with the `--no-show` option.
- Constellations to plot can be configured via the `--system` option.
- Optionally uses an OBS file to limit plot to the receiver observation time
(`--use-obs`).
- When enabled, the tool looks for a matching file by replacing the last
character of the NAV filename with `O`/`o` and uses it if found.
available via the `--format` optional argument.
- Non-interactive mode for CI jobs with the `--no-show` optional argument.
- Constellations to plot can be configured via the `--system` optional argument.
- Optionally, it uses the corresponding RINEX observation file to limit the plot
to the receiver observation time via the`--use-obs` optional argument.
- If this argument is set, the tool looks for a matching file following
standard RINEX naming conventions, and uses it if found.
## Requirements
@@ -103,7 +104,8 @@ The script generates a PDF file named `skyplot_<RINEX_FILE>.pdf` (with dots in
- Satellite trajectories over all epochs in the file.
- NAV file - ephemeris time range (default).
- Receiver observation if `--use-obs` is specified and OBS file is found.
- Receiver observation time if `--use-obs` is specified and the RINEX OBS file
is found.
- Color-coded by constellation.
- Observer location in title.
- Time range in footer.

View File

@@ -42,55 +42,152 @@ except ImportError:
__version__ = "1.0.0"
def _read_obs_time_bounds(obs_path):
"""Return (start_time, end_time) from a RINEX 2/3 OBS file by scanning epoch lines.
If parsing fails or the file is not OBS, return (None, None).
def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | None]:
"""
Return (start_time, end_time) from a RINEX observation file (v2/3/4)
by scanning epoch lines. If parsing fails or the file is not OBS, return (None, None).
"""
start_time = None
end_time = None
try:
with open(obs_path, 'r', encoding='utf-8', errors='ignore') as f:
# Detect OBS in header and skip to END OF HEADER
obs_file = Path(obs_path)
if not obs_file.exists():
return (None, None)
with obs_file.open('r', encoding='utf-8', errors='ignore') as f:
# --- Detect OBS file in header ---
is_obs = False
for line in f:
if "RINEX VERSION / TYPE" in line:
# Robust OBS detection:
# In RINEX 2/3 the file-type letter at col 21 (0-based idx 20)
tchar = line[20:21]
if tchar == 'O' or 'OBSERVATION DATA' in line:
file_type = line[20:21].upper()
if file_type == 'O' or 'OBSERVATION DATA' in line.upper():
is_obs = True
if "END OF HEADER" in line:
break
if not is_obs:
return (None, None)
start_time = None
end_time = None
# --- Scan for epoch lines ---
for line in f:
if not line.strip():
line = line.strip()
if not line:
continue
if line.startswith('>'): # RINEX 3 epoch line
yyyy = int(line[2:6]); mm = int(line[7:9]); dd = int(line[10:12])
hh = int(line[13:15]); mi = int(line[16:18]); ss = float(line[19:29])
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
else:
# RINEX 2 epoch line
try:
yy = int(line[1:3]); mm = int(line[4:6]); dd = int(line[7:9])
hh = int(line[10:12]); mi = int(line[13:15]); ss = float(line[15:26])
try:
if line.startswith('>'): # RINEX 3/4 epoch line
yyyy = int(line[2:6])
mm = int(line[7:9])
dd = int(line[10:12])
hh = int(line[13:15])
mi = int(line[16:18])
ss = float(line[19:29])
else: # RINEX 2 epoch line
yy = int(line[1:3])
mm = int(line[4:6])
dd = int(line[7:9])
hh = int(line[10:12])
mi = int(line[13:15])
ss = float(line[15:26])
yyyy = 1900 + yy if yy >= 80 else 2000 + yy
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
except Exception:
continue
if start_time is None or epoch < start_time:
start_time = epoch
if end_time is None or epoch > end_time:
end_time = epoch
return (start_time, end_time)
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6))
if start_time is None or epoch < start_time:
start_time = epoch
if end_time is None or epoch > end_time:
end_time = epoch
except Exception:
# Skip malformed lines
continue
return (start_time, end_time)
except Exception:
return (None, None)
def parse_rinex_float(s):
def find_obs_for_nav(nav_file: str) -> str | None:
"""Find corresponding RINEX OBS file for a given NAV file (v2/v3/v4), covering all standard extensions."""
nav_path = Path(nav_file)
tried = []
stem = nav_path.stem
suffix = nav_path.suffix
# --- RINEX v2: replace last letter of extension with 'O' or 'o'
if suffix and suffix[-1].isalpha():
for o_type in ('O', 'o'):
candidate = nav_path.with_suffix(suffix[:-1] + o_type)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
# --- RINEX v3/v4: handle standard extensions and common modifiers
gnss_patterns = [
# Mixed constellations
("_MN", "_MO"), ("_mn", "_mo"),
("_MM", "_MO"), ("_mm", "_mo"),
("_MR", "_MO"), ("_mr", "_mo"),
# Individual constellations
("_GN", "_GO"), ("_gn", "_go"), # GPS
("_RN", "_RO"), ("_rn", "_ro"), # GLONASS
("_EN", "_EO"), ("_en", "_eo"), # Galileo
("_CN", "_CO"), ("_cn", "_co"), # BeiDou
("_JN", "_JO"), ("_jn", "_jo"), # QZSS
("_IN", "_IO"), ("_in", "_io"), # IRNSS
("_SN", "_SO"), ("_sn", "_so"), # SBAS
]
for nav_pattern, obs_pattern in gnss_patterns:
if nav_pattern in stem:
# Direct replacement (e.g., _MN -> _MO)
candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + suffix)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
# Handle sampling rate patterns (e.g., _MN -> _30S_MO)
sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s']
for rate in sampling_rates:
candidate = nav_path.with_name(stem.replace(nav_pattern, rate + obs_pattern) + suffix)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
# Also try with common observation extensions
for obs_ext in ['.rnx', '.obs', '.OBS', '.22O', '.23O', '.24O', '.25O']:
if suffix != obs_ext:
candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + obs_ext)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
# --- Additional patterns for files with sampling rate modifiers before constellation code
sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s', '_01H', '_1H', '_01h', '_1h']
for rate in sampling_rates:
if rate in stem:
# Check if this is a navigation file with sampling rate + _MN/_GN/etc.
for nav_suffix in ['_MN', '_GN', '_RN', '_EN', '_CN', '_JN', '_IN', '_SN',
'_mn', '_gn', '_rn', '_en', '_cn', '_jn', '_in', '_sn']:
if rate + nav_suffix in stem:
# Replace navigation with observation (e.g., _30S_MN -> _30S_MO)
candidate = nav_path.with_name(stem.replace(rate + nav_suffix, rate + nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
# Also try without sampling rate (e.g., _30S_MN -> _MO)
candidate = nav_path.with_name(stem.replace(rate + nav_suffix, nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix)
tried.append(str(candidate))
if candidate.exists():
return str(candidate)
print(f"OBS file not found. Tried: {', '.join(tried)}.")
return None
def parse_rinex_float(s: str) -> float:
"""Parse RINEX formatted float string which may contain D or E exponent and compact spacing"""
# Handle empty string
if not s.strip():
@@ -117,7 +214,7 @@ def parse_rinex_float(s):
def read_rinex_nav(filename):
"""Read RINEX v3.0 navigation file"""
"""Read RINEX v3/4 navigation file into a dictionary of satellites."""
satellites = {}
line_number = 0
with open(filename, 'r', encoding='utf-8') as f:
@@ -140,6 +237,11 @@ def read_rinex_nav(filename):
continue
prn = current_line[:3].strip()
if not prn:
current_line = f.readline()
line_number += 1
continue
system = prn[0] # G, R, E, etc.
try:
@@ -169,7 +271,8 @@ def read_rinex_nav(filename):
line_number += 1
continue
if system == 'R': # GLONASS specific parsing
# Build ephemeris dictionary
if system == 'R': # GLONASS
ephemeris = {
'prn': prn,
'epoch': epoch,
@@ -187,10 +290,30 @@ def read_rinex_nav(filename):
'z': parse_rinex_float(lines[3][4:23]),
'z_vel': parse_rinex_float(lines[3][23:42]),
'z_acc': parse_rinex_float(lines[3][42:61]),
'age': parse_rinex_float(lines[3][61:80])
'age': parse_rinex_float(lines[3][61:80]),
'extra': lines[4:] # Keep any extra RINEX v4 lines
}
else:
# Parse all ephemeris parameters
elif system == 'S': # SBAS (RINEX v4)
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_clock_drift': parse_rinex_float(lines[0][42:61]),
'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
# SBAS messages are shorter: 4 lines instead of 8
'x': parse_rinex_float(lines[1][4:23]) if len(lines) > 1 else None,
'x_vel': parse_rinex_float(lines[1][23:42]) if len(lines) > 1 else None,
'x_acc': parse_rinex_float(lines[1][42:61]) if len(lines) > 1 else None,
'health': parse_rinex_float(lines[1][61:80]) if len(lines) > 1 else None,
'y': parse_rinex_float(lines[2][4:23]) if len(lines) > 2 else None,
'y_vel': parse_rinex_float(lines[2][23:42]) if len(lines) > 2 else None,
'y_acc': parse_rinex_float(lines[2][42:61]) if len(lines) > 2 else None,
'z': parse_rinex_float(lines[3][4:23]) if len(lines) > 3 else None,
'z_vel': parse_rinex_float(lines[3][23:42]) if len(lines) > 3 else None,
'z_acc': parse_rinex_float(lines[3][42:61]) if len(lines) > 3 else None,
'extra': lines[4:] # capture anything beyond
}
else: # GPS, Galileo, BeiDou, QZSS, IRNSS, etc.
ephemeris = {
'prn': prn,
'epoch': epoch,
@@ -221,9 +344,9 @@ def read_rinex_nav(filename):
'sv_health': parse_rinex_float(lines[6][23:42]),
'tgd': parse_rinex_float(lines[6][42:61]),
'iodc': parse_rinex_float(lines[6][61:80]),
'transmission_time': parse_rinex_float(lines[7][4:23]),
'fit_interval': (
parse_rinex_float(lines[7][23:42])) if len(lines[7]) > 23 else 0.0
'transmission_time': parse_rinex_float(lines[7][4:23]) if len(lines) > 7 else None,
'fit_interval': parse_rinex_float(lines[7][23:42]) if len(lines) > 7 else None,
'extra': lines[8:] # Keep any extra RINEX v4 lines
}
if prn not in satellites:
@@ -236,7 +359,7 @@ def read_rinex_nav(filename):
print(f" Line content: {current_line.strip()}")
print(f" Error type: {type(e).__name__}")
print(f" Error details: {str(e)}")
print("Skipping to next satellite block...\n")
print("Skipping to next satellite block ...\n")
# Skip to next block by reading until next PRN
while current_line and not current_line.startswith(prn[0]):
current_line = f.readline()
@@ -253,7 +376,7 @@ def calculate_satellite_position(ephemeris, transmit_time):
"""Calculate satellite position in ECEF coordinates at given transmission time"""
system = ephemeris['prn'][0]
if system == 'R': # GLONASS - use position + velocity * time
if system in ('R', 'S'): # GLONASS/SBAS
dt = transmit_time
# Convert km to meters
xk = (ephemeris['x'] + ephemeris['x_vel'] * dt + 0.5 * ephemeris['x_acc'] * dt**2) * 1000
@@ -397,13 +520,14 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt,
# Color scheme by constellation
system_colors = {
'G': 'blue', # GPS
'E': 'green', # Galileo
'R': 'red', # GLONASS
'C': 'orange', # BeiDou
'J': 'brown', # QZSS
'I': 'pink', # IRNSS
'S': 'gray' # SBAS
'G': 'blue', # GPS
'E': 'green', # Galileo
'R': 'red', # GLONASS
'C': 'orange', # BeiDou
'J': 'brown', # QZSS
'I': 'pink', # IRNSS
'S': 'lightgray', # SBAS
'L': 'cyan' # LEO (new in RINEX v4)
}
# System names mapping
@@ -414,7 +538,8 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt,
'C': 'BeiDou',
'J': 'QZSS',
'I': 'IRNSS',
'S': 'SBAS'
'S': 'SBAS',
'L': 'LEO'
}
# Find which systems are actually present
@@ -487,7 +612,7 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt,
mid_idx = len(theta)//2
ax.text(theta[mid_idx], r[mid_idx], prn,
fontsize=12, ha='center', va='center',
bbox={"facecolor": "white", "alpha": 0.8, "pad": 2},
bbox={"facecolor": system_colors.get(prn[0], "white"), "alpha": 0.2, "pad": 2},
zorder=3)
# Legend for present systems
@@ -547,7 +672,8 @@ def main():
'BEIDOU': 'C',
'QZSS': 'J',
'IRNSS': 'I',
'SBAS': 'S'
'SBAS': 'S',
'LEO': 'L'
}
# Set up argument parser
@@ -628,7 +754,7 @@ def main():
filename = args.filename
# Read RINEX file
print(f"Reading {filename}...")
print(f"Reading {filename} ...")
try:
satellites = read_rinex_nav(filename)
except FileNotFoundError:
@@ -675,7 +801,11 @@ def main():
'G': 'GPS',
'R': 'GLONASS',
'E': 'Galileo',
'C': 'BeiDou'
'C': 'BeiDou',
'J': 'QZSS',
'I': 'IRNSS',
'S': 'SBAS',
'L': 'LEO'
}.get(system, 'Unknown')
print(f"- {system_name} ({system}): {count} satellites")
@@ -683,33 +813,21 @@ def main():
# Time window: OBS bounds if provided; else NAV span
use_start, use_end = all_epochs[0], all_epochs[-1]
if args.use_obs:
tried = []
obs_path = None
stem = filename[:-1]
for s in ('O', 'o'): # Try uppercase then lowercase
candidate = stem + s
tried.append(candidate)
if Path(candidate).exists():
obs_path = candidate
break
obs_path = find_obs_for_nav(filename)
if obs_path:
obs_start, obs_end = _read_obs_time_bounds(obs_path)
obs_start, obs_end = read_obs_time_bounds(obs_path)
if obs_start and obs_end:
use_start, use_end = obs_start, obs_end
print(f"\nObservation window detected from {obs_path}: {use_start} {use_end}")
print(f"\nObservation window detected in {obs_path}: from {use_start} to {use_end}")
else:
print(f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead.")
else:
print(f"\nOBS file not found. Tried: {', '.join(tried)}. Using NAV span instead.")
# Ensure at least two samples with the default 5-minute step
if (use_end - use_start) < timedelta(minutes=5):
use_end = use_start + timedelta(minutes=5)
# Generate the combined skyplot
print("\nGenerating skyplot...")
print("\nGenerating skyplot ...")
footer = f"From {use_start} to {use_end} UTC"
plot_satellite_tracks(