1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-09-04 11:58:00 +00:00

style: format python code

This commit is contained in:
pedromiguelcp
2025-08-13 15:01:13 +01:00
parent 0cf75d15aa
commit dad143ace0

View File

@@ -1,21 +1,21 @@
#!/usr/bin/env python #!/usr/bin/env python
""" """
skyplot.py skyplot.py
Reads a RINEX navigation file and generates a skyplot. Optionally, a RINEX observation file can Reads a RINEX navigation file and generates a skyplot. Optionally, a RINEX observation file can
also be read to match the skyplot to the receiver processing time. also be read to match the skyplot to the receiver processing time.
Usage: python skyplot.py <RINEX_NAV_FILE> [observer_lat] [observer_lon] [observer_alt] [--use-obs] Usage: python skyplot.py <RINEX_NAV_FILE> [observer_lat] [observer_lon] [observer_alt] [--use-obs]
----------------------------------------------------------------------------- -----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver. GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR. This file is part of GNSS-SDR.
SPDX-FileCopyrightText: 2025 Carles Fernandez-Prades cfernandez(at)cttc.es SPDX-FileCopyrightText: 2025 Carles Fernandez-Prades cfernandez(at)cttc.es
SPDX-License-Identifier: GPL-3.0-or-later SPDX-License-Identifier: GPL-3.0-or-later
----------------------------------------------------------------------------- -----------------------------------------------------------------------------
""" """
import argparse import argparse
@@ -30,20 +30,19 @@ import numpy as np
def _read_obs_time_bounds(obs_path): def _read_obs_time_bounds(obs_path):
""" """Return (start_time, end_time) from a RINEX 2/3 OBS file by scanning epoch lines.
Return (start_time, end_time) from a RINEX 2/3 OBS file by scanning epoch lines.
If parsing fails or the file is not OBS, return (None, None). If parsing fails or the file is not OBS, return (None, None).
""" """
try: try:
with open(obs_path, "r", encoding="utf-8", errors="ignore") as f: with open(obs_path, 'r', encoding='utf-8', errors='ignore') as f:
# Detect OBS in header and skip to END OF HEADER # Detect OBS in header and skip to END OF HEADER
is_obs = False is_obs = False
for line in f: for line in f:
if "RINEX VERSION / TYPE" in line: if "RINEX VERSION / TYPE" in line:
# Robust OBS detection: # Robust OBS detection:
# - In RINEX 2/3 the file-type letter at col 21 (0-based idx 20) # In RINEX 2/3 the file-type letter at col 21 (0-based idx 20)
tchar = line[20:21] tchar = line[20:21]
if tchar == "O" or "OBSERVATION DATA" in line: if tchar == 'O' or 'OBSERVATION DATA' in line:
is_obs = True is_obs = True
if "END OF HEADER" in line: if "END OF HEADER" in line:
break break
@@ -56,27 +55,17 @@ def _read_obs_time_bounds(obs_path):
for line in f: for line in f:
if not line.strip(): if not line.strip():
continue continue
if line.startswith(">"): # RINEX 3 epoch line if line.startswith('>'): # RINEX 3 epoch line
yyyy = int(line[2:6]) yyyy = int(line[2:6]); mm = int(line[7:9]); dd = int(line[10:12])
mm = int(line[7:9]) hh = int(line[13:15]); mi = int(line[16:18]); ss = float(line[19:29])
dd = int(line[10:12]) epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
hh = int(line[13:15])
mi = int(line[16:18])
ss = float(line[19:29])
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6))
else: else:
# RINEX 2 epoch line # RINEX 2 epoch line
try: try:
yy = int(line[1:3]) yy = int(line[1:3]); mm = int(line[4:6]); dd = int(line[7:9])
mm = int(line[4:6]) hh = int(line[10:12]); mi = int(line[13:15]); ss = float(line[15:26])
dd = int(line[7:9])
hh = int(line[10:12])
mi = int(line[13:15])
ss = float(line[15:26])
yyyy = 1900 + yy if yy >= 80 else 2000 + yy yyyy = 1900 + yy if yy >= 80 else 2000 + yy
epoch = datetime( epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
yyyy, mm, dd, hh, mi, int(ss), int((ss % 1) * 1e6)
)
except Exception: except Exception:
continue continue
if start_time is None or epoch < start_time: if start_time is None or epoch < start_time:
@@ -95,22 +84,22 @@ def parse_rinex_float(s):
return 0.0 return 0.0
# Replace D exponent with E (some RINEX files use D instead of E) # Replace D exponent with E (some RINEX files use D instead of E)
s = s.replace("D", "E").replace("d", "e") s = s.replace('D', 'E').replace('d', 'e')
# Handle cases where exponent lacks E (e.g., "12345-3") # Handle cases where exponent lacks E (e.g., "12345-3")
if re.match(r"[+-]?\d+[+-]\d+", s.strip()): if re.match(r'[+-]?\d+[+-]\d+', s.strip()):
s = s.replace("+", "E+").replace("-", "E-") s = s.replace('+', 'E+').replace('-', 'E-')
try: try:
return float(s) return float(s)
except ValueError: except ValueError:
# Handle cases where the number runs into the next field # Handle cases where the number runs into the next field
# Try to split at the exponent if present # Try to split at the exponent if present
if "E" in s: if 'E' in s:
base, exp = s.split("E")[:2] base, exp = s.split('E')[:2]
# Take first character of exponent if needed # Take first character of exponent if needed
if exp and exp[0] in "+-" and len(exp) > 1: if exp and exp[0] in '+-' and len(exp) > 1:
return float(base + "E" + exp[0] + exp[1:].split()[0]) return float(base + 'E' + exp[0] + exp[1:].split()[0])
return 0.0 # Default if parsing fails return 0.0 # Default if parsing fails
@@ -118,7 +107,7 @@ def read_rinex_nav(filename):
"""Read RINEX v3.0 navigation file""" """Read RINEX v3.0 navigation file"""
satellites = {} satellites = {}
line_number = 0 line_number = 0
with open(filename, "r", encoding="utf-8") as f: with open(filename, 'r', encoding='utf-8') as f:
# Skip header # Skip header
while True: while True:
line = f.readline() line = f.readline()
@@ -154,7 +143,7 @@ def read_rinex_nav(filename):
# Read the next lines # Read the next lines
lines = [current_line] lines = [current_line]
line_count = 4 if system == "R" else 7 line_count = 4 if system == 'R' else 7
for _ in range(line_count): for _ in range(line_count):
next_line = f.readline() next_line = f.readline()
line_number += 1 line_number += 1
@@ -167,64 +156,61 @@ def read_rinex_nav(filename):
line_number += 1 line_number += 1
continue continue
if system == "R": # GLONASS specific parsing if system == 'R': # GLONASS specific parsing
ephemeris = { ephemeris = {
"prn": prn, 'prn': prn,
"epoch": epoch, 'epoch': epoch,
"sv_clock_bias": parse_rinex_float(lines[0][23:42]), 'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
"sv_relative_freq_bias": parse_rinex_float(lines[0][42:61]), 'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]),
"message_frame_time": parse_rinex_float(lines[0][61:80]), 'message_frame_time': parse_rinex_float(lines[0][61:80]),
"x": parse_rinex_float(lines[1][4:23]), # Position (km) 'x': parse_rinex_float(lines[1][4:23]), # Position (km)
"x_vel": parse_rinex_float(lines[1][23:42]), # Velocity (km/s) 'x_vel': parse_rinex_float(lines[1][23:42]), # Velocity (km/s)
"x_acc": parse_rinex_float(lines[1][42:61]), 'x_acc': parse_rinex_float(lines[1][42:61]),
"health": parse_rinex_float(lines[1][61:80]), 'health': parse_rinex_float(lines[1][61:80]),
"y": parse_rinex_float(lines[2][4:23]), 'y': parse_rinex_float(lines[2][4:23]),
"y_vel": parse_rinex_float(lines[2][23:42]), 'y_vel': parse_rinex_float(lines[2][23:42]),
"y_acc": parse_rinex_float(lines[2][42:61]), 'y_acc': parse_rinex_float(lines[2][42:61]),
"freq_num": parse_rinex_float(lines[2][61:80]), 'freq_num': parse_rinex_float(lines[2][61:80]),
"z": parse_rinex_float(lines[3][4:23]), 'z': parse_rinex_float(lines[3][4:23]),
"z_vel": parse_rinex_float(lines[3][23:42]), 'z_vel': parse_rinex_float(lines[3][23:42]),
"z_acc": parse_rinex_float(lines[3][42:61]), 'z_acc': parse_rinex_float(lines[3][42:61]),
"age": parse_rinex_float(lines[3][61:80]), 'age': parse_rinex_float(lines[3][61:80])
} }
else: else:
# Parse all ephemeris parameters # Parse all ephemeris parameters
ephemeris = { ephemeris = {
"prn": prn, 'prn': prn,
"epoch": epoch, 'epoch': epoch,
"sv_clock_bias": parse_rinex_float(lines[0][23:42]), 'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
"sv_clock_drift": parse_rinex_float(lines[0][42:61]), 'sv_clock_drift': parse_rinex_float(lines[0][42:61]),
"sv_clock_drift_rate": parse_rinex_float(lines[0][61:80]), 'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
"iode": parse_rinex_float(lines[1][4:23]), 'iode': parse_rinex_float(lines[1][4:23]),
"crs": parse_rinex_float(lines[1][23:42]), 'crs': parse_rinex_float(lines[1][23:42]),
"delta_n": parse_rinex_float(lines[1][42:61]), 'delta_n': parse_rinex_float(lines[1][42:61]),
"m0": parse_rinex_float(lines[1][61:80]), 'm0': parse_rinex_float(lines[1][61:80]),
"cuc": parse_rinex_float(lines[2][4:23]), 'cuc': parse_rinex_float(lines[2][4:23]),
"ecc": parse_rinex_float(lines[2][23:42]), 'ecc': parse_rinex_float(lines[2][23:42]),
"cus": parse_rinex_float(lines[2][42:61]), 'cus': parse_rinex_float(lines[2][42:61]),
"sqrt_a": parse_rinex_float(lines[2][61:80]), 'sqrt_a': parse_rinex_float(lines[2][61:80]),
"toe": parse_rinex_float(lines[3][4:23]), 'toe': parse_rinex_float(lines[3][4:23]),
"cic": parse_rinex_float(lines[3][23:42]), 'cic': parse_rinex_float(lines[3][23:42]),
"omega0": parse_rinex_float(lines[3][42:61]), 'omega0': parse_rinex_float(lines[3][42:61]),
"cis": parse_rinex_float(lines[3][61:80]), 'cis': parse_rinex_float(lines[3][61:80]),
"i0": parse_rinex_float(lines[4][4:23]), 'i0': parse_rinex_float(lines[4][4:23]),
"crc": parse_rinex_float(lines[4][23:42]), 'crc': parse_rinex_float(lines[4][23:42]),
"omega": parse_rinex_float(lines[4][42:61]), 'omega': parse_rinex_float(lines[4][42:61]),
"omega_dot": parse_rinex_float(lines[4][61:80]), 'omega_dot': parse_rinex_float(lines[4][61:80]),
"idot": parse_rinex_float(lines[5][4:23]), 'idot': parse_rinex_float(lines[5][4:23]),
"codes_l2": parse_rinex_float(lines[5][23:42]), 'codes_l2': parse_rinex_float(lines[5][23:42]),
"gps_week": parse_rinex_float(lines[5][42:61]), 'gps_week': parse_rinex_float(lines[5][42:61]),
"l2p_flag": parse_rinex_float(lines[5][61:80]), 'l2p_flag': parse_rinex_float(lines[5][61:80]),
"sv_accuracy": parse_rinex_float(lines[6][4:23]), 'sv_accuracy': parse_rinex_float(lines[6][4:23]),
"sv_health": parse_rinex_float(lines[6][23:42]), 'sv_health': parse_rinex_float(lines[6][23:42]),
"tgd": parse_rinex_float(lines[6][42:61]), 'tgd': parse_rinex_float(lines[6][42:61]),
"iodc": parse_rinex_float(lines[6][61:80]), 'iodc': parse_rinex_float(lines[6][61:80]),
"transmission_time": parse_rinex_float(lines[7][4:23]), 'transmission_time': parse_rinex_float(lines[7][4:23]),
"fit_interval": ( 'fit_interval': (
(parse_rinex_float(lines[7][23:42])) parse_rinex_float(lines[7][23:42])) if len(lines[7]) > 23 else 0.0
if len(lines[7]) > 23
else 0.0
),
} }
if prn not in satellites: if prn not in satellites:
@@ -252,60 +238,52 @@ def read_rinex_nav(filename):
def calculate_satellite_position(ephemeris, transmit_time): def calculate_satellite_position(ephemeris, transmit_time):
"""Calculate satellite position in ECEF coordinates at given transmission time""" """Calculate satellite position in ECEF coordinates at given transmission time"""
system = ephemeris["prn"][0] system = ephemeris['prn'][0]
if system == "R": # GLONASS - use position + velocity * time if system == 'R': # GLONASS - use position + velocity * time
dt = transmit_time dt = transmit_time
# Convert km to meters # Convert km to meters
xk = ( xk = (ephemeris['x'] + ephemeris['x_vel'] * dt + 0.5 * ephemeris['x_acc'] * dt**2) * 1000
ephemeris["x"] + ephemeris["x_vel"] * dt + 0.5 * ephemeris["x_acc"] * dt**2 yk = (ephemeris['y'] + ephemeris['y_vel'] * dt + 0.5 * ephemeris['y_acc'] * dt**2) * 1000
) * 1000 zk = (ephemeris['z'] + ephemeris['z_vel'] * dt + 0.5 * ephemeris['z_acc'] * dt**2) * 1000
yk = (
ephemeris["y"] + ephemeris["y_vel"] * dt + 0.5 * ephemeris["y_acc"] * dt**2
) * 1000
zk = (
ephemeris["z"] + ephemeris["z_vel"] * dt + 0.5 * ephemeris["z_acc"] * dt**2
) * 1000
else: else:
# Constants # Constants
mu = 3.986005e14 # Earth's gravitational constant (m^3/s^2) mu = 3.986005e14 # Earth's gravitational constant (m^3/s^2)
omega_e_dot = 7.2921151467e-5 # Earth rotation rate (rad/s) omega_e_dot = 7.2921151467e-5 # Earth rotation rate (rad/s)
# Semi-major axis # Semi-major axis
a = ephemeris["sqrt_a"] ** 2 a = ephemeris['sqrt_a'] ** 2
# Corrected mean motion # Corrected mean motion
n0 = sqrt(mu / (a**3)) n0 = sqrt(mu / (a ** 3))
n = n0 + ephemeris["delta_n"] n = n0 + ephemeris['delta_n']
# Mean anomaly # Mean anomaly
mk = ephemeris["m0"] + n * transmit_time mk = ephemeris['m0'] + n * transmit_time
# Solve Kepler's equation for eccentric anomaly (Ek) # Solve Kepler's equation for eccentric anomaly (Ek)
ek = mk ek = mk
for _ in range(10): for _ in range(10):
ek_old = ek ek_old = ek
ek = mk + ephemeris["ecc"] * sin(ek) ek = mk + ephemeris['ecc'] * sin(ek)
if abs(ek - ek_old) < 1e-12: if abs(ek - ek_old) < 1e-12:
break break
# True anomaly # True anomaly
nu_k = atan2( nu_k = atan2(sqrt(1 - ephemeris['ecc']**2) * sin(ek), cos(ek) - ephemeris['ecc'])
sqrt(1 - ephemeris["ecc"] ** 2) * sin(ek), cos(ek) - ephemeris["ecc"]
)
# Argument of latitude # Argument of latitude
phi_k = nu_k + ephemeris["omega"] phi_k = nu_k + ephemeris['omega']
# Second harmonic perturbations # Second harmonic perturbations
delta_uk = ephemeris["cus"] * sin(2 * phi_k) + ephemeris["cuc"] * cos(2 * phi_k) delta_uk = ephemeris['cus'] * sin(2 * phi_k) + ephemeris['cuc'] * cos(2 * phi_k)
delta_rk = ephemeris["crs"] * sin(2 * phi_k) + ephemeris["crc"] * cos(2 * phi_k) delta_rk = ephemeris['crs'] * sin(2 * phi_k) + ephemeris['crc'] * cos(2 * phi_k)
delta_ik = ephemeris["cis"] * sin(2 * phi_k) + ephemeris["cic"] * cos(2 * phi_k) delta_ik = ephemeris['cis'] * sin(2 * phi_k) + ephemeris['cic'] * cos(2 * phi_k)
# Corrected argument of latitude, radius and inclination # Corrected argument of latitude, radius and inclination
uk = phi_k + delta_uk uk = phi_k + delta_uk
rk = a * (1 - ephemeris["ecc"] * cos(ek)) + delta_rk rk = a * (1 - ephemeris['ecc'] * cos(ek)) + delta_rk
ik = ephemeris["i0"] + delta_ik + ephemeris["idot"] * transmit_time ik = ephemeris['i0'] + delta_ik + ephemeris['idot'] * transmit_time
# Positions in orbital plane # Positions in orbital plane
xk_prime = rk * cos(uk) xk_prime = rk * cos(uk)
@@ -313,9 +291,9 @@ def calculate_satellite_position(ephemeris, transmit_time):
# Corrected longitude of ascending node # Corrected longitude of ascending node
omega_k = ( omega_k = (
ephemeris["omega0"] ephemeris['omega0']
+ (ephemeris["omega_dot"] - omega_e_dot) * transmit_time + (ephemeris['omega_dot'] - omega_e_dot) * transmit_time
- omega_e_dot * ephemeris["toe"] - omega_e_dot * ephemeris['toe']
) )
# Earth-fixed coordinates # Earth-fixed coordinates
@@ -332,10 +310,10 @@ def calculate_satellite_positions(ephemeris, start_time, end_time, step_min=5):
""" """
positions = [] positions = []
current_time = start_time current_time = start_time
system = ephemeris["prn"][0] system = ephemeris['prn'][0]
max_valid_time = 1800 if system == "R" else 14400 max_valid_time = 1800 if system == 'R' else 14400
while current_time <= end_time: while current_time <= end_time:
transmit_time = (current_time - ephemeris["epoch"]).total_seconds() transmit_time = (current_time - ephemeris['epoch']).total_seconds()
if abs(transmit_time) <= max_valid_time: if abs(transmit_time) <= max_valid_time:
x, y, z = calculate_satellite_position(ephemeris, transmit_time) x, y, z = calculate_satellite_position(ephemeris, transmit_time)
@@ -353,7 +331,7 @@ def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt):
e_sq = 6.69437999014e-3 # first eccentricity squared e_sq = 6.69437999014e-3 # first eccentricity squared
# Convert geodetic coordinates to ECEF # Convert geodetic coordinates to ECEF
n = a / sqrt(1 - e_sq * sin(obs_lat) ** 2) n = a / sqrt(1 - e_sq * sin(obs_lat)**2)
obs_x = (n + obs_alt) * cos(obs_lat) * cos(obs_lon) obs_x = (n + obs_alt) * cos(obs_lat) * cos(obs_lon)
obs_y = (n + obs_alt) * cos(obs_lat) * sin(obs_lon) obs_y = (n + obs_alt) * cos(obs_lat) * sin(obs_lon)
obs_z = (n * (1 - e_sq) + obs_alt) * sin(obs_lat) obs_z = (n * (1 - e_sq) + obs_alt) * sin(obs_lat)
@@ -365,16 +343,8 @@ def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt):
# Convert to local ENU (East, North, Up) coordinates # Convert to local ENU (East, North, Up) coordinates
enu_x = -sin(obs_lon) * dx + cos(obs_lon) * dy enu_x = -sin(obs_lon) * dx + cos(obs_lon) * dy
enu_y = ( enu_y = -sin(obs_lat) * cos(obs_lon) * dx - sin(obs_lat) * sin(obs_lon) * dy + cos(obs_lat) * dz
-sin(obs_lat) * cos(obs_lon) * dx enu_z = cos(obs_lat) * cos(obs_lon) * dx + cos(obs_lat) * sin(obs_lon) * dy + sin(obs_lat) * dz
- sin(obs_lat) * sin(obs_lon) * dy
+ cos(obs_lat) * dz
)
enu_z = (
cos(obs_lat) * cos(obs_lon) * dx
+ cos(obs_lat) * sin(obs_lon) * dy
+ sin(obs_lat) * dz
)
# Calculate azimuth and elevation # Calculate azimuth and elevation
azimuth = atan2(enu_x, enu_y) azimuth = atan2(enu_x, enu_y)
@@ -387,52 +357,45 @@ def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt):
return azimuth, elevation return azimuth, elevation
def plot_satellite_tracks( def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt,
satellites, footer_text=None, filename=None,
obs_lat, show_plot=True, start_time=None,
obs_lon, end_time=None):
obs_alt,
footer_text=None,
filename=None,
show_plot=True,
start_time=None,
end_time=None,
):
"""Plot trajectories for all visible satellites""" """Plot trajectories for all visible satellites"""
plt.rcParams["font.family"] = "Times New Roman" plt.rcParams["font.family"] = "Times New Roman"
fig = plt.figure(figsize=(8, 8)) fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection="polar") ax = fig.add_subplot(111, projection='polar')
ax.tick_params(labelsize=16, pad=7) ax.tick_params(labelsize=16, pad=7)
# Polar plot setup # Polar plot setup
ax.set_theta_zero_location("N") ax.set_theta_zero_location('N')
ax.set_theta_direction(-1) ax.set_theta_direction(-1)
ax.set_ylim(0, 90) ax.set_ylim(0, 90)
# Elevation ticks # Elevation ticks
ax.set_yticks(range(0, 91, 15)) ax.set_yticks(range(0, 91, 15))
ax.set_yticklabels(["90°", "", "60°", "", "30°", "", ""], fontsize=14) ax.set_yticklabels(['90°', '', '60°', '', '30°', '', ''], fontsize=14)
# Color scheme by constellation # Color scheme by constellation
system_colors = { system_colors = {
"G": "blue", # GPS 'G': 'blue', # GPS
"E": "green", # Galileo 'E': 'green', # Galileo
"R": "red", # GLONASS 'R': 'red', # GLONASS
"C": "orange", # BeiDou 'C': 'orange', # BeiDou
"J": "brown", # QZSS 'J': 'brown', # QZSS
"I": "pink", # IRNSS 'I': 'pink', # IRNSS
"S": "gray", # SBAS 'S': 'gray' # SBAS
} }
# System names mapping # System names mapping
system_names = { system_names = {
"G": "GPS", 'G': 'GPS',
"E": "Galileo", 'E': 'Galileo',
"R": "GLONASS", 'R': 'GLONASS',
"C": "BeiDou", 'C': 'BeiDou',
"J": "QZSS", 'J': 'QZSS',
"I": "IRNSS", 'I': 'IRNSS',
"S": "SBAS", 'S': 'SBAS'
} }
# Find which systems are actually present # Find which systems are actually present
@@ -440,32 +403,23 @@ def plot_satellite_tracks(
# Plot each satellite # Plot each satellite
for prn, ephemeris_list in satellites.items(): for prn, ephemeris_list in satellites.items():
color = system_colors.get( color = system_colors.get(prn[0], 'purple') # Default to purple for unknown systems
prn[0], "purple"
) # Default to purple for unknown systems
# Get the most recent ephemeris # Get the most recent ephemeris
if not ephemeris_list: if not ephemeris_list:
continue continue
mid_time = start_time + (end_time - start_time) / 2 mid_time = start_time + (end_time - start_time) / 2
prev_eph = [ prev_eph = [e for e in ephemeris_list if e['epoch'] <= mid_time] # Previous ephemeris
e for e in ephemeris_list if e["epoch"] <= mid_time
] # Previous ephemeris
if prev_eph: # Pick the most recent ephemeris before or at the midpoint if prev_eph: # Pick the most recent ephemeris before or at the midpoint
ephemeris = max(prev_eph, key=lambda e: e["epoch"]) ephemeris = max(prev_eph, key=lambda e: e['epoch'])
else: # pick the ephemeris whose epoch is closest in time to the midpoint else: # Pick the ephemeris whose epoch is closest in time to the midpoint
ephemeris = min( ephemeris = min(ephemeris_list, key=lambda e: abs((e['epoch'] - mid_time).total_seconds()))
ephemeris_list,
key=lambda e: abs((e["epoch"] - mid_time).total_seconds()),
)
# Calculate trajectory # Calculate trajectory
if start_time is None or end_time is None: if start_time is None or end_time is None:
all_epochs = sorted( all_epochs = sorted({e['epoch'] for prn_data in satellites.values() for e in prn_data})
{e["epoch"] for prn_data in satellites.values() for e in prn_data}
)
start_time = min(all_epochs) start_time = min(all_epochs)
end_time = max(all_epochs) end_time = max(all_epochs)
@@ -485,7 +439,7 @@ def plot_satellite_tracks(
r = 90 - np.array(el) r = 90 - np.array(el)
# Plot trajectory # Plot trajectory
ax.plot(theta, r, "-", color=color, alpha=0.7, linewidth=2.5) ax.plot(theta, r, '-', color=color, alpha=0.7, linewidth=2.5)
# Add arrow at last point # Add arrow at last point
if len(theta) >= 2: # Need at least 2 points for direction if len(theta) >= 2: # Need at least 2 points for direction
@@ -496,43 +450,30 @@ def plot_satellite_tracks(
arrow_length_factor = 1.3 arrow_length_factor = 1.3
extended_theta = theta[-2] + dx * arrow_length_factor extended_theta = theta[-2] + dx * arrow_length_factor
extended_r = r[-2] + dy * arrow_length_factor extended_r = r[-2] + dy * arrow_length_factor
ax.annotate( ax.annotate('',
"", xytext=(theta[-1], r[-1]),
xytext=(theta[-1], r[-1]), xy=(extended_theta, extended_r),
xy=(extended_theta, extended_r), arrowprops={
arrowprops={ 'arrowstyle': '->',
"arrowstyle": "->", 'color': color,
"color": color, 'alpha': 0.9,
"alpha": 0.9, 'linewidth': 1.5,
"linewidth": 1.5, 'shrinkA': 0,
"shrinkA": 0, 'shrinkB': 0
"shrinkB": 0, })
},
)
# Label at midpoint # Label at midpoint
mid_idx = len(theta) // 2 mid_idx = len(theta)//2
ax.text( ax.text(theta[mid_idx], r[mid_idx], prn,
theta[mid_idx], fontsize=12, ha='center', va='center',
r[mid_idx], bbox={"facecolor": "white", "alpha": 0.8, "pad": 2})
prn,
fontsize=12,
ha="center",
va="center",
bbox={"facecolor": "white", "alpha": 0.8, "pad": 2},
)
# Create legend elements only for present systems # Create legend elements only for present systems
legend_elements = [ legend_elements = [
plt.Line2D( plt.Line2D([0], [0], marker='o', color='w',
[0], label=f'{system_names[sys]} ({sys})',
[0], markerfacecolor=system_colors[sys],
marker="o", markersize=10)
color="w",
label=f"{system_names[sys]} ({sys})",
markerfacecolor=system_colors[sys],
markersize=10,
)
for sys in present_systems for sys in present_systems
] ]
@@ -540,36 +481,36 @@ def plot_satellite_tracks(
if legend_elements: if legend_elements:
ax.legend( ax.legend(
handles=legend_elements, handles=legend_elements,
loc="upper right", loc='upper right',
bbox_to_anchor=(1.3, 1.1), bbox_to_anchor=(1.3, 1.1),
fontsize=14, fontsize=14
) )
lat_deg = np.degrees(obs_lat) lat_deg = np.degrees(obs_lat)
lon_deg = np.degrees(obs_lon) lon_deg = np.degrees(obs_lon)
lat_hemisphere = "N" if lat_deg >= 0 else "S" lat_hemisphere = 'N' if lat_deg >= 0 else 'S'
lon_hemisphere = "E" if lon_deg >= 0 else "W" lon_hemisphere = 'E' if lon_deg >= 0 else 'W'
plt.title( plt.title(
f"GNSS skyplot from {abs(lat_deg):.2f}° {lat_hemisphere}, " f"GNSS skyplot from {abs(lat_deg):.2f}° {lat_hemisphere}, "
f"{abs(lon_deg):.2f}° {lon_hemisphere}", f"{abs(lon_deg):.2f}° {lon_hemisphere}",
pad=25, pad=25,
fontsize=20, fontsize=20
) )
if footer_text: if footer_text:
fig.text(0.42, 0.05, footer_text, ha="center", va="center", fontsize=16) fig.text(0.42, 0.05, footer_text, ha='center', va='center', fontsize=16)
plt.tight_layout() plt.tight_layout()
if filename: if filename:
filename_no_path = Path(filename).name filename_no_path = Path(filename).name
filename_no_dots = filename_no_path.replace(".", "_") filename_no_dots = filename_no_path.replace('.', '_')
output_name = f"skyplot_{filename_no_dots}.pdf" output_name = f"skyplot_{filename_no_dots}.pdf"
else: else:
output_name = "skyplot.pdf" output_name = "skyplot.pdf"
plt.savefig(output_name, format="pdf", bbox_inches="tight") plt.savefig(output_name, format='pdf', bbox_inches='tight')
print(f"Image saved as {output_name}") print(f"Image saved as {output_name}")
if show_plot: if show_plot:
plt.show() plt.show()
@@ -581,33 +522,34 @@ def main():
"""Generate the skyplot""" """Generate the skyplot"""
# Set up argument parser # Set up argument parser
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Generate GNSS skyplot from RINEX navigation file", add_help=False description='Generate GNSS skyplot from RINEX navigation file',
add_help=False
) )
# Add the no-show flag # Add the no-show flag
parser.add_argument( parser.add_argument(
"--no-show", action="store_true", help="Run without displaying plot window" '--no-show',
action='store_true',
help='Run without displaying plot window'
) )
# Add the observation filename # Add the observation filename
parser.add_argument( parser.add_argument(
"--use-obs", '--use-obs',
action="store_true", action='store_true',
help="Use corresponding RINEX observation file to bound the skyplot to the receiver time window", help='Use corresponding RINEX observation file to bound the skyplot to the receiver time window'
) )
# Parse known args (this ignores other positional args) # Parse known args (this ignores other positional args)
args, remaining_args = parser.parse_known_args() args, remaining_args = parser.parse_known_args()
# Handle help manually # Handle help manually
if "-h" in remaining_args or "--help" in remaining_args: if '-h' in remaining_args or '--help' in remaining_args:
print( print("""
"""
Usage: python skyplot.py <RINEX_FILE> [LATITUDE] [LONGITUDE] [ALTITUDE] [--use-obs] [--no-show] Usage: python skyplot.py <RINEX_FILE> [LATITUDE] [LONGITUDE] [ALTITUDE] [--use-obs] [--no-show]
Example: Example:
python skyplot.py brdc0010.22n 41.275 1.9876 80.0 --use-obs --no-show python skyplot.py brdc0010.22n 41.275 1.9876 80.0 --use-obs --no-show
""" """)
)
sys.exit(0) sys.exit(0)
if len(remaining_args) < 1: if len(remaining_args) < 1:
@@ -644,15 +586,9 @@ Example:
return return
# Print summary information # Print summary information
all_epochs = sorted( all_epochs = sorted(list(set(
list( e['epoch'] for prn, ephemerides in satellites.items() for e in ephemerides
set( )))
e["epoch"]
for prn, ephemerides in satellites.items()
for e in ephemerides
)
)
)
print("\nFile contains:") print("\nFile contains:")
print(f"- {len(satellites)} unique satellites") print(f"- {len(satellites)} unique satellites")
print(f"- {len(all_epochs)} unique epochs") print(f"- {len(all_epochs)} unique epochs")
@@ -666,9 +602,12 @@ Example:
print("\nSatellite systems found:") print("\nSatellite systems found:")
for system, count in sorted(system_counts.items()): for system, count in sorted(system_counts.items()):
system_name = {"G": "GPS", "R": "GLONASS", "E": "Galileo", "C": "BeiDou"}.get( system_name = {
system, "Unknown" 'G': 'GPS',
) 'R': 'GLONASS',
'E': 'Galileo',
'C': 'BeiDou'
}.get(system, 'Unknown')
print(f"- {system_name} ({system}): {count} satellites") print(f"- {system_name} ({system}): {count} satellites")
# Generate the combined skyplot # Generate the combined skyplot
@@ -679,7 +618,7 @@ Example:
obs_path = None obs_path = None
stem = filename[:-1] stem = filename[:-1]
for s in ("O", "o"): # try uppercase then lowercase for s in ('O', 'o'): # Try uppercase then lowercase
candidate = stem + s candidate = stem + s
tried.append(candidate) tried.append(candidate)
if Path(candidate).exists(): if Path(candidate).exists():
@@ -690,17 +629,11 @@ Example:
obs_start, obs_end = _read_obs_time_bounds(obs_path) obs_start, obs_end = _read_obs_time_bounds(obs_path)
if obs_start and obs_end: if obs_start and obs_end:
use_start, use_end = obs_start, obs_end use_start, use_end = obs_start, obs_end
print( print(f"\nObservation window detected from {obs_path}: {use_start}{use_end}")
f"\nObservation window detected from {obs_path}: {use_start}{use_end}"
)
else: else:
print( print(f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead.")
f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead."
)
else: else:
print( print(f"\nOBS file not found. Tried: {', '.join(tried)}. Using NAV span instead.")
f"\nOBS file not found. Tried: {', '.join(tried)}. Using NAV span instead."
)
# Ensure at least two samples with the default 5-minute step # Ensure at least two samples with the default 5-minute step
if (use_end - use_start) < timedelta(minutes=5): if (use_end - use_start) < timedelta(minutes=5):
@@ -719,7 +652,7 @@ Example:
filename=filename, filename=filename,
show_plot=not args.no_show, show_plot=not args.no_show,
start_time=use_start, start_time=use_start,
end_time=use_end, end_time=use_end
) )