diff --git a/utils/skyplot/skyplot.py b/utils/skyplot/skyplot.py index 2d82bf140..d0721a1db 100755 --- a/utils/skyplot/skyplot.py +++ b/utils/skyplot/skyplot.py @@ -31,6 +31,7 @@ import sys from datetime import datetime, timedelta from math import atan2, cos, sin, sqrt from pathlib import Path +from typing import Tuple, Optional try: import matplotlib.pyplot as plt @@ -42,7 +43,7 @@ except ImportError: __version__ = "1.0.0" -def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | None]: +def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[datetime]]: """ Return (start_time, end_time) from a RINEX observation file (v2/3/4) by scanning epoch lines. If parsing fails or the file is not OBS, return (None, None). @@ -106,7 +107,7 @@ def read_obs_time_bounds(obs_path: str) -> tuple[datetime | None, datetime | Non return (None, None) -def find_obs_for_nav(nav_file: str) -> str | None: +def find_obs_for_nav(nav_file: str) -> Optional[str]: """Find corresponding RINEX OBS file for a given NAV file (v2/v3/v4), covering all standard extensions.""" nav_path = Path(nav_file) tried = [] @@ -114,7 +115,7 @@ def find_obs_for_nav(nav_file: str) -> str | None: stem = nav_path.stem suffix = nav_path.suffix - # --- RINEX v2: replace last letter of extension with 'O' or 'o' + # --- RINEX v2 names: replace last letter of extension with 'O' or 'o' if suffix and suffix[-1].isalpha(): for o_type in ('O', 'o'): candidate = nav_path.with_suffix(suffix[:-1] + o_type) @@ -122,7 +123,7 @@ def find_obs_for_nav(nav_file: str) -> str | None: if candidate.exists(): return str(candidate) - # --- RINEX v3/v4: handle standard extensions and common modifiers + # --- RINEX v3/v4 names: handle standard extensions and common modifiers gnss_patterns = [ # Mixed constellations ("_MN", "_MO"), ("_mn", "_mo"), @@ -213,52 +214,232 @@ def parse_rinex_float(s: str) -> float: return 0.0 # Default if parsing fails +def read_rinex_header(filename: str) -> Tuple[str, str]: + """Return (version_str, file_type_char) from the 'RINEX VERSION / TYPE' header line.""" + with open(filename, 'r', encoding='utf-8', errors='ignore') as f: + for line in f: + if "RINEX VERSION / TYPE" in line: + version = line[0:9].strip() # F9.2 in v2/v3/v4 + ftype = line[20:21].upper() # 'N' (GPS nav v2), 'G' (GLO nav v2), 'H' (GEO/SBAS v2), 'N' in v3/4 too + return version, ftype + if "END OF HEADER" in line: + break + return "", "" + + def read_rinex_nav(filename): - """Read RINEX v3/4 navigation file into a dictionary of satellites.""" + """ + Read RINEX v2/v3/v4 navigation file into a dict { 'Gxx': [eph...], 'Rxx': [...], 'Sxxx': [...] }. + """ + version_str, ftype = read_rinex_header(filename) + is_v2 = version_str.startswith('2') + satellites = {} line_number = 0 - with open(filename, 'r', encoding='utf-8') as f: + with open(filename, 'r', encoding='utf-8', errors='ignore') as f: # Skip header while True: line = f.readline() line_number += 1 if not line: - return satellites # Empty file + return satellites if "END OF HEADER" in line: break - # Read ephemeris data current_line = f.readline() line_number += 1 + + # ---------------------------- + # RINEX 2.10 / 2.11 parsing + # ---------------------------- + if is_v2: + # File type: 'N' (GPS), 'G' (GLONASS), 'H' (GEO/SBAS) + v2_system = ftype # keep original char + while current_line: + # Skip empties + if not current_line.strip(): + current_line = f.readline() + line_number += 1 + continue + + try: + # --- First record line: PRN/EPOCH/CLOCK -------------------- + # Formats per RINEX 2.11 Table A4 (GPS) and Table A11 (GLONASS). + # PRN I2 in cols 1-2. Then yy, mm, dd, hh, mi (I2 with 1X between), ss F5.1, + # then 3D19.12 (clock bias, drift, drift rate). + prn_num = int(current_line[0:2]) + yy = int(current_line[3:5]) + mm = int(current_line[6:8]) + dd = int(current_line[9:11]) + hh = int(current_line[12:14]) + mi = int(current_line[15:17]) + ss = float(current_line[18:23]) + + # Year mapping: 80–99 => 1980–1999, 00–79 => 2000–2079 + yyyy = 1900 + yy if yy >= 80 else 2000 + yy + epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6)) + + # Map PRN to 'Gxx' / 'Rxx' / 'Sxxx' + if v2_system == 'N': # GPS nav + prn = f"G{prn_num:02d}" + elif v2_system == 'G': # GLONASS nav + prn = f"R{prn_num:02d}" + elif v2_system == 'H': # GEO/SBAS nav (PRN-100 in file) + prn = f"S{prn_num + 100:03d}" + else: + # Unknown v2 type; skip + current_line = f.readline() + line_number += 1 + continue + + # Collect the lines of this ephemeris block: + lines = [current_line] + + if v2_system == 'G' or v2_system == 'H': + # GLONASS & GEO blocks: 3 more lines (Tables A11/A16) -> total 4 records + needed = 3 + else: + # GPS v2 block: 7 more lines (Table A4) -> total 8 records + needed = 7 + + for _ in range(needed): + next_line = f.readline() + line_number += 1 + if not next_line: + break + lines.append(next_line) + + if len(lines) < needed + 1: + current_line = f.readline() + line_number += 1 + continue + + if v2_system == 'N': + # GPS + ephemeris = { + 'prn': prn, + 'epoch': epoch, + 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), + 'sv_clock_drift': parse_rinex_float(lines[0][41:61]), + 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), + 'iode': parse_rinex_float(lines[1][4:22]), + 'crs': parse_rinex_float(lines[1][22:41]), + 'delta_n': parse_rinex_float(lines[1][41:60]), + 'm0': parse_rinex_float(lines[1][61:80]), + 'cuc': parse_rinex_float(lines[2][4:22]), + 'ecc': parse_rinex_float(lines[2][22:41]), + 'cus': parse_rinex_float(lines[2][41:60]), + 'sqrt_a': parse_rinex_float(lines[2][60:80]), + 'toe': parse_rinex_float(lines[3][4:22]), + 'cic': parse_rinex_float(lines[3][22:41]), + 'omega0': parse_rinex_float(lines[3][41:60]), + 'cis': parse_rinex_float(lines[3][60:80]), + 'i0': parse_rinex_float(lines[4][4:22]), + 'crc': parse_rinex_float(lines[4][22:41]), + 'omega': parse_rinex_float(lines[4][41:60]), + 'omega_dot': parse_rinex_float(lines[4][60:80]), + 'idot': parse_rinex_float(lines[5][4:22]), + 'codes_l2': parse_rinex_float(lines[5][22:41]), + 'gps_week': parse_rinex_float(lines[5][41:61]), + 'l2p_flag': parse_rinex_float(lines[5][61:80]), + 'sv_accuracy': parse_rinex_float(lines[6][4:22]), + 'sv_health': parse_rinex_float(lines[6][22:41]), + 'tgd': parse_rinex_float(lines[6][41:61]), + 'iodc': parse_rinex_float(lines[6][61:80]), + 'transmission_time': parse_rinex_float(lines[7][4:22]) if len(lines) > 7 else None, + 'fit_interval': parse_rinex_float(lines[7][22:41]) if len(lines) > 7 else None, + 'extra': lines[8:] + } + elif v2_system == 'H': + # GEO/SBAS (Table A16) + ephemeris = { + 'prn': prn, + 'epoch': epoch, + 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), + 'sv_clock_drift': parse_rinex_float(lines[0][41:61]), + 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), + 'x': parse_rinex_float(lines[1][4:22]), + 'x_vel': parse_rinex_float(lines[1][22:41]), + 'x_acc': parse_rinex_float(lines[1][41:60]), + 'health': parse_rinex_float(lines[1][60:80]), + 'y': parse_rinex_float(lines[2][4:22]), + 'y_vel': parse_rinex_float(lines[2][22:41]), + 'y_acc': parse_rinex_float(lines[2][41:61]), + 'z': parse_rinex_float(lines[3][4:22]), + 'z_vel': parse_rinex_float(lines[3][21:41]), + 'z_acc': parse_rinex_float(lines[3][41:61]), + 'extra': lines[4:] + } + elif v2_system == 'G': + # GLONASS + ephemeris = { + 'prn': prn, + 'epoch': epoch, + 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), + 'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]), + 'message_frame_time': parse_rinex_float(lines[0][61:80]), + 'x': parse_rinex_float(lines[1][4:22]), + 'x_vel': parse_rinex_float(lines[1][22:41]), + 'x_acc': parse_rinex_float(lines[1][41:60]), + 'health': parse_rinex_float(lines[1][60:80]), + 'y': parse_rinex_float(lines[2][4:22]), + 'y_vel': parse_rinex_float(lines[2][22:41]), + 'y_acc': parse_rinex_float(lines[2][41:60]), + 'freq_num': parse_rinex_float(lines[2][60:80]), + 'z': parse_rinex_float(lines[3][4:22]), + 'z_vel': parse_rinex_float(lines[3][22:41]), + 'z_acc': parse_rinex_float(lines[3][41:60]), + 'age': parse_rinex_float(lines[3][60:80]), + 'extra': lines[4:] + } + else: + ephemeris = None + + if ephemeris: + satellites.setdefault(prn, []).append(ephemeris) + + except (ValueError, IndexError) as e: + # Skip malformed block; advance + current_line = f.readline() + line_number += 1 + continue + + current_line = f.readline() + line_number += 1 + + return satellites # done with v2 + + # ---------------------------- + # RINEX 3 / 4 parsing + # ---------------------------- while current_line: + # Skip short/noise lines if len(current_line) < 23: current_line = f.readline() line_number += 1 continue - prn = current_line[:3].strip() - if not prn: + # Parse the epoch line + parts = current_line.split() + if len(parts) < 8: current_line = f.readline() line_number += 1 continue - system = prn[0] # G, R, E, etc. + prn = parts[0].strip() + system = prn[0] try: - # Parse epoch fields - year = int(current_line[4:8]) - month = int(current_line[9:11]) - day = int(current_line[12:14]) - hour = int(current_line[15:17]) - minute = int(current_line[18:20]) - second = int(float(current_line[21:23])) + year = int(parts[1]) + month = int(parts[2]) + day = int(parts[3]) + hour = int(parts[4]) + minute = int(parts[5]) + second = float(parts[6]) + epoch = datetime(year, month, day, hour, minute, int(second), int((second % 1) * 1e6)) - year += 2000 if year < 80 else 0 - epoch = datetime(year, month, day, hour, minute, second) - - # Read the next lines lines = [current_line] - line_count = 4 if system == 'R' else 7 + line_count = 4 if system == 'R' or system == 'S' else 7 for _ in range(line_count): next_line = f.readline() line_number += 1 @@ -271,36 +452,34 @@ def read_rinex_nav(filename): line_number += 1 continue - # Build ephemeris dictionary if system == 'R': # GLONASS ephemeris = { 'prn': prn, 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), + 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), 'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]), 'message_frame_time': parse_rinex_float(lines[0][61:80]), - 'x': parse_rinex_float(lines[1][4:23]), # Position (km) - 'x_vel': parse_rinex_float(lines[1][23:42]), # Velocity (km/s) + 'x': parse_rinex_float(lines[1][4:23]), + 'x_vel': parse_rinex_float(lines[1][23:41]), 'x_acc': parse_rinex_float(lines[1][42:61]), 'health': parse_rinex_float(lines[1][61:80]), 'y': parse_rinex_float(lines[2][4:23]), - 'y_vel': parse_rinex_float(lines[2][23:42]), + 'y_vel': parse_rinex_float(lines[2][23:41]), 'y_acc': parse_rinex_float(lines[2][42:61]), 'freq_num': parse_rinex_float(lines[2][61:80]), 'z': parse_rinex_float(lines[3][4:23]), - 'z_vel': parse_rinex_float(lines[3][23:42]), + 'z_vel': parse_rinex_float(lines[3][23:41]), 'z_acc': parse_rinex_float(lines[3][42:61]), 'age': parse_rinex_float(lines[3][61:80]), - 'extra': lines[4:] # Keep any extra RINEX v4 lines + 'extra': lines[4:] } - elif system == 'S': # SBAS (RINEX v4) + elif system == 'S': # SBAS (RINEX v4 short form) ephemeris = { 'prn': prn, 'epoch': epoch, 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), 'sv_clock_drift': parse_rinex_float(lines[0][42:61]), 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), - # SBAS messages are shorter: 4 lines instead of 8 'x': parse_rinex_float(lines[1][4:23]) if len(lines) > 1 else None, 'x_vel': parse_rinex_float(lines[1][23:42]) if len(lines) > 1 else None, 'x_acc': parse_rinex_float(lines[1][42:61]) if len(lines) > 1 else None, @@ -311,9 +490,9 @@ def read_rinex_nav(filename): 'z': parse_rinex_float(lines[3][4:23]) if len(lines) > 3 else None, 'z_vel': parse_rinex_float(lines[3][23:42]) if len(lines) > 3 else None, 'z_acc': parse_rinex_float(lines[3][42:61]) if len(lines) > 3 else None, - 'extra': lines[4:] # capture anything beyond + 'extra': lines[4:] } - else: # GPS, Galileo, BeiDou, QZSS, IRNSS, etc. + else: ephemeris = { 'prn': prn, 'epoch': epoch, @@ -346,25 +525,14 @@ def read_rinex_nav(filename): 'iodc': parse_rinex_float(lines[6][61:80]), 'transmission_time': parse_rinex_float(lines[7][4:23]) if len(lines) > 7 else None, 'fit_interval': parse_rinex_float(lines[7][23:42]) if len(lines) > 7 else None, - 'extra': lines[8:] # Keep any extra RINEX v4 lines + 'extra': lines[8:] } - if prn not in satellites: - satellites[prn] = [] - satellites[prn].append(ephemeris) + satellites.setdefault(prn, []).append(ephemeris) - except (ValueError, IndexError) as e: - print(f"\nError in file {filename} at line {line_number}:") - print(f" PRN: {prn}") - print(f" Line content: {current_line.strip()}") - print(f" Error type: {type(e).__name__}") - print(f" Error details: {str(e)}") - print("Skipping to next satellite block ...\n") - # Skip to next block by reading until next PRN - while current_line and not current_line.startswith(prn[0]): - current_line = f.readline() - line_number += 1 - continue + except (ValueError, IndexError): + # Skip to next line + pass current_line = f.readline() line_number += 1 @@ -758,7 +926,7 @@ def main(): try: satellites = read_rinex_nav(filename) except FileNotFoundError: - print(f"Error: File '{filename}' not found.") + print(f"Error: File {filename} not found.") return 1 if not satellites: