From 9cf944b8f8bf530f402e7950581f06ec8e6a614f Mon Sep 17 00:00:00 2001 From: Carles Fernandez Date: Sat, 23 Aug 2025 15:53:28 +0200 Subject: [PATCH] skyplot: more improvements: Add reading of RINEX 4 navigation files More robust finding of RINEX obs files when --use-obs is set Add type hints to some functions for improved documentation and readability Improve reading of RINEX obs files Make labels more transparent and colorful Update documentation --- utils/skyplot/README.md | 20 +-- utils/skyplot/skyplot.py | 256 ++++++++++++++++++++++++++++----------- 2 files changed, 198 insertions(+), 78 deletions(-) diff --git a/utils/skyplot/README.md b/utils/skyplot/README.md index a62fce949..6adf7a284 100644 --- a/utils/skyplot/README.md +++ b/utils/skyplot/README.md @@ -20,15 +20,16 @@ showing satellite visibility over time. - Plots satellite tracks in azimuth-elevation coordinates. - Customizable observer location. - Color-codes satellites by constellation (GPS, Galileo, GLONASS, BeiDou). -- Elevation mask set to 5°, configurable via the `--elev-mask` option. +- Elevation mask set to 5°, configurable via the `--elev-mask` optional + argument. - Outputs high-quality image in PDF format. EPS, PNG, and SVG formats are also - available via the `--format` option. -- Non-interactive mode for CI jobs with the `--no-show` option. -- Constellations to plot can be configured via the `--system` option. -- Optionally uses an OBS file to limit plot to the receiver observation time - (`--use-obs`). - - When enabled, the tool looks for a matching file by replacing the last - character of the NAV filename with `O`/`o` and uses it if found. + available via the `--format` optional argument. +- Non-interactive mode for CI jobs with the `--no-show` optional argument. +- Constellations to plot can be configured via the `--system` optional argument. +- Optionally, it uses the corresponding RINEX observation file to limit the plot + to the receiver observation time via the`--use-obs` optional argument. + - If this argument is set, the tool looks for a matching file following + standard RINEX naming conventions, and uses it if found. ## Requirements @@ -103,7 +104,8 @@ The script generates a PDF file named `skyplot_.pdf` (with dots in - Satellite trajectories over all epochs in the file. - NAV file - ephemeris time range (default). - - Receiver observation if `--use-obs` is specified and OBS file is found. + - Receiver observation time if `--use-obs` is specified and the RINEX OBS file + is found. - Color-coded by constellation. - Observer location in title. - Time range in footer. diff --git a/utils/skyplot/skyplot.py b/utils/skyplot/skyplot.py index 966cb2b43..2d82bf140 100755 --- a/utils/skyplot/skyplot.py +++ b/utils/skyplot/skyplot.py @@ -42,55 +42,152 @@ except ImportError: __version__ = "1.0.0" -def _read_obs_time_bounds(obs_path): - """Return (start_time, end_time) from a RINEX 2/3 OBS file by scanning epoch lines. - If parsing fails or the file is not OBS, return (None, None). +def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | None]: """ + Return (start_time, end_time) from a RINEX observation file (v2/3/4) + by scanning epoch lines. If parsing fails or the file is not OBS, return (None, None). + """ + start_time = None + end_time = None + try: - with open(obs_path, 'r', encoding='utf-8', errors='ignore') as f: - # Detect OBS in header and skip to END OF HEADER + obs_file = Path(obs_path) + if not obs_file.exists(): + return (None, None) + + with obs_file.open('r', encoding='utf-8', errors='ignore') as f: + # --- Detect OBS file in header --- is_obs = False for line in f: if "RINEX VERSION / TYPE" in line: - # Robust OBS detection: - # In RINEX 2/3 the file-type letter at col 21 (0-based idx 20) - tchar = line[20:21] - if tchar == 'O' or 'OBSERVATION DATA' in line: + file_type = line[20:21].upper() + if file_type == 'O' or 'OBSERVATION DATA' in line.upper(): is_obs = True if "END OF HEADER" in line: break if not is_obs: return (None, None) - start_time = None - end_time = None - + # --- Scan for epoch lines --- for line in f: - if not line.strip(): + line = line.strip() + if not line: continue - if line.startswith('>'): # RINEX 3 epoch line - yyyy = int(line[2:6]); mm = int(line[7:9]); dd = int(line[10:12]) - hh = int(line[13:15]); mi = int(line[16:18]); ss = float(line[19:29]) - epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6)) - else: - # RINEX 2 epoch line - try: - yy = int(line[1:3]); mm = int(line[4:6]); dd = int(line[7:9]) - hh = int(line[10:12]); mi = int(line[13:15]); ss = float(line[15:26]) + + try: + if line.startswith('>'): # RINEX 3/4 epoch line + yyyy = int(line[2:6]) + mm = int(line[7:9]) + dd = int(line[10:12]) + hh = int(line[13:15]) + mi = int(line[16:18]) + ss = float(line[19:29]) + else: # RINEX 2 epoch line + yy = int(line[1:3]) + mm = int(line[4:6]) + dd = int(line[7:9]) + hh = int(line[10:12]) + mi = int(line[13:15]) + ss = float(line[15:26]) yyyy = 1900 + yy if yy >= 80 else 2000 + yy - epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6)) - except Exception: - continue - if start_time is None or epoch < start_time: - start_time = epoch - if end_time is None or epoch > end_time: - end_time = epoch - return (start_time, end_time) + + epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6)) + + if start_time is None or epoch < start_time: + start_time = epoch + if end_time is None or epoch > end_time: + end_time = epoch + except Exception: + # Skip malformed lines + continue + + return (start_time, end_time) except Exception: return (None, None) -def parse_rinex_float(s): +def find_obs_for_nav(nav_file: str) -> str | None: + """Find corresponding RINEX OBS file for a given NAV file (v2/v3/v4), covering all standard extensions.""" + nav_path = Path(nav_file) + tried = [] + + stem = nav_path.stem + suffix = nav_path.suffix + + # --- RINEX v2: replace last letter of extension with 'O' or 'o' + if suffix and suffix[-1].isalpha(): + for o_type in ('O', 'o'): + candidate = nav_path.with_suffix(suffix[:-1] + o_type) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + # --- RINEX v3/v4: handle standard extensions and common modifiers + gnss_patterns = [ + # Mixed constellations + ("_MN", "_MO"), ("_mn", "_mo"), + ("_MM", "_MO"), ("_mm", "_mo"), + ("_MR", "_MO"), ("_mr", "_mo"), + + # Individual constellations + ("_GN", "_GO"), ("_gn", "_go"), # GPS + ("_RN", "_RO"), ("_rn", "_ro"), # GLONASS + ("_EN", "_EO"), ("_en", "_eo"), # Galileo + ("_CN", "_CO"), ("_cn", "_co"), # BeiDou + ("_JN", "_JO"), ("_jn", "_jo"), # QZSS + ("_IN", "_IO"), ("_in", "_io"), # IRNSS + ("_SN", "_SO"), ("_sn", "_so"), # SBAS + ] + + for nav_pattern, obs_pattern in gnss_patterns: + if nav_pattern in stem: + # Direct replacement (e.g., _MN -> _MO) + candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + suffix) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + # Handle sampling rate patterns (e.g., _MN -> _30S_MO) + sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s'] + for rate in sampling_rates: + candidate = nav_path.with_name(stem.replace(nav_pattern, rate + obs_pattern) + suffix) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + # Also try with common observation extensions + for obs_ext in ['.rnx', '.obs', '.OBS', '.22O', '.23O', '.24O', '.25O']: + if suffix != obs_ext: + candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + obs_ext) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + # --- Additional patterns for files with sampling rate modifiers before constellation code + sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s', '_01H', '_1H', '_01h', '_1h'] + for rate in sampling_rates: + if rate in stem: + # Check if this is a navigation file with sampling rate + _MN/_GN/etc. + for nav_suffix in ['_MN', '_GN', '_RN', '_EN', '_CN', '_JN', '_IN', '_SN', + '_mn', '_gn', '_rn', '_en', '_cn', '_jn', '_in', '_sn']: + if rate + nav_suffix in stem: + # Replace navigation with observation (e.g., _30S_MN -> _30S_MO) + candidate = nav_path.with_name(stem.replace(rate + nav_suffix, rate + nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + # Also try without sampling rate (e.g., _30S_MN -> _MO) + candidate = nav_path.with_name(stem.replace(rate + nav_suffix, nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix) + tried.append(str(candidate)) + if candidate.exists(): + return str(candidate) + + print(f"OBS file not found. Tried: {', '.join(tried)}.") + return None + + +def parse_rinex_float(s: str) -> float: """Parse RINEX formatted float string which may contain D or E exponent and compact spacing""" # Handle empty string if not s.strip(): @@ -117,7 +214,7 @@ def parse_rinex_float(s): def read_rinex_nav(filename): - """Read RINEX v3.0 navigation file""" + """Read RINEX v3/4 navigation file into a dictionary of satellites.""" satellites = {} line_number = 0 with open(filename, 'r', encoding='utf-8') as f: @@ -140,6 +237,11 @@ def read_rinex_nav(filename): continue prn = current_line[:3].strip() + if not prn: + current_line = f.readline() + line_number += 1 + continue + system = prn[0] # G, R, E, etc. try: @@ -169,7 +271,8 @@ def read_rinex_nav(filename): line_number += 1 continue - if system == 'R': # GLONASS specific parsing + # Build ephemeris dictionary + if system == 'R': # GLONASS ephemeris = { 'prn': prn, 'epoch': epoch, @@ -187,10 +290,30 @@ def read_rinex_nav(filename): 'z': parse_rinex_float(lines[3][4:23]), 'z_vel': parse_rinex_float(lines[3][23:42]), 'z_acc': parse_rinex_float(lines[3][42:61]), - 'age': parse_rinex_float(lines[3][61:80]) + 'age': parse_rinex_float(lines[3][61:80]), + 'extra': lines[4:] # Keep any extra RINEX v4 lines } - else: - # Parse all ephemeris parameters + elif system == 'S': # SBAS (RINEX v4) + ephemeris = { + 'prn': prn, + 'epoch': epoch, + 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), + 'sv_clock_drift': parse_rinex_float(lines[0][42:61]), + 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), + # SBAS messages are shorter: 4 lines instead of 8 + 'x': parse_rinex_float(lines[1][4:23]) if len(lines) > 1 else None, + 'x_vel': parse_rinex_float(lines[1][23:42]) if len(lines) > 1 else None, + 'x_acc': parse_rinex_float(lines[1][42:61]) if len(lines) > 1 else None, + 'health': parse_rinex_float(lines[1][61:80]) if len(lines) > 1 else None, + 'y': parse_rinex_float(lines[2][4:23]) if len(lines) > 2 else None, + 'y_vel': parse_rinex_float(lines[2][23:42]) if len(lines) > 2 else None, + 'y_acc': parse_rinex_float(lines[2][42:61]) if len(lines) > 2 else None, + 'z': parse_rinex_float(lines[3][4:23]) if len(lines) > 3 else None, + 'z_vel': parse_rinex_float(lines[3][23:42]) if len(lines) > 3 else None, + 'z_acc': parse_rinex_float(lines[3][42:61]) if len(lines) > 3 else None, + 'extra': lines[4:] # capture anything beyond + } + else: # GPS, Galileo, BeiDou, QZSS, IRNSS, etc. ephemeris = { 'prn': prn, 'epoch': epoch, @@ -221,9 +344,9 @@ def read_rinex_nav(filename): 'sv_health': parse_rinex_float(lines[6][23:42]), 'tgd': parse_rinex_float(lines[6][42:61]), 'iodc': parse_rinex_float(lines[6][61:80]), - 'transmission_time': parse_rinex_float(lines[7][4:23]), - 'fit_interval': ( - parse_rinex_float(lines[7][23:42])) if len(lines[7]) > 23 else 0.0 + 'transmission_time': parse_rinex_float(lines[7][4:23]) if len(lines) > 7 else None, + 'fit_interval': parse_rinex_float(lines[7][23:42]) if len(lines) > 7 else None, + 'extra': lines[8:] # Keep any extra RINEX v4 lines } if prn not in satellites: @@ -236,7 +359,7 @@ def read_rinex_nav(filename): print(f" Line content: {current_line.strip()}") print(f" Error type: {type(e).__name__}") print(f" Error details: {str(e)}") - print("Skipping to next satellite block...\n") + print("Skipping to next satellite block ...\n") # Skip to next block by reading until next PRN while current_line and not current_line.startswith(prn[0]): current_line = f.readline() @@ -253,7 +376,7 @@ def calculate_satellite_position(ephemeris, transmit_time): """Calculate satellite position in ECEF coordinates at given transmission time""" system = ephemeris['prn'][0] - if system == 'R': # GLONASS - use position + velocity * time + if system in ('R', 'S'): # GLONASS/SBAS dt = transmit_time # Convert km to meters xk = (ephemeris['x'] + ephemeris['x_vel'] * dt + 0.5 * ephemeris['x_acc'] * dt**2) * 1000 @@ -397,13 +520,14 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, # Color scheme by constellation system_colors = { - 'G': 'blue', # GPS - 'E': 'green', # Galileo - 'R': 'red', # GLONASS - 'C': 'orange', # BeiDou - 'J': 'brown', # QZSS - 'I': 'pink', # IRNSS - 'S': 'gray' # SBAS + 'G': 'blue', # GPS + 'E': 'green', # Galileo + 'R': 'red', # GLONASS + 'C': 'orange', # BeiDou + 'J': 'brown', # QZSS + 'I': 'pink', # IRNSS + 'S': 'lightgray', # SBAS + 'L': 'cyan' # LEO (new in RINEX v4) } # System names mapping @@ -414,7 +538,8 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, 'C': 'BeiDou', 'J': 'QZSS', 'I': 'IRNSS', - 'S': 'SBAS' + 'S': 'SBAS', + 'L': 'LEO' } # Find which systems are actually present @@ -487,7 +612,7 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, mid_idx = len(theta)//2 ax.text(theta[mid_idx], r[mid_idx], prn, fontsize=12, ha='center', va='center', - bbox={"facecolor": "white", "alpha": 0.8, "pad": 2}, + bbox={"facecolor": system_colors.get(prn[0], "white"), "alpha": 0.2, "pad": 2}, zorder=3) # Legend for present systems @@ -547,7 +672,8 @@ def main(): 'BEIDOU': 'C', 'QZSS': 'J', 'IRNSS': 'I', - 'SBAS': 'S' + 'SBAS': 'S', + 'LEO': 'L' } # Set up argument parser @@ -628,7 +754,7 @@ def main(): filename = args.filename # Read RINEX file - print(f"Reading {filename}...") + print(f"Reading {filename} ...") try: satellites = read_rinex_nav(filename) except FileNotFoundError: @@ -675,7 +801,11 @@ def main(): 'G': 'GPS', 'R': 'GLONASS', 'E': 'Galileo', - 'C': 'BeiDou' + 'C': 'BeiDou', + 'J': 'QZSS', + 'I': 'IRNSS', + 'S': 'SBAS', + 'L': 'LEO' }.get(system, 'Unknown') print(f"- {system_name} ({system}): {count} satellites") @@ -683,33 +813,21 @@ def main(): # Time window: OBS bounds if provided; else NAV span use_start, use_end = all_epochs[0], all_epochs[-1] if args.use_obs: - tried = [] - obs_path = None - stem = filename[:-1] - - for s in ('O', 'o'): # Try uppercase then lowercase - candidate = stem + s - tried.append(candidate) - if Path(candidate).exists(): - obs_path = candidate - break - + obs_path = find_obs_for_nav(filename) if obs_path: - obs_start, obs_end = _read_obs_time_bounds(obs_path) + obs_start, obs_end = read_obs_time_bounds(obs_path) if obs_start and obs_end: use_start, use_end = obs_start, obs_end - print(f"\nObservation window detected from {obs_path}: {use_start} → {use_end}") + print(f"\nObservation window detected in {obs_path}: from {use_start} to {use_end}") else: print(f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead.") - else: - print(f"\nOBS file not found. Tried: {', '.join(tried)}. Using NAV span instead.") # Ensure at least two samples with the default 5-minute step if (use_end - use_start) < timedelta(minutes=5): use_end = use_start + timedelta(minutes=5) # Generate the combined skyplot - print("\nGenerating skyplot...") + print("\nGenerating skyplot ...") footer = f"From {use_start} to {use_end} UTC" plot_satellite_tracks(