From fc91f414d59e9614fe6eaf3f8360f1fdb355eafd Mon Sep 17 00:00:00 2001 From: Carles Fernandez Date: Wed, 27 Aug 2025 20:35:29 +0200 Subject: [PATCH] skyplot: add jpg output, improve documentation, apply pep8 formatting --- utils/skyplot/README.md | 32 +- utils/skyplot/skyplot.py | 1112 +++++++++++++++++++++++++------------- 2 files changed, 750 insertions(+), 394 deletions(-) diff --git a/utils/skyplot/README.md b/utils/skyplot/README.md index d6578ccbf..bd870f1d8 100644 --- a/utils/skyplot/README.md +++ b/utils/skyplot/README.md @@ -22,8 +22,8 @@ showing satellite visibility over time. - Color-codes satellites by constellation (GPS, Galileo, GLONASS, BeiDou). - Elevation mask set to 5°, configurable via the `--elev-mask` optional argument. -- Outputs high-quality image in PDF format. EPS, PNG, and SVG formats are also - available via the `--format` optional argument. +- Outputs high-quality image in PDF format. EPS, JPG, PNG, and SVG formats are + also available via the `--format` optional argument. - Non-interactive mode for CI jobs with the `--no-show` optional argument. - Constellations to plot can be configured via the `--system` optional argument. - Optionally, it uses the corresponding RINEX observation file to limit the plot @@ -52,7 +52,7 @@ showing satellite visibility over time. ``` ./skyplot.py [LATITUDE] [LONGITUDE] [ALTITUDE] [--elev-mask ELEV_MASK] - [--format {pdf,eps,png,svg}] + [--format {pdf,eps,jpg,png,svg}] [--no-show] [--system SYSTEM [SYSTEM ...]] [--use-obs] @@ -60,17 +60,17 @@ showing satellite visibility over time. ### Arguments -| Argument | Type | Units | Description | Default | -| ---------------- | -------- | ----------- | ------------------------ | -------- | -| `RINEX_NAV_FILE` | Required | - | RINEX nav file path | - | -| `LATITUDE` | Optional | degrees (°) | North/South position | 41.275°N | -| `LONGITUDE` | Optional | degrees (°) | East/West position | 1.9876°E | -| `ALTITUDE` | Optional | meters (m) | Height above sea level | 80.0 m | -| `--elev-mask` | Optional | degrees (°) | Elevation mask | 5° | -| `--format` | Optional | - | Output {pdf,eps,png,svg} | pdf | -| `--no-show` | Optional | - | Do not show plot | - | -| `--system` | Optional | - | Systems to plot | All | -| `--use-obs` | Optional | - | Use RINEX obs data | - | +| Argument | Type | Units | Description | Default | +| ---------------- | -------- | ----------- | ---------------------- | -------- | +| `RINEX_NAV_FILE` | Required | - | RINEX nav file path | - | +| `LATITUDE` | Optional | degrees (°) | North/South position | 41.275°N | +| `LONGITUDE` | Optional | degrees (°) | East/West position | 1.9876°E | +| `ALTITUDE` | Optional | meters (m) | Height above sea level | 80.0 m | +| `--elev-mask` | Optional | degrees (°) | Elevation mask | 5° | +| `--format` | Optional | - | Output file format | pdf | +| `--no-show` | Optional | - | Do not show plot | - | +| `--system` | Optional | - | Systems to plot | All | +| `--use-obs` | Optional | - | Use RINEX obs file | - | ### Examples @@ -118,5 +118,5 @@ The script generates a PDF file named `skyplot_.pdf` (with dots in - Time range in footer. - Embedded fonts that display consistently across all systems and generate publication-ready figures. -- EPS, PNG, and SVG output formats available via `--format eps`, `--format png`, - and `--format svg`. +- EPS, JPG, PNG, and SVG output formats available via `--format eps`, + `--format jpg`, `--format png`, and `--format svg`. diff --git a/utils/skyplot/skyplot.py b/utils/skyplot/skyplot.py index 705847b2c..d4e753c00 100755 --- a/utils/skyplot/skyplot.py +++ b/utils/skyplot/skyplot.py @@ -1,28 +1,28 @@ #!/usr/bin/env python """ - skyplot.py +skyplot.py - Reads a RINEX navigation file and generates a skyplot. Optionally, a RINEX - observation file can also be read to match the skyplot to the receiver - processing time. +Reads a RINEX navigation file and generates a skyplot. Optionally, a RINEX +observation file can also be read to match the skyplot to the receiver +processing time. - Usage: - skyplot.py [observer_lat] [observer_lon] [observer_alt] - [--elev-mask ELEV_MASK] - [--format {pdf,eps,png,svg}] - [--no-show] - [--system SYSTEM [SYSTEM ...]] - [--use-obs] +Usage: + skyplot.py [observer_lat] [observer_lon] [observer_alt] + [--elev-mask ELEV_MASK] + [--format {pdf,eps,jpg,png,svg}] + [--no-show] + [--system SYSTEM [SYSTEM ...]] + [--use-obs] - ----------------------------------------------------------------------------- +----------------------------------------------------------------------------- - GNSS-SDR is a Global Navigation Satellite System software-defined receiver. - This file is part of GNSS-SDR. +GNSS-SDR is a Global Navigation Satellite System software-defined receiver. +This file is part of GNSS-SDR. - SPDX-FileCopyrightText: 2025 Carles Fernandez-Prades cfernandez(at)cttc.es - SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: 2025 Carles Fernandez-Prades cfernandez(at)cttc.es +SPDX-License-Identifier: GPL-3.0-or-later - ----------------------------------------------------------------------------- +----------------------------------------------------------------------------- """ import argparse @@ -31,7 +31,7 @@ import sys from datetime import datetime, timedelta from math import atan2, cos, sin, sqrt from pathlib import Path -from typing import Tuple, Optional +from typing import Any, Dict, List, Tuple, Optional try: import matplotlib.pyplot as plt @@ -48,7 +48,10 @@ DEFAULT_LAT = 41.275 DEFAULT_LON = 1.9876 DEFAULT_ALT = 80.0 -def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[datetime]]: + +def read_obs_time_bounds( + obs_path: str, +) -> Tuple[Optional[datetime], Optional[datetime]]: """ Return (start_time, end_time) from a RINEX observation file (v2/3/4) by scanning epoch lines. If parsing fails or the file is not OBS, return (None, None). @@ -61,13 +64,13 @@ def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[da if not obs_file.exists(): return (None, None) - with obs_file.open('r', encoding='utf-8', errors='ignore') as f: + with obs_file.open("r", encoding="utf-8", errors="ignore") as f: # --- Detect OBS file in header --- is_obs = False for line in f: if "RINEX VERSION / TYPE" in line: file_type = line[20:21].upper() - if file_type == 'O' or 'OBSERVATION DATA' in line.upper(): + if file_type == "O" or "OBSERVATION DATA" in line.upper(): is_obs = True if "END OF HEADER" in line: break @@ -81,7 +84,7 @@ def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[da continue try: - if line.startswith('>'): # RINEX 3/4 epoch line + if line.startswith(">"): # RINEX 3/4 epoch line yyyy = int(line[2:6]) mm = int(line[7:9]) dd = int(line[10:12]) @@ -97,7 +100,9 @@ def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[da ss = float(line[15:26]) yyyy = 1900 + yy if yy >= 80 else 2000 + yy - epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6)) + epoch = datetime( + yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6) + ) if start_time is None or epoch < start_time: start_time = epoch @@ -113,7 +118,25 @@ def read_obs_time_bounds(obs_path: str) -> Tuple[Optional[datetime], Optional[da def find_obs_for_nav(nav_file: str) -> Optional[str]: - """Find corresponding RINEX OBS file for a given NAV file (v2/v3/v4), covering all standard extensions.""" + """ + Find the corresponding RINEX observation file for a given navigation file. + + This function attempts to locate the appropriate observation (OBS) file that + corresponds to a given navigation (NAV) file by applying standard RINEX naming + conventions across versions 2, 3, and 4. + + Parameters + ---------- + nav_file : str + Path to the RINEX navigation file. The function will search for a + corresponding observation file using various naming patterns. + + Returns + ------- + Optional[str] + Path to the found observation file if successful, None otherwise. + Also prints the list of attempted file paths if no match is found. + """ nav_path = Path(nav_file) tried = [] @@ -122,7 +145,7 @@ def find_obs_for_nav(nav_file: str) -> Optional[str]: # --- RINEX v2 names: replace last letter of extension with 'O' or 'o' if suffix and suffix[-1].isalpha(): - for o_type in ('O', 'o'): + for o_type in ("O", "o"): candidate = nav_path.with_suffix(suffix[:-1] + o_type) tried.append(str(candidate)) if candidate.exists(): @@ -131,60 +154,134 @@ def find_obs_for_nav(nav_file: str) -> Optional[str]: # --- RINEX v3/v4 names: handle standard extensions and common modifiers gnss_patterns = [ # Mixed constellations - ("_MN", "_MO"), ("_mn", "_mo"), - ("_MM", "_MO"), ("_mm", "_mo"), - ("_MR", "_MO"), ("_mr", "_mo"), - + ("_MN", "_MO"), + ("_mn", "_mo"), + ("_MM", "_MO"), + ("_mm", "_mo"), + ("_MR", "_MO"), + ("_mr", "_mo"), # Individual constellations - ("_GN", "_GO"), ("_gn", "_go"), # GPS - ("_RN", "_RO"), ("_rn", "_ro"), # GLONASS - ("_EN", "_EO"), ("_en", "_eo"), # Galileo - ("_CN", "_CO"), ("_cn", "_co"), # BeiDou - ("_JN", "_JO"), ("_jn", "_jo"), # QZSS - ("_IN", "_IO"), ("_in", "_io"), # IRNSS - ("_SN", "_SO"), ("_sn", "_so"), # SBAS + ("_GN", "_GO"), # GPS + ("_gn", "_go"), + ("_RN", "_RO"), # GLONASS + ("_rn", "_ro"), + ("_EN", "_EO"), # Galileo + ("_en", "_eo"), + ("_CN", "_CO"), # BeiDou + ("_cn", "_co"), + ("_JN", "_JO"), # QZSS + ("_jn", "_jo"), + ("_IN", "_IO"), # IRNSS + ("_in", "_io"), + ("_SN", "_SO"), # SBAS + ("_sn", "_so"), ] for nav_pattern, obs_pattern in gnss_patterns: if nav_pattern in stem: # Direct replacement (e.g., _MN -> _MO) - candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + suffix) + candidate = nav_path.with_name( + stem.replace(nav_pattern, obs_pattern) + suffix + ) tried.append(str(candidate)) if candidate.exists(): return str(candidate) # Handle sampling rate patterns (e.g., _MN -> _30S_MO) - sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s'] + sampling_rates = [ + "_30S", + "_15S", + "_01S", + "_05S", + "_30s", + "_15s", + "_01s", + "_05s", + ] for rate in sampling_rates: - candidate = nav_path.with_name(stem.replace(nav_pattern, rate + obs_pattern) + suffix) + candidate = nav_path.with_name( + stem.replace(nav_pattern, rate + obs_pattern) + suffix + ) tried.append(str(candidate)) if candidate.exists(): return str(candidate) # Also try with common observation extensions - for obs_ext in ['.rnx', '.obs', '.OBS', '.22O', '.23O', '.24O', '.25O']: + for obs_ext in [ + ".rnx", + ".obs", + ".OBS", + ".22O", + ".23O", + ".24O", + ".25O", + ]: if suffix != obs_ext: - candidate = nav_path.with_name(stem.replace(nav_pattern, obs_pattern) + obs_ext) + candidate = nav_path.with_name( + stem.replace(nav_pattern, obs_pattern) + obs_ext + ) tried.append(str(candidate)) if candidate.exists(): return str(candidate) # --- Additional patterns for files with sampling rate modifiers before constellation code - sampling_rates = ['_30S', '_15S', '_01S', '_05S', '_30s', '_15s', '_01s', '_05s', '_01H', '_1H', '_01h', '_1h'] + sampling_rates = [ + "_30S", + "_15S", + "_01S", + "_05S", + "_30s", + "_15s", + "_01s", + "_05s", + "_01H", + "_1H", + "_01h", + "_1h", + ] for rate in sampling_rates: if rate in stem: # Check if this is a navigation file with sampling rate + _MN/_GN/etc. - for nav_suffix in ['_MN', '_GN', '_RN', '_EN', '_CN', '_JN', '_IN', '_SN', - '_mn', '_gn', '_rn', '_en', '_cn', '_jn', '_in', '_sn']: + for nav_suffix in [ + "_MN", + "_GN", + "_RN", + "_EN", + "_CN", + "_JN", + "_IN", + "_SN", + "_mn", + "_gn", + "_rn", + "_en", + "_cn", + "_jn", + "_in", + "_sn", + ]: if rate + nav_suffix in stem: # Replace navigation with observation (e.g., _30S_MN -> _30S_MO) - candidate = nav_path.with_name(stem.replace(rate + nav_suffix, rate + nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix) + candidate = nav_path.with_name( + stem.replace( + rate + nav_suffix, + rate + + nav_suffix.replace("N", "O").replace("n", "o"), + ) + + suffix + ) tried.append(str(candidate)) if candidate.exists(): return str(candidate) # Also try without sampling rate (e.g., _30S_MN -> _MO) - candidate = nav_path.with_name(stem.replace(rate + nav_suffix, nav_suffix.replace('N', 'O').replace('n', 'o')) + suffix) + candidate = nav_path.with_name( + stem.replace( + rate + nav_suffix, + nav_suffix.replace("N", "O").replace("n", "o"), + ) + + suffix + ) tried.append(str(candidate)) if candidate.exists(): return str(candidate) @@ -193,32 +290,36 @@ def find_obs_for_nav(nav_file: str) -> Optional[str]: return None -def ecef_to_geodetic(x: float, y: float, z: float) -> Tuple[float, float, float]: - """Convert ECEF (X, Y, Z) to (lat, lon, h).""" +def ecef_to_geodetic( + x: float, y: float, z: float +) -> Tuple[float, float, float]: + """Convert ECEF (X, Y, Z) [m] to (lat [°], lon [°], h [m]).""" # WGS84 constants - a = 6378137.0 # semi-major axis + a = 6378137.0 # semi-major axis f = 1 / 298.257223563 e2 = f * (2 - f) lon = atan2(y, x) - r = sqrt(x*x + y*y) + r = sqrt(x * x + y * y) lat = atan2(z, r * (1 - e2)) # initial guess # Iterative improvement for _ in range(5): - N = a / sqrt(1 - e2 * sin(lat)**2) + N = a / sqrt(1 - e2 * sin(lat) ** 2) h = r / cos(lat) - N lat = atan2(z, r * (1 - e2 * (N / (N + h)))) - N = a / sqrt(1 - e2 * sin(lat)**2) + N = a / sqrt(1 - e2 * sin(lat) ** 2) h = r / cos(lat) - N return np.degrees(lat), np.degrees(lon), h -def get_approx_position_from_obs(obs_file: str) -> Optional[Tuple[float, float, float]]: +def get_approx_position_from_obs( + obs_file: str, +) -> Optional[Tuple[float, float, float]]: """Read APPROX POSITION XYZ from an OBS RINEX file header. - Returns (lat, lon, h) in degrees/meters or None if not valid.""" + Returns (lat, lon, h) in degrees/meters or None if not valid.""" try: with open(obs_file, "r") as f: for line in f: @@ -246,48 +347,49 @@ def parse_rinex_float(s: str) -> float: return 0.0 # Replace D exponent with E (some RINEX files use D instead of E) - s = s.replace('D', 'E').replace('d', 'e') + s = s.replace("D", "E").replace("d", "e") # Handle cases where exponent lacks E (e.g., "12345-3") - if re.match(r'[+-]?\d+[+-]\d+', s.strip()): - s = s.replace('+', 'E+').replace('-', 'E-') + if re.match(r"[+-]?\d+[+-]\d+", s.strip()): + s = s.replace("+", "E+").replace("-", "E-") try: return float(s) except ValueError: # Handle cases where the number runs into the next field # Try to split at the exponent if present - if 'E' in s: - base, exp = s.split('E')[:2] + if "E" in s: + base, exp = s.split("E")[:2] # Take first character of exponent if needed - if exp and exp[0] in '+-' and len(exp) > 1: - return float(base + 'E' + exp[0] + exp[1:].split()[0]) + if exp and exp[0] in "+-" and len(exp) > 1: + return float(base + "E" + exp[0] + exp[1:].split()[0]) return 0.0 # Default if parsing fails def read_rinex_header(filename: str) -> Tuple[str, str]: """Return (version_str, file_type_char) from the 'RINEX VERSION / TYPE' header line.""" - with open(filename, 'r', encoding='utf-8', errors='ignore') as f: + with open(filename, "r", encoding="utf-8", errors="ignore") as f: for line in f: if "RINEX VERSION / TYPE" in line: version = line[0:9].strip() # F9.2 in v2/v3/v4 - ftype = line[20:21].upper() # 'N' (GPS nav v2), 'G' (GLO nav v2), 'H' (GEO/SBAS v2), 'N' in v3/4 too + # 'N' (GPS nav v2), 'G' (GLO nav v2), 'H' (GEO/SBAS v2), 'N' in v3/4 too + ftype = line[20:21].upper() return version, ftype if "END OF HEADER" in line: break return "", "" -def read_rinex_nav(filename): +def read_rinex_nav(filename: str) -> Dict[str, List[Dict[str, Any]]]: """ Read RINEX v2/v3/v4 navigation file into a dict { 'Gxx': [eph...], 'Rxx': [...], 'Sxxx': [...] }. """ version_str, ftype = read_rinex_header(filename) - is_v2 = version_str.startswith('2') + is_v2 = version_str.startswith("2") satellites = {} line_number = 0 - with open(filename, 'r', encoding='utf-8', errors='ignore') as f: + with open(filename, "r", encoding="utf-8", errors="ignore") as f: # Skip header while True: line = f.readline() @@ -315,9 +417,7 @@ def read_rinex_nav(filename): try: # --- First record line: PRN/EPOCH/CLOCK -------------------- - # Formats per RINEX 2.11 Table A4 (GPS) and Table A11 (GLONASS). - # PRN I2 in cols 1-2. Then yy, mm, dd, hh, mi (I2 with 1X between), ss F5.1, - # then 3D19.12 (clock bias, drift, drift rate). + # Formats per RINEX 2.11 Table A4 (GPS) and Table A11 (GLONASS) prn_num = int(current_line[0:2]) yy = int(current_line[3:5]) mm = int(current_line[6:8]) @@ -328,14 +428,16 @@ def read_rinex_nav(filename): # Year mapping: 80–99 => 1980–1999, 00–79 => 2000–2079 yyyy = 1900 + yy if yy >= 80 else 2000 + yy - epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6)) + epoch = datetime( + yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6) + ) # Map PRN to 'Gxx' / 'Rxx' / 'Sxxx' - if v2_system == 'N': # GPS nav + if v2_system == "N": # GPS nav prn = f"G{prn_num:02d}" - elif v2_system == 'G': # GLONASS nav + elif v2_system == "G": # GLONASS nav prn = f"R{prn_num:02d}" - elif v2_system == 'H': # GEO/SBAS nav (PRN-100 in file) + elif v2_system == "H": # GEO/SBAS nav (PRN-100 in file) prn = f"S{prn_num + 100:03d}" else: # Unknown v2 type; skip @@ -346,7 +448,7 @@ def read_rinex_nav(filename): # Collect the lines of this ephemeris block: lines = [current_line] - if v2_system == 'G' or v2_system == 'H': + if v2_system == "G" or v2_system == "H": # GLONASS & GEO blocks: 3 more lines (Tables A11/A16) -> total 4 records needed = 3 else: @@ -365,83 +467,107 @@ def read_rinex_nav(filename): line_number += 1 continue - if v2_system == 'N': + if v2_system == "N": # GPS ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), - 'sv_clock_drift': parse_rinex_float(lines[0][41:61]), - 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), - 'iode': parse_rinex_float(lines[1][4:22]), - 'crs': parse_rinex_float(lines[1][22:41]), - 'delta_n': parse_rinex_float(lines[1][41:60]), - 'm0': parse_rinex_float(lines[1][61:80]), - 'cuc': parse_rinex_float(lines[2][4:22]), - 'ecc': parse_rinex_float(lines[2][22:41]), - 'cus': parse_rinex_float(lines[2][41:60]), - 'sqrt_a': parse_rinex_float(lines[2][60:80]), - 'toe': parse_rinex_float(lines[3][4:22]), - 'cic': parse_rinex_float(lines[3][22:41]), - 'omega0': parse_rinex_float(lines[3][41:60]), - 'cis': parse_rinex_float(lines[3][60:80]), - 'i0': parse_rinex_float(lines[4][4:22]), - 'crc': parse_rinex_float(lines[4][22:41]), - 'omega': parse_rinex_float(lines[4][41:60]), - 'omega_dot': parse_rinex_float(lines[4][60:80]), - 'idot': parse_rinex_float(lines[5][4:22]), - 'codes_l2': parse_rinex_float(lines[5][22:41]), - 'gps_week': parse_rinex_float(lines[5][41:61]), - 'l2p_flag': parse_rinex_float(lines[5][61:80]), - 'sv_accuracy': parse_rinex_float(lines[6][4:22]), - 'sv_health': parse_rinex_float(lines[6][22:41]), - 'tgd': parse_rinex_float(lines[6][41:61]), - 'iodc': parse_rinex_float(lines[6][61:80]), - 'transmission_time': parse_rinex_float(lines[7][4:22]) if len(lines) > 7 else None, - 'fit_interval': parse_rinex_float(lines[7][22:41]) if len(lines) > 7 else None, - 'extra': lines[8:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float( + lines[0][23:41] + ), + "sv_clock_drift": parse_rinex_float( + lines[0][41:61] + ), + "sv_clock_drift_rate": parse_rinex_float( + lines[0][61:80] + ), + "iode": parse_rinex_float(lines[1][4:22]), + "crs": parse_rinex_float(lines[1][22:41]), + "delta_n": parse_rinex_float(lines[1][41:60]), + "m0": parse_rinex_float(lines[1][61:80]), + "cuc": parse_rinex_float(lines[2][4:22]), + "ecc": parse_rinex_float(lines[2][22:41]), + "cus": parse_rinex_float(lines[2][41:60]), + "sqrt_a": parse_rinex_float(lines[2][60:80]), + "toe": parse_rinex_float(lines[3][4:22]), + "cic": parse_rinex_float(lines[3][22:41]), + "omega0": parse_rinex_float(lines[3][41:60]), + "cis": parse_rinex_float(lines[3][60:80]), + "i0": parse_rinex_float(lines[4][4:22]), + "crc": parse_rinex_float(lines[4][22:41]), + "omega": parse_rinex_float(lines[4][41:60]), + "omega_dot": parse_rinex_float(lines[4][60:80]), + "idot": parse_rinex_float(lines[5][4:22]), + "codes_l2": parse_rinex_float(lines[5][22:41]), + "gps_week": parse_rinex_float(lines[5][41:61]), + "l2p_flag": parse_rinex_float(lines[5][61:80]), + "sv_accuracy": parse_rinex_float(lines[6][4:22]), + "sv_health": parse_rinex_float(lines[6][22:41]), + "tgd": parse_rinex_float(lines[6][41:61]), + "iodc": parse_rinex_float(lines[6][61:80]), + "transmission_time": parse_rinex_float( + lines[7][4:22] + ) + if len(lines) > 7 + else None, + "fit_interval": parse_rinex_float(lines[7][22:41]) + if len(lines) > 7 + else None, + "extra": lines[8:], } - elif v2_system == 'H': + elif v2_system == "H": # GEO/SBAS (Table A16) ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), - 'sv_clock_drift': parse_rinex_float(lines[0][41:61]), - 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), - 'x': parse_rinex_float(lines[1][4:22]), - 'x_vel': parse_rinex_float(lines[1][22:41]), - 'x_acc': parse_rinex_float(lines[1][41:60]), - 'health': parse_rinex_float(lines[1][60:80]), - 'y': parse_rinex_float(lines[2][4:22]), - 'y_vel': parse_rinex_float(lines[2][22:41]), - 'y_acc': parse_rinex_float(lines[2][41:61]), - 'z': parse_rinex_float(lines[3][4:22]), - 'z_vel': parse_rinex_float(lines[3][21:41]), - 'z_acc': parse_rinex_float(lines[3][41:61]), - 'extra': lines[4:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float( + lines[0][23:41] + ), + "sv_clock_drift": parse_rinex_float( + lines[0][41:61] + ), + "sv_clock_drift_rate": parse_rinex_float( + lines[0][61:80] + ), + "x": parse_rinex_float(lines[1][4:22]), + "x_vel": parse_rinex_float(lines[1][22:41]), + "x_acc": parse_rinex_float(lines[1][41:60]), + "health": parse_rinex_float(lines[1][60:80]), + "y": parse_rinex_float(lines[2][4:22]), + "y_vel": parse_rinex_float(lines[2][22:41]), + "y_acc": parse_rinex_float(lines[2][41:61]), + "z": parse_rinex_float(lines[3][4:22]), + "z_vel": parse_rinex_float(lines[3][21:41]), + "z_acc": parse_rinex_float(lines[3][41:61]), + "extra": lines[4:], } - elif v2_system == 'G': - # GLONASS + elif v2_system == "G": + # GLONASS ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), - 'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]), - 'message_frame_time': parse_rinex_float(lines[0][61:80]), - 'x': parse_rinex_float(lines[1][4:22]), - 'x_vel': parse_rinex_float(lines[1][22:41]), - 'x_acc': parse_rinex_float(lines[1][41:60]), - 'health': parse_rinex_float(lines[1][60:80]), - 'y': parse_rinex_float(lines[2][4:22]), - 'y_vel': parse_rinex_float(lines[2][22:41]), - 'y_acc': parse_rinex_float(lines[2][41:60]), - 'freq_num': parse_rinex_float(lines[2][60:80]), - 'z': parse_rinex_float(lines[3][4:22]), - 'z_vel': parse_rinex_float(lines[3][22:41]), - 'z_acc': parse_rinex_float(lines[3][41:60]), - 'age': parse_rinex_float(lines[3][60:80]), - 'extra': lines[4:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float( + lines[0][23:42] + ), + "sv_relative_freq_bias": parse_rinex_float( + lines[0][42:61] + ), + "message_frame_time": parse_rinex_float( + lines[0][61:80] + ), + "x": parse_rinex_float(lines[1][4:22]), + "x_vel": parse_rinex_float(lines[1][22:41]), + "x_acc": parse_rinex_float(lines[1][41:60]), + "health": parse_rinex_float(lines[1][60:80]), + "y": parse_rinex_float(lines[2][4:22]), + "y_vel": parse_rinex_float(lines[2][22:41]), + "y_acc": parse_rinex_float(lines[2][41:60]), + "freq_num": parse_rinex_float(lines[2][60:80]), + "z": parse_rinex_float(lines[3][4:22]), + "z_vel": parse_rinex_float(lines[3][22:41]), + "z_acc": parse_rinex_float(lines[3][41:60]), + "age": parse_rinex_float(lines[3][60:80]), + "extra": lines[4:], } else: ephemeris = None @@ -487,9 +613,17 @@ def read_rinex_nav(filename): hour = int(parts[4]) minute = int(parts[5]) second = float(parts[6][:2]) - epoch = datetime(year, month, day, hour, minute, int(second), int((second % 1) * 1e6)) + epoch = datetime( + year, + month, + day, + hour, + minute, + int(second), + int((second % 1) * 1e6), + ) lines = [current_line] - line_count = 4 if system == 'R' or system == 'S' else 7 + line_count = 4 if system == "R" or system == "S" else 7 for _ in range(line_count): next_line = f.readline() line_number += 1 @@ -502,80 +636,112 @@ def read_rinex_nav(filename): line_number += 1 continue - if system == 'R': # GLONASS + if system == "R": # GLONASS ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:41]), - 'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]), - 'message_frame_time': parse_rinex_float(lines[0][61:80]), - 'x': parse_rinex_float(lines[1][4:23]), - 'x_vel': parse_rinex_float(lines[1][23:41]), - 'x_acc': parse_rinex_float(lines[1][42:61]), - 'health': parse_rinex_float(lines[1][61:80]), - 'y': parse_rinex_float(lines[2][4:23]), - 'y_vel': parse_rinex_float(lines[2][23:41]), - 'y_acc': parse_rinex_float(lines[2][42:61]), - 'freq_num': parse_rinex_float(lines[2][61:80]), - 'z': parse_rinex_float(lines[3][4:23]), - 'z_vel': parse_rinex_float(lines[3][23:41]), - 'z_acc': parse_rinex_float(lines[3][42:61]), - 'age': parse_rinex_float(lines[3][61:80]), - 'extra': lines[4:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float(lines[0][23:41]), + "sv_relative_freq_bias": parse_rinex_float( + lines[0][42:61] + ), + "message_frame_time": parse_rinex_float( + lines[0][61:80] + ), + "x": parse_rinex_float(lines[1][4:23]), + "x_vel": parse_rinex_float(lines[1][23:41]), + "x_acc": parse_rinex_float(lines[1][42:61]), + "health": parse_rinex_float(lines[1][61:80]), + "y": parse_rinex_float(lines[2][4:23]), + "y_vel": parse_rinex_float(lines[2][23:41]), + "y_acc": parse_rinex_float(lines[2][42:61]), + "freq_num": parse_rinex_float(lines[2][61:80]), + "z": parse_rinex_float(lines[3][4:23]), + "z_vel": parse_rinex_float(lines[3][23:41]), + "z_acc": parse_rinex_float(lines[3][42:61]), + "age": parse_rinex_float(lines[3][61:80]), + "extra": lines[4:], } - elif system == 'S': # SBAS (RINEX v4 short form) + elif system == "S": # SBAS (RINEX v4 short form) ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), - 'sv_clock_drift': parse_rinex_float(lines[0][42:61]), - 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), - 'x': parse_rinex_float(lines[1][4:23]) if len(lines) > 1 else None, - 'x_vel': parse_rinex_float(lines[1][23:42]) if len(lines) > 1 else None, - 'x_acc': parse_rinex_float(lines[1][42:61]) if len(lines) > 1 else None, - 'health': parse_rinex_float(lines[1][61:80]) if len(lines) > 1 else None, - 'y': parse_rinex_float(lines[2][4:23]) if len(lines) > 2 else None, - 'y_vel': parse_rinex_float(lines[2][23:42]) if len(lines) > 2 else None, - 'y_acc': parse_rinex_float(lines[2][42:61]) if len(lines) > 2 else None, - 'z': parse_rinex_float(lines[3][4:23]) if len(lines) > 3 else None, - 'z_vel': parse_rinex_float(lines[3][23:42]) if len(lines) > 3 else None, - 'z_acc': parse_rinex_float(lines[3][42:61]) if len(lines) > 3 else None, - 'extra': lines[4:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float(lines[0][23:42]), + "sv_clock_drift": parse_rinex_float(lines[0][42:61]), + "sv_clock_drift_rate": parse_rinex_float( + lines[0][61:80] + ), + "x": parse_rinex_float(lines[1][4:23]) + if len(lines) > 1 + else None, + "x_vel": parse_rinex_float(lines[1][23:42]) + if len(lines) > 1 + else None, + "x_acc": parse_rinex_float(lines[1][42:61]) + if len(lines) > 1 + else None, + "health": parse_rinex_float(lines[1][61:80]) + if len(lines) > 1 + else None, + "y": parse_rinex_float(lines[2][4:23]) + if len(lines) > 2 + else None, + "y_vel": parse_rinex_float(lines[2][23:42]) + if len(lines) > 2 + else None, + "y_acc": parse_rinex_float(lines[2][42:61]) + if len(lines) > 2 + else None, + "z": parse_rinex_float(lines[3][4:23]) + if len(lines) > 3 + else None, + "z_vel": parse_rinex_float(lines[3][23:42]) + if len(lines) > 3 + else None, + "z_acc": parse_rinex_float(lines[3][42:61]) + if len(lines) > 3 + else None, + "extra": lines[4:], } - elif system in ('G', 'E', 'C', 'I', 'J'): + elif system in ("G", "E", "C", "I", "J"): ephemeris = { - 'prn': prn, - 'epoch': epoch, - 'sv_clock_bias': parse_rinex_float(lines[0][23:42]), - 'sv_clock_drift': parse_rinex_float(lines[0][42:61]), - 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]), - 'iode': parse_rinex_float(lines[1][4:23]), - 'crs': parse_rinex_float(lines[1][23:42]), - 'delta_n': parse_rinex_float(lines[1][42:61]), - 'm0': parse_rinex_float(lines[1][61:80]), - 'cuc': parse_rinex_float(lines[2][4:23]), - 'ecc': parse_rinex_float(lines[2][23:42]), - 'cus': parse_rinex_float(lines[2][42:61]), - 'sqrt_a': parse_rinex_float(lines[2][61:80]), - 'toe': parse_rinex_float(lines[3][4:23]), - 'cic': parse_rinex_float(lines[3][23:42]), - 'omega0': parse_rinex_float(lines[3][42:61]), - 'cis': parse_rinex_float(lines[3][61:80]), - 'i0': parse_rinex_float(lines[4][4:23]), - 'crc': parse_rinex_float(lines[4][23:42]), - 'omega': parse_rinex_float(lines[4][42:61]), - 'omega_dot': parse_rinex_float(lines[4][61:80]), - 'idot': parse_rinex_float(lines[5][4:23]), - 'codes_l2': parse_rinex_float(lines[5][23:42]), - 'gps_week': parse_rinex_float(lines[5][42:61]), - 'l2p_flag': parse_rinex_float(lines[5][61:80]), - 'sv_accuracy': parse_rinex_float(lines[6][4:23]), - 'sv_health': parse_rinex_float(lines[6][23:42]), - 'tgd': parse_rinex_float(lines[6][42:61]), - 'iodc': parse_rinex_float(lines[6][61:80]), - 'transmission_time': parse_rinex_float(lines[7][4:23]) if len(lines) > 7 else None, - 'fit_interval': parse_rinex_float(lines[7][23:42]) if len(lines) > 7 else None, - 'extra': lines[8:] + "prn": prn, + "epoch": epoch, + "sv_clock_bias": parse_rinex_float(lines[0][23:42]), + "sv_clock_drift": parse_rinex_float(lines[0][42:61]), + "sv_clock_drift_rate": parse_rinex_float( + lines[0][61:80] + ), + "iode": parse_rinex_float(lines[1][4:23]), + "crs": parse_rinex_float(lines[1][23:42]), + "delta_n": parse_rinex_float(lines[1][42:61]), + "m0": parse_rinex_float(lines[1][61:80]), + "cuc": parse_rinex_float(lines[2][4:23]), + "ecc": parse_rinex_float(lines[2][23:42]), + "cus": parse_rinex_float(lines[2][42:61]), + "sqrt_a": parse_rinex_float(lines[2][61:80]), + "toe": parse_rinex_float(lines[3][4:23]), + "cic": parse_rinex_float(lines[3][23:42]), + "omega0": parse_rinex_float(lines[3][42:61]), + "cis": parse_rinex_float(lines[3][61:80]), + "i0": parse_rinex_float(lines[4][4:23]), + "crc": parse_rinex_float(lines[4][23:42]), + "omega": parse_rinex_float(lines[4][42:61]), + "omega_dot": parse_rinex_float(lines[4][61:80]), + "idot": parse_rinex_float(lines[5][4:23]), + "codes_l2": parse_rinex_float(lines[5][23:42]), + "gps_week": parse_rinex_float(lines[5][42:61]), + "l2p_flag": parse_rinex_float(lines[5][61:80]), + "sv_accuracy": parse_rinex_float(lines[6][4:23]), + "sv_health": parse_rinex_float(lines[6][23:42]), + "tgd": parse_rinex_float(lines[6][42:61]), + "iodc": parse_rinex_float(lines[6][61:80]), + "transmission_time": parse_rinex_float(lines[7][4:23]) + if len(lines) > 7 + else None, + "fit_interval": parse_rinex_float(lines[7][23:42]) + if len(lines) > 7 + else None, + "extra": lines[8:], } else: ephemeris = [] @@ -593,54 +759,77 @@ def read_rinex_nav(filename): return satellites -def calculate_satellite_position(ephemeris, transmit_time): - """Calculate satellite position in ECEF coordinates at given transmission time""" - system = ephemeris['prn'][0] +def calculate_satellite_position( + ephemeris: Dict[str, Any], transmit_time: float +) -> Tuple[float, float, float]: + """Calculate satellite position in ECEF coordinates [m], at a given transmission time""" + system = ephemeris["prn"][0] - if system in ('R', 'S'): # GLONASS/SBAS + if system in ("R", "S"): # GLONASS/SBAS dt = transmit_time # Convert km to meters - xk = (ephemeris['x'] + ephemeris['x_vel'] * dt + 0.5 * ephemeris['x_acc'] * dt**2) * 1000 - yk = (ephemeris['y'] + ephemeris['y_vel'] * dt + 0.5 * ephemeris['y_acc'] * dt**2) * 1000 - zk = (ephemeris['z'] + ephemeris['z_vel'] * dt + 0.5 * ephemeris['z_acc'] * dt**2) * 1000 + xk = ( + ephemeris["x"] + + ephemeris["x_vel"] * dt + + 0.5 * ephemeris["x_acc"] * dt**2 + ) * 1000 + yk = ( + ephemeris["y"] + + ephemeris["y_vel"] * dt + + 0.5 * ephemeris["y_acc"] * dt**2 + ) * 1000 + zk = ( + ephemeris["z"] + + ephemeris["z_vel"] * dt + + 0.5 * ephemeris["z_acc"] * dt**2 + ) * 1000 else: # Constants mu = 3.986005e14 # Earth's gravitational constant (m^3/s^2) omega_e_dot = 7.2921151467e-5 # Earth rotation rate (rad/s) # Semi-major axis - a = ephemeris['sqrt_a'] ** 2 + a = ephemeris["sqrt_a"] ** 2 # Corrected mean motion - n0 = sqrt(mu / (a ** 3)) - n = n0 + ephemeris['delta_n'] + n0 = sqrt(mu / (a**3)) + n = n0 + ephemeris["delta_n"] # Mean anomaly - mk = ephemeris['m0'] + n * transmit_time + mk = ephemeris["m0"] + n * transmit_time # Solve Kepler's equation for eccentric anomaly (Ek) ek = mk for _ in range(10): ek_old = ek - ek = mk + ephemeris['ecc'] * sin(ek) + ek = mk + ephemeris["ecc"] * sin(ek) if abs(ek - ek_old) < 1e-12: break # True anomaly - nu_k = atan2(sqrt(1 - ephemeris['ecc']**2) * sin(ek), cos(ek) - ephemeris['ecc']) + nu_k = atan2( + sqrt(1 - ephemeris["ecc"] ** 2) * sin(ek), + cos(ek) - ephemeris["ecc"], + ) # Argument of latitude - phi_k = nu_k + ephemeris['omega'] + phi_k = nu_k + ephemeris["omega"] # Second harmonic perturbations - delta_uk = ephemeris['cus'] * sin(2 * phi_k) + ephemeris['cuc'] * cos(2 * phi_k) - delta_rk = ephemeris['crs'] * sin(2 * phi_k) + ephemeris['crc'] * cos(2 * phi_k) - delta_ik = ephemeris['cis'] * sin(2 * phi_k) + ephemeris['cic'] * cos(2 * phi_k) + delta_uk = ephemeris["cus"] * sin(2 * phi_k) + ephemeris["cuc"] * cos( + 2 * phi_k + ) + delta_rk = ephemeris["crs"] * sin(2 * phi_k) + ephemeris["crc"] * cos( + 2 * phi_k + ) + delta_ik = ephemeris["cis"] * sin(2 * phi_k) + ephemeris["cic"] * cos( + 2 * phi_k + ) # Corrected argument of latitude, radius and inclination uk = phi_k + delta_uk - rk = a * (1 - ephemeris['ecc'] * cos(ek)) + delta_rk - ik = ephemeris['i0'] + delta_ik + ephemeris['idot'] * transmit_time + rk = a * (1 - ephemeris["ecc"] * cos(ek)) + delta_rk + ik = ephemeris["i0"] + delta_ik + ephemeris["idot"] * transmit_time # Positions in orbital plane xk_prime = rk * cos(uk) @@ -648,9 +837,9 @@ def calculate_satellite_position(ephemeris, transmit_time): # Corrected longitude of ascending node omega_k = ( - ephemeris['omega0'] - + (ephemeris['omega_dot'] - omega_e_dot) * transmit_time - - omega_e_dot * ephemeris['toe'] + ephemeris["omega0"] + + (ephemeris["omega_dot"] - omega_e_dot) * transmit_time + - omega_e_dot * ephemeris["toe"] ) # Earth-fixed coordinates @@ -661,16 +850,39 @@ def calculate_satellite_position(ephemeris, transmit_time): return xk, yk, zk -def calculate_satellite_positions(ephemeris, start_time, end_time, step_min=5): - """Generate multiple positions over time for a single satellite +def calculate_satellite_positions( + ephemeris: Dict[str, Any], + start_time: datetime, + end_time: datetime, + step_min: int = 5, +) -> List[Tuple[datetime, float, float, float]]: + """ + Compute multiple positions over time for a single satellite between start_time and end_time. + + Parameters + ---------- + ephemeris : Dict[str, Any] + Satellite ephemeris data containing orbital parameters + start_time : datetime + Start time for position calculations + end_time : datetime + End time for position calculations + step_min : int, optional + Time step in minutes between calculations, by default 5 + + Returns + ------- + List[Tuple[datetime, float, float, float]] + List of tuples containing (timestamp, x, y, z) coordinates + in ECEF frame for each time step """ positions = [] current_time = start_time - system = ephemeris['prn'][0] - max_valid_time = 1800 if system == 'R' else 14400 + system = ephemeris["prn"][0] + max_valid_time = 1800 if system == "R" else 14400 while current_time <= end_time: - transmit_time = (current_time - ephemeris['epoch']).total_seconds() + transmit_time = (current_time - ephemeris["epoch"]).total_seconds() if abs(transmit_time) <= max_valid_time: x, y, z = calculate_satellite_position(ephemeris, transmit_time) @@ -681,14 +893,45 @@ def calculate_satellite_positions(ephemeris, start_time, end_time, step_min=5): return positions -def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt): - """Convert ECEF coordinates to azimuth and elevation""" +def ecef_to_az_el( + x: float, + y: float, + z: float, + obs_lat: float, + obs_lon: float, + obs_alt: float, +) -> Tuple[float, float]: + """ + Convert ECEF coordinates to azimuth and elevation angles. + + Parameters + ---------- + x : float + Satellite X coordinate in ECEF frame (meters) + y : float + Satellite Y coordinate in ECEF frame (meters) + z : float + Satellite Z coordinate in ECEF frame (meters) + obs_lat : float + Observer latitude in radians + obs_lon : float + Observer longitude in radians + obs_alt : float + Observer altitude in meters above WGS-84 ellipsoid + + Returns + ------- + Tuple[float, float] + Azimuth and elevation angles in degrees. + Azimuth: 0° to 360° (0° = North, 90° = East) + Elevation: 0° to 90° (0° = horizon, 90° = zenith) + """ # WGS-84 parameters a = 6378137.0 # semi-major axis e_sq = 6.69437999014e-3 # first eccentricity squared # Convert geodetic coordinates to ECEF - n = a / sqrt(1 - e_sq * sin(obs_lat)**2) + n = a / sqrt(1 - e_sq * sin(obs_lat) ** 2) obs_x = (n + obs_alt) * cos(obs_lat) * cos(obs_lon) obs_y = (n + obs_alt) * cos(obs_lat) * sin(obs_lon) obs_z = (n * (1 - e_sq) + obs_alt) * sin(obs_lat) @@ -700,8 +943,16 @@ def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt): # Convert to local ENU (East, North, Up) coordinates enu_x = -sin(obs_lon) * dx + cos(obs_lon) * dy - enu_y = -sin(obs_lat) * cos(obs_lon) * dx - sin(obs_lat) * sin(obs_lon) * dy + cos(obs_lat) * dz - enu_z = cos(obs_lat) * cos(obs_lon) * dx + cos(obs_lat) * sin(obs_lon) * dy + sin(obs_lat) * dz + enu_y = ( + -sin(obs_lat) * cos(obs_lon) * dx + - sin(obs_lat) * sin(obs_lon) * dy + + cos(obs_lat) * dz + ) + enu_z = ( + cos(obs_lat) * cos(obs_lon) * dx + + cos(obs_lat) * sin(obs_lon) * dy + + sin(obs_lat) * dz + ) # Calculate azimuth and elevation azimuth = atan2(enu_x, enu_y) @@ -714,85 +965,143 @@ def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt): return azimuth, elevation -def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, - footer_text=None, filename=None, - show_plot=True, start_time=None, - end_time=None, elev_mask=5.0, - output_format="pdf"): - """Plot trajectories for all visible satellites""" - plt.rcParams['pdf.fonttype'] = 42 # TrueType fonts - plt.rcParams['ps.fonttype'] = 42 # TrueType fonts - plt.rcParams['font.family'] = 'serif' - plt.rcParams['font.serif'] = ['Times New Roman', 'Times', 'DejaVu Serif'] - plt.rcParams['mathtext.fontset'] = 'dejavuserif' # For math text - plt.rcParams['svg.fonttype'] = 'none' # Make SVG text editable +def plot_satellite_tracks( + satellites: Dict[str, List[Dict[str, Any]]], + obs_lat: float, + obs_lon: float, + obs_alt: float, + footer_text: Optional[str] = None, + filename: Optional[str] = None, + show_plot: bool = True, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + elev_mask: float = 5.0, + output_format: str = "pdf", +) -> None: + """ + Plot trajectories for all visible satellites in a polar skyplot. + + Parameters + ---------- + satellites : Dict[str, List[Dict[str, Any]]] + Dictionary mapping satellite PRN strings to lists of ephemeris data + obs_lat : float + Observer latitude in radians + obs_lon : float + Observer longitude in radians + obs_alt : float + Observer altitude in meters + footer_text : Optional[str], optional + Text to display at bottom of plot, by default None + filename : Optional[str], optional + Input filename for naming output, by default None + show_plot : bool, optional + Whether to display the plot interactively, by default True + start_time : Optional[datetime], optional + Start time for position calculations, by default None (auto-detect) + end_time : Optional[datetime], optional + End time for position calculations, by default None (auto-detect) + elev_mask : float, optional + Minimum elevation angle in degrees to plot, by default 5.0 + output_format : str, optional + Output file format, by default "pdf" + + Returns + ------- + None + Saves plot to file and optionally displays it + """ + plt.rcParams["pdf.fonttype"] = 42 # TrueType fonts + plt.rcParams["ps.fonttype"] = 42 # TrueType fonts + plt.rcParams["font.family"] = "serif" + plt.rcParams["font.serif"] = ["Times New Roman", "Times", "DejaVu Serif"] + plt.rcParams["mathtext.fontset"] = "dejavuserif" # For math text + plt.rcParams["savefig.dpi"] = 300 # for jpg and png + plt.rcParams["savefig.bbox"] = "tight" # Always use bbox_inches='tight' + plt.rcParams["svg.fonttype"] = "none" # Make SVG text editable fig = plt.figure(figsize=(8, 8)) - ax = fig.add_subplot(111, projection='polar') + ax = fig.add_subplot(111, projection="polar") ax.tick_params(labelsize=16, pad=7) # Polar plot setup - ax.set_theta_zero_location('N') + ax.set_theta_zero_location("N") ax.set_theta_direction(-1) ax.set_ylim(0, 90) # Elevation ticks ax.set_yticks(range(0, 91, 15)) - ax.set_yticklabels(['90°', '', '60°', '', '30°', '', '0°'], fontsize=14) + ax.set_yticklabels(["90°", "", "60°", "", "30°", "", "0°"], fontsize=14) # Color scheme by constellation system_colors = { - 'G': 'blue', # GPS - 'E': 'green', # Galileo - 'R': 'red', # GLONASS - 'C': 'orange', # BeiDou - 'J': 'brown', # QZSS - 'I': 'pink', # IRNSS - 'S': 'lightgray', # SBAS - 'L': 'cyan' # LEO (new in RINEX v4) + "G": "blue", # GPS + "E": "green", # Galileo + "R": "red", # GLONASS + "C": "orange", # BeiDou + "J": "brown", # QZSS + "I": "pink", # IRNSS + "S": "lightgray", # SBAS + "L": "cyan", # LEO (new in RINEX v4) } # System names mapping system_names = { - 'G': 'GPS', - 'E': 'Galileo', - 'R': 'GLONASS', - 'C': 'BeiDou', - 'J': 'QZSS', - 'I': 'IRNSS', - 'S': 'SBAS', - 'L': 'LEO' + "G": "GPS", + "E": "Galileo", + "R": "GLONASS", + "C": "BeiDou", + "J": "QZSS", + "I": "IRNSS", + "S": "SBAS", + "L": "LEO", } # Find which systems are actually present - present_systems = {prn[0] for prn in satellites.keys() if prn[0] in system_colors} + present_systems = { + prn[0] for prn in satellites.keys() if prn[0] in system_colors + } # Plot each satellite for prn, ephemeris_list in satellites.items(): - color = system_colors.get(prn[0], 'purple') # Default to purple for unknown systems + # Default to purple for unknown systems + color = system_colors.get(prn[0], "purple") if not ephemeris_list: continue mid_time = start_time + (end_time - start_time) / 2 - prev_eph = [e for e in ephemeris_list if e['epoch'] <= mid_time] + prev_eph = [e for e in ephemeris_list if e["epoch"] <= mid_time] if prev_eph: - ephemeris = max(prev_eph, key=lambda e: e['epoch']) + ephemeris = max(prev_eph, key=lambda e: e["epoch"]) else: - ephemeris = min(ephemeris_list, key=lambda e: abs((e['epoch'] - mid_time).total_seconds())) + ephemeris = min( + ephemeris_list, + key=lambda e: abs((e["epoch"] - mid_time).total_seconds()), + ) if start_time is None or end_time is None: - all_epochs = sorted({e['epoch'] for prn_data in satellites.values() for e in prn_data}) + all_epochs = sorted( + { + e["epoch"] + for prn_data in satellites.values() + for e in prn_data + } + ) start_time = min(all_epochs) end_time = max(all_epochs) - positions = calculate_satellite_positions(ephemeris, start_time, end_time) + positions = calculate_satellite_positions( + ephemeris, start_time, end_time + ) # Split into visible segments segments = [] current_seg_az = [] current_seg_el = [] for _, x, y, z in positions: - azimuth, elevation = ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt) + azimuth, elevation = ecef_to_az_el( + x, y, z, obs_lat, obs_lon, obs_alt + ) if elevation > elev_mask: current_seg_az.append(azimuth) current_seg_el.append(elevation) @@ -807,7 +1116,9 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, for az_seg, el_seg in segments: theta = np.radians(az_seg) r = 90 - np.array(el_seg) - ax.plot(theta, r, '-', color=color, alpha=0.7, linewidth=2.5, zorder=1) + ax.plot( + theta, r, "-", color=color, alpha=0.7, linewidth=2.5, zorder=1 + ) # Arrow at end if len(theta) >= 2: @@ -816,65 +1127,86 @@ def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt, arrow_length_factor = 1.8 extended_theta = theta[-2] + dx * arrow_length_factor extended_r = r[-2] + dy * arrow_length_factor - ax.annotate('', - xytext=(theta[-1], r[-1]), - xy=(extended_theta, extended_r), - arrowprops={ - 'arrowstyle': '->', - 'color': color, - 'alpha': 0.9, - 'linewidth': 1.5, - 'shrinkA': 0, - 'shrinkB': 0 - }, - zorder=2) + ax.annotate( + "", + xytext=(theta[-1], r[-1]), + xy=(extended_theta, extended_r), + arrowprops={ + "arrowstyle": "->", + "color": color, + "alpha": 0.9, + "linewidth": 1.5, + "shrinkA": 0, + "shrinkB": 0, + }, + zorder=2, + ) # Label at midpoint of the segment - mid_idx = len(theta)//2 - ax.text(theta[mid_idx], r[mid_idx], prn, - fontsize=12, ha='center', va='center', - bbox={"facecolor": system_colors.get(prn[0], "white"), "alpha": 0.2, "pad": 2}, - zorder=3) + mid_idx = len(theta) // 2 + ax.text( + theta[mid_idx], + r[mid_idx], + prn, + fontsize=12, + ha="center", + va="center", + bbox={ + "facecolor": system_colors.get(prn[0], "white"), + "alpha": 0.2, + "pad": 2, + }, + zorder=3, + ) # Legend for present systems legend_elements = [ - plt.Line2D([0], [0], marker='o', color='w', - label=f'{system_names[sys]} ({sys})', - markerfacecolor=system_colors[sys], - markersize=10) + plt.Line2D( + [0], + [0], + marker="o", + color="w", + label=f"{system_names[sys]} ({sys})", + markerfacecolor=system_colors[sys], + markersize=10, + ) for sys in present_systems ] if legend_elements: - ax.legend(handles=legend_elements, - loc='upper right', - bbox_to_anchor=(1.3, 1.1), - fontsize=14) + ax.legend( + handles=legend_elements, + loc="upper right", + bbox_to_anchor=(1.3, 1.1), + fontsize=14, + ) lat_deg = np.degrees(obs_lat) lon_deg = np.degrees(obs_lon) - lat_hemisphere = 'N' if lat_deg >= 0 else 'S' - lon_hemisphere = 'E' if lon_deg >= 0 else 'W' + lat_hemisphere = "N" if lat_deg >= 0 else "S" + lon_hemisphere = "E" if lon_deg >= 0 else "W" plt.title( f"GNSS skyplot from {abs(lat_deg):.2f}° {lat_hemisphere}, " f"{abs(lon_deg):.2f}° {lon_hemisphere}", pad=25, - fontsize=20 + fontsize=20, ) if footer_text: - fig.text(0.42, 0.05, footer_text, ha='center', va='center', fontsize=16) + fig.text( + 0.42, 0.05, footer_text, ha="center", va="center", fontsize=16 + ) plt.tight_layout() if filename: filename_no_path = Path(filename).name - filename_no_dots = filename_no_path.replace('.', '_') + filename_no_dots = filename_no_path.replace(".", "_") output_name = f"skyplot_{filename_no_dots}.{output_format}" else: output_name = f"skyplot.{output_format}" - plt.savefig(output_name, format=output_format, bbox_inches='tight') + plt.savefig(output_name, format=output_format) print(f"Image saved as {output_name}") if show_plot: try: @@ -891,82 +1223,89 @@ def main(): try: # Set system names and codes system_name_to_code = { - 'GPS': 'G', - 'GLONASS': 'R', - 'GALILEO': 'E', - 'BEIDOU': 'C', - 'QZSS': 'J', - 'IRNSS': 'I', - 'SBAS': 'S', - 'LEO': 'L' + "GPS": "G", + "GLONASS": "R", + "GALILEO": "E", + "BEIDOU": "C", + "QZSS": "J", + "IRNSS": "I", + "SBAS": "S", + "LEO": "L", } # Set up argument parser parser = argparse.ArgumentParser( - description='Generate a GNSS skyplot from a RINEX navigation file', - epilog="Example: skyplot.py brdc0010.22n -33.4592 -70.6453 520.0 --format png --system G E --elev-mask 10 --no-show" + description="Generate a GNSS skyplot from a RINEX navigation file", + epilog="Example: skyplot.py brdc0010.22n -33.4592 -70.6453 520.0 --format png --system G E --elev-mask 10 --no-show", ) # Positional arguments + parser.add_argument("filename", help="RINEX navigation file path") + parser.add_argument( - 'filename', - help='RINEX navigation file path' + "lat", + nargs="?", + type=float, + default=DEFAULT_LAT, + help=f"Observer latitude in degrees (default: {DEFAULT_LAT}° N)", ) parser.add_argument( - 'lat', nargs='?', type=float, default=DEFAULT_LAT, - help=f'Observer latitude in degrees (default: {DEFAULT_LAT}° N)' + "lon", + nargs="?", + type=float, + default=DEFAULT_LON, + help=f"Observer longitude in degrees (default: {DEFAULT_LON}° E)", ) parser.add_argument( - 'lon', nargs='?', type=float, default=DEFAULT_LON, - help=f'Observer longitude in degrees (default: {DEFAULT_LON}° E)' - ) - - parser.add_argument( - 'alt', nargs='?', type=float, default=DEFAULT_ALT, - help=f'Observer altitude in meters (default: {DEFAULT_ALT} m)' + "alt", + nargs="?", + type=float, + default=DEFAULT_ALT, + help=f"Observer altitude in meters (default: {DEFAULT_ALT} m)", ) # Optional arguments parser.add_argument( - '--elev-mask', + "--elev-mask", type=float, default=5.0, - help='Elevation mask in degrees for plotting satellite tracks (default: 5°)' + help="Elevation mask in degrees for plotting satellite tracks (default: 5°)", ) parser.add_argument( - '--format', + "--format", type=str, default="pdf", - choices=["pdf", "eps", "png", "svg"], - help='Output file format for plot (default: pdf)' + choices=["pdf", "eps", "jpg", "png", "svg"], + help="Output file format for plot (default: pdf)", ) parser.add_argument( - '--no-show', - action='store_true', - help='Run without displaying plot window' + "--no-show", + action="store_true", + help="Run without displaying plot window", ) parser.add_argument( - '--system', - nargs='+', - help='Only plot satellites from these systems (e.g., G R E or GPS GLONASS Galileo)' + "--system", + nargs="+", + help="Only plot satellites from these systems (e.g., G R E or GPS GLONASS Galileo)", ) parser.add_argument( - '--use-obs', - action='store_true', - help='Use corresponding RINEX observation file to bound the skyplot to the receiver time window' + "--use-obs", + action="store_true", + help="Use corresponding RINEX observation file to bound the skyplot to the receiver time window", ) parser.add_argument( - "-v", "--version", + "-v", + "--version", action="version", version=f"%(prog)s {__version__}", - help="Show program version and exit" + help="Show program version and exit", ) # Parse all arguments with full validation @@ -974,17 +1313,16 @@ def main(): filename = args.filename user_provided_position = ( - (args.lat != DEFAULT_LAT) or - (args.lon != DEFAULT_LON) or - (args.alt != DEFAULT_ALT) - ) + (args.lat != DEFAULT_LAT) + or (args.lon != DEFAULT_LON) + or (args.alt != DEFAULT_ALT) + ) # Convert coordinates to radians obs_lat = np.radians(args.lat) obs_lon = np.radians(args.lon) obs_alt = args.alt - # Read RINEX file print(f"Reading {filename} ...") try: @@ -1006,16 +1344,28 @@ def main(): else: systems_upper.add(s_upper) # Assume user passed the code - satellites = {prn: eph_list for prn, eph_list in satellites.items() if prn[0].upper() in systems_upper} + satellites = { + prn: eph_list + for prn, eph_list in satellites.items() + if prn[0].upper() in systems_upper + } if not satellites: - print(f"No satellites found for systems: {', '.join(sorted(systems_upper))}") + print( + f"No satellites found for systems: {', '.join(sorted(systems_upper))}" + ) return 1 # Print summary information - all_epochs = sorted(list(set( - e['epoch'] for prn, ephemerides in satellites.items() for e in ephemerides - ))) + all_epochs = sorted( + list( + set( + e["epoch"] + for prn, ephemerides in satellites.items() + for e in ephemerides + ) + ) + ) print("\nFile contains:") print(f"- {len(satellites)} unique satellites") print(f"- {len(all_epochs)} unique epochs") @@ -1030,15 +1380,15 @@ def main(): print("\nSatellite systems found:") for system, count in sorted(system_counts.items()): system_name = { - 'G': 'GPS', - 'R': 'GLONASS', - 'E': 'Galileo', - 'C': 'BeiDou', - 'J': 'QZSS', - 'I': 'IRNSS', - 'S': 'SBAS', - 'L': 'LEO' - }.get(system, 'Unknown') + "G": "GPS", + "R": "GLONASS", + "E": "Galileo", + "C": "BeiDou", + "J": "QZSS", + "I": "IRNSS", + "S": "SBAS", + "L": "LEO", + }.get(system, "Unknown") print(f"- {system_name} ({system}): {count} satellites") # Generate the combined skyplot @@ -1050,9 +1400,13 @@ def main(): obs_start, obs_end = read_obs_time_bounds(obs_path) if obs_start and obs_end: use_start, use_end = obs_start, obs_end - print(f"\nObservation window detected in {obs_path}: from {use_start} to {use_end}") + print( + f"\nObservation window detected in {obs_path}: from {use_start} to {use_end}" + ) else: - print(f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead.") + print( + f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead." + ) if not user_provided_position: approx_pos = get_approx_position_from_obs(obs_path) if approx_pos is not None: @@ -1060,7 +1414,9 @@ def main(): obs_lat = np.radians(lat) obs_lon = np.radians(lon) obs_alt = h - print(f"\nObserver position found in {obs_path}: lat {lat:.2f}°, lon {lon:.2f}°, height {h:.2f} m.") + print( + f"\nObserver position found in {obs_path}: lat {lat:.2f}°, lon {lon:.2f}°, height {h:.2f} m." + ) # Ensure at least two samples with the default 5-minute step if (use_end - use_start) < timedelta(minutes=5): @@ -1081,7 +1437,7 @@ def main(): start_time=use_start, end_time=use_end, elev_mask=args.elev_mask, - output_format=args.format + output_format=args.format, ) except Exception as e: print(f"Error: {str(e)}")