1
0
mirror of https://github.com/gnss-sdr/gnss-sdr synced 2025-11-12 05:13:04 +00:00
Files
gnss-sdr/utils/skyplot/skyplot.py
Carles Fernandez 1d31c8427d skyplot improvements
Add new --format optional argument. Allowed options: pdf, eps, png, svg. Default: pdf
Add new --version / -v optional arguments, show program version and exit
Improve --help / -h message
Improve error handling
Generate PDF files with embedded fonts, ready for journal submission
Improve argument handling
Improve documentation
Add explicit z-order control for cleaner display of labels in multiconstellation files
2025-08-22 12:42:36 +02:00

737 lines
27 KiB
Python
Executable File

#!/usr/bin/env python
"""
skyplot.py
Reads a RINEX navigation file and generates a skyplot. Optionally, a RINEX
observation file can also be read to match the skyplot to the receiver
processing time.
Usage:
skyplot.py <RINEX_NAV_FILE> [observer_lat] [observer_lon] [observer_alt]
[--elev-mask ELEV_MASK]
[--format {pdf,eps,png,svg}]
[--no-show]
[--system SYSTEM [SYSTEM ...]]
[--use-obs]
-----------------------------------------------------------------------------
GNSS-SDR is a Global Navigation Satellite System software-defined receiver.
This file is part of GNSS-SDR.
SPDX-FileCopyrightText: 2025 Carles Fernandez-Prades cfernandez(at)cttc.es
SPDX-License-Identifier: GPL-3.0-or-later
-----------------------------------------------------------------------------
"""
import argparse
import re
import sys
from datetime import datetime, timedelta
from math import atan2, cos, sin, sqrt
from pathlib import Path
try:
import matplotlib.pyplot as plt
import numpy as np
except ImportError:
print("Error: This script requires matplotlib and numpy.")
print("Install them with: pip install matplotlib numpy")
sys.exit(1)
__version__ = "1.0.0"
def _read_obs_time_bounds(obs_path):
"""Return (start_time, end_time) from a RINEX 2/3 OBS file by scanning epoch lines.
If parsing fails or the file is not OBS, return (None, None).
"""
try:
with open(obs_path, 'r', encoding='utf-8', errors='ignore') as f:
# Detect OBS in header and skip to END OF HEADER
is_obs = False
for line in f:
if "RINEX VERSION / TYPE" in line:
# Robust OBS detection:
# In RINEX 2/3 the file-type letter at col 21 (0-based idx 20)
tchar = line[20:21]
if tchar == 'O' or 'OBSERVATION DATA' in line:
is_obs = True
if "END OF HEADER" in line:
break
if not is_obs:
return (None, None)
start_time = None
end_time = None
for line in f:
if not line.strip():
continue
if line.startswith('>'): # RINEX 3 epoch line
yyyy = int(line[2:6]); mm = int(line[7:9]); dd = int(line[10:12])
hh = int(line[13:15]); mi = int(line[16:18]); ss = float(line[19:29])
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
else:
# RINEX 2 epoch line
try:
yy = int(line[1:3]); mm = int(line[4:6]); dd = int(line[7:9])
hh = int(line[10:12]); mi = int(line[13:15]); ss = float(line[15:26])
yyyy = 1900 + yy if yy >= 80 else 2000 + yy
epoch = datetime(yyyy, mm, dd, hh, mi, int(ss), int((ss % 1)*1e6))
except Exception:
continue
if start_time is None or epoch < start_time:
start_time = epoch
if end_time is None or epoch > end_time:
end_time = epoch
return (start_time, end_time)
except Exception:
return (None, None)
def parse_rinex_float(s):
"""Parse RINEX formatted float string which may contain D or E exponent and compact spacing"""
# Handle empty string
if not s.strip():
return 0.0
# Replace D exponent with E (some RINEX files use D instead of E)
s = s.replace('D', 'E').replace('d', 'e')
# Handle cases where exponent lacks E (e.g., "12345-3")
if re.match(r'[+-]?\d+[+-]\d+', s.strip()):
s = s.replace('+', 'E+').replace('-', 'E-')
try:
return float(s)
except ValueError:
# Handle cases where the number runs into the next field
# Try to split at the exponent if present
if 'E' in s:
base, exp = s.split('E')[:2]
# Take first character of exponent if needed
if exp and exp[0] in '+-' and len(exp) > 1:
return float(base + 'E' + exp[0] + exp[1:].split()[0])
return 0.0 # Default if parsing fails
def read_rinex_nav(filename):
"""Read RINEX v3.0 navigation file"""
satellites = {}
line_number = 0
with open(filename, 'r', encoding='utf-8') as f:
# Skip header
while True:
line = f.readline()
line_number += 1
if not line:
return satellites # Empty file
if "END OF HEADER" in line:
break
# Read ephemeris data
current_line = f.readline()
line_number += 1
while current_line:
if len(current_line) < 23:
current_line = f.readline()
line_number += 1
continue
prn = current_line[:3].strip()
system = prn[0] # G, R, E, etc.
try:
# Parse epoch fields
year = int(current_line[4:8])
month = int(current_line[9:11])
day = int(current_line[12:14])
hour = int(current_line[15:17])
minute = int(current_line[18:20])
second = int(float(current_line[21:23]))
year += 2000 if year < 80 else 0
epoch = datetime(year, month, day, hour, minute, second)
# Read the next lines
lines = [current_line]
line_count = 4 if system == 'R' else 7
for _ in range(line_count):
next_line = f.readline()
line_number += 1
if not next_line:
break
lines.append(next_line)
if len(lines) < line_count + 1:
current_line = f.readline()
line_number += 1
continue
if system == 'R': # GLONASS specific parsing
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_relative_freq_bias': parse_rinex_float(lines[0][42:61]),
'message_frame_time': parse_rinex_float(lines[0][61:80]),
'x': parse_rinex_float(lines[1][4:23]), # Position (km)
'x_vel': parse_rinex_float(lines[1][23:42]), # Velocity (km/s)
'x_acc': parse_rinex_float(lines[1][42:61]),
'health': parse_rinex_float(lines[1][61:80]),
'y': parse_rinex_float(lines[2][4:23]),
'y_vel': parse_rinex_float(lines[2][23:42]),
'y_acc': parse_rinex_float(lines[2][42:61]),
'freq_num': parse_rinex_float(lines[2][61:80]),
'z': parse_rinex_float(lines[3][4:23]),
'z_vel': parse_rinex_float(lines[3][23:42]),
'z_acc': parse_rinex_float(lines[3][42:61]),
'age': parse_rinex_float(lines[3][61:80])
}
else:
# Parse all ephemeris parameters
ephemeris = {
'prn': prn,
'epoch': epoch,
'sv_clock_bias': parse_rinex_float(lines[0][23:42]),
'sv_clock_drift': parse_rinex_float(lines[0][42:61]),
'sv_clock_drift_rate': parse_rinex_float(lines[0][61:80]),
'iode': parse_rinex_float(lines[1][4:23]),
'crs': parse_rinex_float(lines[1][23:42]),
'delta_n': parse_rinex_float(lines[1][42:61]),
'm0': parse_rinex_float(lines[1][61:80]),
'cuc': parse_rinex_float(lines[2][4:23]),
'ecc': parse_rinex_float(lines[2][23:42]),
'cus': parse_rinex_float(lines[2][42:61]),
'sqrt_a': parse_rinex_float(lines[2][61:80]),
'toe': parse_rinex_float(lines[3][4:23]),
'cic': parse_rinex_float(lines[3][23:42]),
'omega0': parse_rinex_float(lines[3][42:61]),
'cis': parse_rinex_float(lines[3][61:80]),
'i0': parse_rinex_float(lines[4][4:23]),
'crc': parse_rinex_float(lines[4][23:42]),
'omega': parse_rinex_float(lines[4][42:61]),
'omega_dot': parse_rinex_float(lines[4][61:80]),
'idot': parse_rinex_float(lines[5][4:23]),
'codes_l2': parse_rinex_float(lines[5][23:42]),
'gps_week': parse_rinex_float(lines[5][42:61]),
'l2p_flag': parse_rinex_float(lines[5][61:80]),
'sv_accuracy': parse_rinex_float(lines[6][4:23]),
'sv_health': parse_rinex_float(lines[6][23:42]),
'tgd': parse_rinex_float(lines[6][42:61]),
'iodc': parse_rinex_float(lines[6][61:80]),
'transmission_time': parse_rinex_float(lines[7][4:23]),
'fit_interval': (
parse_rinex_float(lines[7][23:42])) if len(lines[7]) > 23 else 0.0
}
if prn not in satellites:
satellites[prn] = []
satellites[prn].append(ephemeris)
except (ValueError, IndexError) as e:
print(f"\nError in file {filename} at line {line_number}:")
print(f" PRN: {prn}")
print(f" Line content: {current_line.strip()}")
print(f" Error type: {type(e).__name__}")
print(f" Error details: {str(e)}")
print("Skipping to next satellite block...\n")
# Skip to next block by reading until next PRN
while current_line and not current_line.startswith(prn[0]):
current_line = f.readline()
line_number += 1
continue
current_line = f.readline()
line_number += 1
return satellites
def calculate_satellite_position(ephemeris, transmit_time):
"""Calculate satellite position in ECEF coordinates at given transmission time"""
system = ephemeris['prn'][0]
if system == 'R': # GLONASS - use position + velocity * time
dt = transmit_time
# Convert km to meters
xk = (ephemeris['x'] + ephemeris['x_vel'] * dt + 0.5 * ephemeris['x_acc'] * dt**2) * 1000
yk = (ephemeris['y'] + ephemeris['y_vel'] * dt + 0.5 * ephemeris['y_acc'] * dt**2) * 1000
zk = (ephemeris['z'] + ephemeris['z_vel'] * dt + 0.5 * ephemeris['z_acc'] * dt**2) * 1000
else:
# Constants
mu = 3.986005e14 # Earth's gravitational constant (m^3/s^2)
omega_e_dot = 7.2921151467e-5 # Earth rotation rate (rad/s)
# Semi-major axis
a = ephemeris['sqrt_a'] ** 2
# Corrected mean motion
n0 = sqrt(mu / (a ** 3))
n = n0 + ephemeris['delta_n']
# Mean anomaly
mk = ephemeris['m0'] + n * transmit_time
# Solve Kepler's equation for eccentric anomaly (Ek)
ek = mk
for _ in range(10):
ek_old = ek
ek = mk + ephemeris['ecc'] * sin(ek)
if abs(ek - ek_old) < 1e-12:
break
# True anomaly
nu_k = atan2(sqrt(1 - ephemeris['ecc']**2) * sin(ek), cos(ek) - ephemeris['ecc'])
# Argument of latitude
phi_k = nu_k + ephemeris['omega']
# Second harmonic perturbations
delta_uk = ephemeris['cus'] * sin(2 * phi_k) + ephemeris['cuc'] * cos(2 * phi_k)
delta_rk = ephemeris['crs'] * sin(2 * phi_k) + ephemeris['crc'] * cos(2 * phi_k)
delta_ik = ephemeris['cis'] * sin(2 * phi_k) + ephemeris['cic'] * cos(2 * phi_k)
# Corrected argument of latitude, radius and inclination
uk = phi_k + delta_uk
rk = a * (1 - ephemeris['ecc'] * cos(ek)) + delta_rk
ik = ephemeris['i0'] + delta_ik + ephemeris['idot'] * transmit_time
# Positions in orbital plane
xk_prime = rk * cos(uk)
yk_prime = rk * sin(uk)
# Corrected longitude of ascending node
omega_k = (
ephemeris['omega0']
+ (ephemeris['omega_dot'] - omega_e_dot) * transmit_time
- omega_e_dot * ephemeris['toe']
)
# Earth-fixed coordinates
xk = xk_prime * cos(omega_k) - yk_prime * cos(ik) * sin(omega_k)
yk = xk_prime * sin(omega_k) + yk_prime * cos(ik) * cos(omega_k)
zk = yk_prime * sin(ik)
return xk, yk, zk
def calculate_satellite_positions(ephemeris, start_time, end_time, step_min=5):
"""Generate multiple positions over time for a single satellite
between start_time and end_time.
"""
positions = []
current_time = start_time
system = ephemeris['prn'][0]
max_valid_time = 1800 if system == 'R' else 14400
while current_time <= end_time:
transmit_time = (current_time - ephemeris['epoch']).total_seconds()
if abs(transmit_time) <= max_valid_time:
x, y, z = calculate_satellite_position(ephemeris, transmit_time)
positions.append((current_time, x, y, z))
current_time += timedelta(minutes=step_min)
return positions
def ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt):
"""Convert ECEF coordinates to azimuth and elevation"""
# WGS-84 parameters
a = 6378137.0 # semi-major axis
e_sq = 6.69437999014e-3 # first eccentricity squared
# Convert geodetic coordinates to ECEF
n = a / sqrt(1 - e_sq * sin(obs_lat)**2)
obs_x = (n + obs_alt) * cos(obs_lat) * cos(obs_lon)
obs_y = (n + obs_alt) * cos(obs_lat) * sin(obs_lon)
obs_z = (n * (1 - e_sq) + obs_alt) * sin(obs_lat)
# Vector from observer to satellite
dx = x - obs_x
dy = y - obs_y
dz = z - obs_z
# Convert to local ENU (East, North, Up) coordinates
enu_x = -sin(obs_lon) * dx + cos(obs_lon) * dy
enu_y = -sin(obs_lat) * cos(obs_lon) * dx - sin(obs_lat) * sin(obs_lon) * dy + cos(obs_lat) * dz
enu_z = cos(obs_lat) * cos(obs_lon) * dx + cos(obs_lat) * sin(obs_lon) * dy + sin(obs_lat) * dz
# Calculate azimuth and elevation
azimuth = atan2(enu_x, enu_y)
elevation = atan2(enu_z, sqrt(enu_x**2 + enu_y**2))
# Convert to degrees and adjust azimuth to 0-360
azimuth = np.degrees(azimuth) % 360
elevation = np.degrees(elevation)
return azimuth, elevation
def plot_satellite_tracks(satellites, obs_lat, obs_lon, obs_alt,
footer_text=None, filename=None,
show_plot=True, start_time=None,
end_time=None, elev_mask=5.0,
output_format="pdf"):
"""Plot trajectories for all visible satellites"""
plt.rcParams['pdf.fonttype'] = 42 # TrueType fonts
plt.rcParams['ps.fonttype'] = 42 # TrueType fonts
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman', 'Times', 'DejaVu Serif']
plt.rcParams['mathtext.fontset'] = 'dejavuserif' # For math text
plt.rcParams['svg.fonttype'] = 'none' # Make SVG text editable
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(111, projection='polar')
ax.tick_params(labelsize=16, pad=7)
# Polar plot setup
ax.set_theta_zero_location('N')
ax.set_theta_direction(-1)
ax.set_ylim(0, 90)
# Elevation ticks
ax.set_yticks(range(0, 91, 15))
ax.set_yticklabels(['90°', '', '60°', '', '30°', '', ''], fontsize=14)
# Color scheme by constellation
system_colors = {
'G': 'blue', # GPS
'E': 'green', # Galileo
'R': 'red', # GLONASS
'C': 'orange', # BeiDou
'J': 'brown', # QZSS
'I': 'pink', # IRNSS
'S': 'gray' # SBAS
}
# System names mapping
system_names = {
'G': 'GPS',
'E': 'Galileo',
'R': 'GLONASS',
'C': 'BeiDou',
'J': 'QZSS',
'I': 'IRNSS',
'S': 'SBAS'
}
# Find which systems are actually present
present_systems = {prn[0] for prn in satellites.keys() if prn[0] in system_colors}
# Plot each satellite
for prn, ephemeris_list in satellites.items():
color = system_colors.get(prn[0], 'purple') # Default to purple for unknown systems
if not ephemeris_list:
continue
mid_time = start_time + (end_time - start_time) / 2
prev_eph = [e for e in ephemeris_list if e['epoch'] <= mid_time]
if prev_eph:
ephemeris = max(prev_eph, key=lambda e: e['epoch'])
else:
ephemeris = min(ephemeris_list, key=lambda e: abs((e['epoch'] - mid_time).total_seconds()))
if start_time is None or end_time is None:
all_epochs = sorted({e['epoch'] for prn_data in satellites.values() for e in prn_data})
start_time = min(all_epochs)
end_time = max(all_epochs)
positions = calculate_satellite_positions(ephemeris, start_time, end_time)
# Split into visible segments
segments = []
current_seg_az = []
current_seg_el = []
for _, x, y, z in positions:
azimuth, elevation = ecef_to_az_el(x, y, z, obs_lat, obs_lon, obs_alt)
if elevation > elev_mask:
current_seg_az.append(azimuth)
current_seg_el.append(elevation)
else:
if len(current_seg_az) > 1:
segments.append((current_seg_az, current_seg_el))
current_seg_az, current_seg_el = [], []
if len(current_seg_az) > 1:
segments.append((current_seg_az, current_seg_el))
# Plot each segment separately
for az_seg, el_seg in segments:
theta = np.radians(az_seg)
r = 90 - np.array(el_seg)
ax.plot(theta, r, '-', color=color, alpha=0.7, linewidth=2.5, zorder=1)
# Arrow at end
if len(theta) >= 2:
dx = theta[-1] - theta[-2]
dy = r[-1] - r[-2]
arrow_length_factor = 1.8
extended_theta = theta[-2] + dx * arrow_length_factor
extended_r = r[-2] + dy * arrow_length_factor
ax.annotate('',
xytext=(theta[-1], r[-1]),
xy=(extended_theta, extended_r),
arrowprops={
'arrowstyle': '->',
'color': color,
'alpha': 0.9,
'linewidth': 1.5,
'shrinkA': 0,
'shrinkB': 0
},
zorder=2)
# Label at midpoint of the segment
mid_idx = len(theta)//2
ax.text(theta[mid_idx], r[mid_idx], prn,
fontsize=12, ha='center', va='center',
bbox={"facecolor": "white", "alpha": 0.8, "pad": 2},
zorder=3)
# Legend for present systems
legend_elements = [
plt.Line2D([0], [0], marker='o', color='w',
label=f'{system_names[sys]} ({sys})',
markerfacecolor=system_colors[sys],
markersize=10)
for sys in present_systems
]
if legend_elements:
ax.legend(handles=legend_elements,
loc='upper right',
bbox_to_anchor=(1.3, 1.1),
fontsize=14)
lat_deg = np.degrees(obs_lat)
lon_deg = np.degrees(obs_lon)
lat_hemisphere = 'N' if lat_deg >= 0 else 'S'
lon_hemisphere = 'E' if lon_deg >= 0 else 'W'
plt.title(
f"GNSS skyplot from {abs(lat_deg):.2f}° {lat_hemisphere}, "
f"{abs(lon_deg):.2f}° {lon_hemisphere}",
pad=25,
fontsize=20
)
if footer_text:
fig.text(0.42, 0.05, footer_text, ha='center', va='center', fontsize=16)
plt.tight_layout()
if filename:
filename_no_path = Path(filename).name
filename_no_dots = filename_no_path.replace('.', '_')
output_name = f"skyplot_{filename_no_dots}.{output_format}"
else:
output_name = f"skyplot.{output_format}"
plt.savefig(output_name, format=output_format, bbox_inches='tight')
print(f"Image saved as {output_name}")
if show_plot:
plt.show()
else:
plt.close()
def main():
"""Generate the skyplot"""
try:
# Set system names and codes
system_name_to_code = {
'GPS': 'G',
'GLONASS': 'R',
'GALILEO': 'E',
'BEIDOU': 'C',
'QZSS': 'J',
'IRNSS': 'I',
'SBAS': 'S'
}
# Set up argument parser
parser = argparse.ArgumentParser(
description='Generate a GNSS skyplot from a RINEX navigation file',
epilog="Example: skyplot.py brdc0010.22n -33.4592 -70.6453 520.0 --format png --system G E --elev-mask 10 --no-show"
)
# Positional arguments
parser.add_argument(
'filename',
help='RINEX navigation file path'
)
parser.add_argument(
'lat', nargs='?', type=float, default=41.2750,
help='Observer latitude in degrees (default: 41.275° N)'
)
parser.add_argument(
'lon', nargs='?', type=float, default=1.9876,
help='Observer longitude in degrees (default: 1.9876° E)'
)
parser.add_argument(
'alt', nargs='?', type=float, default=80.0,
help='Observer altitude in meters (default: 80.0 m)'
)
# Optional arguments
parser.add_argument(
'--elev-mask',
type=float,
default=5.0,
help='Elevation mask in degrees for plotting satellite tracks (default: 5°)'
)
parser.add_argument(
'--format',
type=str,
default="pdf",
choices=["pdf", "eps", "png", "svg"],
help='Output file format for plot (default: pdf)'
)
parser.add_argument(
'--no-show',
action='store_true',
help='Run without displaying plot window'
)
parser.add_argument(
'--system',
nargs='+',
help='Only plot satellites from these systems (e.g., G R E or GPS GLONASS Galileo)'
)
parser.add_argument(
'--use-obs',
action='store_true',
help='Use corresponding RINEX observation file to bound the skyplot to the receiver time window'
)
parser.add_argument(
"-v", "--version",
action="version",
version=f"%(prog)s {__version__}",
help="Show program version and exit"
)
# Parse all arguments with full validation
args = parser.parse_args()
# Convert coordinates to radians
obs_lat = np.radians(args.lat)
obs_lon = np.radians(args.lon)
obs_alt = args.alt
filename = args.filename
# Read RINEX file
print(f"Reading {filename}...")
try:
satellites = read_rinex_nav(filename)
except FileNotFoundError:
print(f"Error: File '{filename}' not found.")
return 1
if not satellites:
print("No satellite data found in the file.")
return 1
if args.system:
systems_upper = set()
for s in args.system:
s_upper = s.upper()
if s_upper in system_name_to_code:
systems_upper.add(system_name_to_code[s_upper])
else:
systems_upper.add(s_upper) # Assume user passed the code
satellites = {prn: eph_list for prn, eph_list in satellites.items() if prn[0].upper() in systems_upper}
if not satellites:
print(f"No satellites found for systems: {', '.join(sorted(systems_upper))}")
return 1
# Print summary information
all_epochs = sorted(list(set(
e['epoch'] for prn, ephemerides in satellites.items() for e in ephemerides
)))
print("\nFile contains:")
print(f"- {len(satellites)} unique satellites")
print(f"- {len(all_epochs)} unique epochs")
print(f"- From {all_epochs[0]} to {all_epochs[-1]}")
# Calculate and print satellite counts by system
system_counts = {}
for prn in satellites:
system = prn[0]
system_counts[system] = system_counts.get(system, 0) + 1
print("\nSatellite systems found:")
for system, count in sorted(system_counts.items()):
system_name = {
'G': 'GPS',
'R': 'GLONASS',
'E': 'Galileo',
'C': 'BeiDou'
}.get(system, 'Unknown')
print(f"- {system_name} ({system}): {count} satellites")
# Generate the combined skyplot
# Time window: OBS bounds if provided; else NAV span
use_start, use_end = all_epochs[0], all_epochs[-1]
if args.use_obs:
tried = []
obs_path = None
stem = filename[:-1]
for s in ('O', 'o'): # Try uppercase then lowercase
candidate = stem + s
tried.append(candidate)
if Path(candidate).exists():
obs_path = candidate
break
if obs_path:
obs_start, obs_end = _read_obs_time_bounds(obs_path)
if obs_start and obs_end:
use_start, use_end = obs_start, obs_end
print(f"\nObservation window detected from {obs_path}: {use_start}{use_end}")
else:
print(f"\nWarning: Could not read valid times from {obs_path}. Using NAV span instead.")
else:
print(f"\nOBS file not found. Tried: {', '.join(tried)}. Using NAV span instead.")
# Ensure at least two samples with the default 5-minute step
if (use_end - use_start) < timedelta(minutes=5):
use_end = use_start + timedelta(minutes=5)
# Generate the combined skyplot
print("\nGenerating skyplot...")
footer = f"From {use_start} to {use_end} UTC"
plot_satellite_tracks(
satellites,
obs_lat,
obs_lon,
obs_alt,
footer_text=footer,
filename=filename,
show_plot=not args.no_show,
start_time=use_start,
end_time=use_end,
elev_mask=args.elev_mask,
output_format=args.format
)
except Exception as e:
print(f"Error: {str(e)}")
return 1
return 0
if __name__ == "__main__":
main()