diff --git a/__init__.py b/fgtools/__init__.py old mode 100755 new mode 100644 similarity index 100% rename from __init__.py rename to fgtools/__init__.py diff --git a/fgtools/aptdat.py b/fgtools/aptdat.py new file mode 100644 index 0000000..2b50a2e --- /dev/null +++ b/fgtools/aptdat.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import logging +import os + +from fgtools.utils import files +from fgtools import geo +from fgtools.utils import unit_convert +from fgtools import utils + +class Code: + def __init__(self, name, code): + self.name = name + self.code = code + + def __str__(self): + return self.name + + def __repr__(self): + return str(self.code) + + def __int__(self): + return self.code + + def __float__(self): + return float(self.code) + + def __eq__(self, other): + if isinstance(other, Code): + return self.name == other.name + elif isinstance(other, str): + return self.name == other + elif isinstance(other, int): + return self.code == other + + def __ne__(self, other): + if isinstance(other, Code): + return self.name != other.name + elif isinstance(other, str): + return self.name != other + elif isinstance(other, int): + return self.code != other + +class CodeEnum: + def __init__(self, names, codes): + for name, code in zip(names, codes): + setattr(self, name, Code(name, code)) + +SurfaceCode = CodeEnum( + ("Asphalt", "Concrete", "Grass", "Dirt", "Gravel", "DryLakebed", "Water", "SnowIce", "Transparent"), + list(range(1, 6)) + list(range(12, 16)) +) + +RunwayShoulderCode = CodeEnum( + ("NoShoulder", "Asphalt", "Concrete"), + range(3) +) + +RunwayMarkingCode = CodeEnum( + ("NoMarkings", "Visual", "NonPrecision", "Precision", "UKNonPrecision", "UKPrecision"), + range(6) +) + +ApproachLightsCode = CodeEnum( + ("NoLights", "ALSF_I", "ALSF_II", "Calvert", "CalvertILS", "SSALR", "SSALF", "SALS", "MALSR", "MALSF", "MALS", "ODALS", "RAIL"), + range(13) +) + +LineTypeCode = CodeEnum( + ( + "NoLine", "TaxiwayCenter", "MiscBoundary", "TaxiwayEdge", "RunwayHold", "OtherHold", "ILSHold", + "TaxiwayCenterRunwaySafety", "AircraftQueueLane", "AircraftQueueLaneDouble", + "TaxiwayCenterLarge", "TaxiwayCenterRunwaySafetyLarge", "RunwayHoldLarge", "OtherHoldLarge", "ILSHoldLarge", + "TaxiwayCenterBordered", "MiscBoundaryBordered", "TaxiwayEdgeBordered", "RunwayHoldBordered", + "OtherHoldBordered", "ILSHoldBordered", "TaxiwayCenterRunwaySafetyBordered", "AircraftQueueLaneBordered", + "AircraftQueueLaneDoubleBordered", "TaxiwayCenterLargeBordered", "TaxiwayCenterRunwaySafetyLargeBordered", + "RunwayHoldLargeBordered", "OtherHoldLargeBordered", "ILSHoldLargeBordered", + "TaxiwayShoulder", "RoadwayEdge", "RoadwayEdgeAircraftMovingArea", "RoadwayCenter", "RoadwayEdgeBroken", + "RoadwayEdgeWide", "RoadwayCenterWide", "SolidRed", "DashedRed", "SolidRedWide", "SolidOrange", + "SolidBlue", "SolidGreen", "RoadwayEdgeBordered", "RoadwayEdgeAircraftMovingAreaBordered", + "RoadwayCenterBordered", "RoadwayEdgeBrokenBordered", "RoadwayEdgeWideBordered", + "RoadwayCenterWideBordered", "SolidRedBordered", "DashedRedBordered", "SolidRedWideBordered", + "SolidOrangeBordered", "SolidBlueBordered", "SolidGreenBordered" + ), + tuple(range(15)) + tuple(range(51, 65)) + tuple(range(19, 26)) + (30, 31, 32) + (40, 41, 42) + \ + tuple(range(70, 76)) + (80, 81, 82) + (90, 91, 92) +) + +LineLightTypeCode = CodeEnum( + ( + "TaxiwayCenter", "TaxiwayEdge", "Hold", "RunwayHold", "TaxiwayCenterRunwaySafety", + "TaxiwayEdgeDangerous", "TaxiwayLeadOff", "TaxiwayLeadOffAmber" + ), + range(101, 109) +) + +BeaconTypeCode = CodeEnum( + ("NoBeacon", "CivilianAirport", "Seaport", "Heliport", "MilitaryAirport"), + range(5) +) + +SignSizeCode = CodeEnum( + ("TaxiwaySmall", "TaxiwayMedium", "TaxiwayLarge", "DistanceRemainingLarge", "DistanceRemainingSmall"), + range(1, 6) +) + +LightingObjectCode = CodeEnum( + ("VASI", "PAPI4L", "PAPI4R", "PAPISpaceShuttle", "TriColorVASI", "RunwayGuard"), + range(1, 7) +) + +REILCode = CodeEnum( + ("NoREIL", "OmnidirREIL", "UnidirREIL"), + range(3) +) + +AirportType = CodeEnum( + ("Land", "Sea", "Heli"), + (1, 16, 17) +) + +class Helipad: + def __init__(self, id, lon, lat, heading, length, width, surface, shoulder=RunwayShoulderCode.NoShoulder, + smoothness=0.25, edge_lights=False): + self.id = id + self.lon = lon + self.lat = lat + self.heading = heading + self.length = length + self.width = width + self.surface = surface + self.shoulder = shoulder + self.smoothness = smoothness + self.edge_lights = edge_lights + + def write(self, f): + f.write((f"102 {self.id} {float(self.lat)} {float(self.lon)} {float(self.heading)} {float(self.length):.2f}" + + f" {float(self.width):.2f} {int(self.surface)} {int(self.shoulder)} {float(self.smoothness):.2f} {int(self.edge_lights)}\n")) + +class Runway: + def __init__(self, width, id1, lon1, lat1, id2, lon2, lat2): + self.width = width + self.id1 = id1 + self.lon1 = lon1 + self.lat1 = lat1 + self.id2 = id2 + self.lon2 = lon2 + self.lat2 = lat2 + + def get_heading1(self): + return geo.get_bearing_deg(self.lon1, self.lat1, self.lon2, self.lat2) + + def get_heading2(self): + return utils.wrap_period(self.get_heading1() + 180, 0, 360) + + def get_length_m(self): + return geo.great_circle_distance_m(self.lon1, self.lat1, self.lon2, self.lat2) + + def get_length_ft(self): + return unit_convert.m2ft(self.get_length_m()) + +class WaterRunway(Runway): + def __init__(self, width, id1, lon1, lat1, id2, lon2, lat2, perimeter_buoys=False): + Runway.__init__(self, width, id1, lon1, lat1, id2, lon2, lat2) + + self.perimeter_buoys = perimeter_buoys + + def write(self, f): + f.write((f"101 {float(self.width):.2f} {int(self.perimeter_buoys)} {self.id1} {float(self.lat1):.8f}" + + f" {float(self.lon1):.8f} {self.id2} {float(self.lat2):.8f} {float(self.lon2):.8f}\n")) + +class LandRunway(Runway): + def __init__(self, width, surface, id1, lon1, lat1, id2, lon2, lat2, + smoothness=0.25, shoulder=RunwayShoulderCode.NoShoulder, center_lights=False, + edge_lights=False, distance_signs=False, + displ_thresh1=0, blastpad1=0, markings1=RunwayMarkingCode.Visual, + appr_lights1=ApproachLightsCode.NoLights, tdz_lights1=False, reil_type1=REILCode.NoREIL, + displ_thresh2=0, blastpad2=0, markings2=RunwayMarkingCode.Visual, + appr_lights2=ApproachLightsCode.NoLights, tdz_lights2=False, reil_type2=REILCode.NoREIL): + Runway.__init__(self, width, id1, lon1, lat1, id2, lon2, lat2) + + self.surface = surface + self.shoulder = shoulder + self.smoothness = smoothness + self.center_lights = center_lights + self.edge_lights = edge_lights + self.distance_signs = distance_signs + + self.displ_thresh1 = displ_thresh1 + self.blastpad1 = blastpad1 + self.markings1 = markings1 + self.appr_lights1 = appr_lights1 + self.tdz_lights1 = tdz_lights1 + self.reil_type1 = reil_type1 + + self.displ_thresh2 = displ_thresh2 + self.blastpad2 = blastpad2 + self.markings2 = markings2 + self.appr_lights2 = appr_lights2 + self.tdz_lights2 = tdz_lights2 + self.reil_type2 = reil_type2 + + def write(self, f): + f.write((f"100 {float(self.width):.2f} {int(self.surface)} {int(self.shoulder)} {self.smoothness} {int(self.center_lights)}" + + f" {int(self.edge_lights)} {int(self.distance_signs)}" + + f" {self.id1} {float(self.lat1):.8f} {float(self.lon1):.8f} {float(self.displ_thresh1):.2f} {float(self.blastpad1):.2f} {int(self.markings1)}" + + f" {int(self.appr_lights2)} {int(self.tdz_lights2)} {int(self.reil_type2)}" + + f" {self.id2} {float(self.lat2):.8f} {float(self.lon2):.8f} {float(self.displ_thresh2):.2f} {float(self.blastpad2):.2f} {int(self.markings2)}" + + f" {int(self.appr_lights2)} {int(self.tdz_lights2)} {int(self.reil_type2)}\n")) + +class Airport: + def __init__(self, elev, icao, name, lon, lat, type=AirportType.Land): + self.runways = {} + self.helipads = {} + self.parkings = [] + self.aprons = [] + self.tower = None + self.windsocks = [] + self.beacons = [] + + self.elev = elev + self.icao = icao + self.name = name + self.type = type + self.lon = lon + self.lat = lat + + def add_runway(self, runway): + self.runways[runway.id1] = runway + + def add_helipad(self, helipad): + self.helipads[helipad.id] = helipad + + def write(self, f): + f.write(f"{repr(self.type)} {int(self.elev)} 0 0 {self.icao} {self.name}\n") + f.write(f"1302 datum_lat {self.lat}\n") + f.write(f"1302 datum_lon {self.lon}\n") + f.write(f"1302 icao_code {self.icao}\n") + for id in self.runways: + self.runways[id].write(f) + for id in self.helipads: + self.helipads[id].write(f) + """for parking in self.parkings: + parking.write(f) + for apron in self.aprons: + apron.write(f) + if self._tower: + self.tower.write(f) + for windsock in self.windsocks: + windsock.write(f) + for beacon in self.beacons: + beacon.write(f)""" + +class ReaderWriterAptDat: + def __init__(self, file_header="Generated by fgtools.aptdat.ReaderWriterAptDat"): + self._airports = [] + self.file_header = file_header + + def _get_airport_index(self, icao): + for i, airport in enumerate(self._airports): + if airport.icao == icao: + return i + return -1 + + def add_airport(self, airport): + if not airport in self._airports: + self._airports.append(airport) + + def add_airports(self, airports): + for airport in airports: + self.add_airport(airport) + + def get_airport(self, icao): + i = _get_airport_index(icao) + if i > -1: + return self._airports[i] + + def get_airports(self, icaos): + for icao in icaos: + yield self.get_airport(icao) + + def set_airport(self, airport): + i = self._get_airport_index(airport.icao) + if i > -1: + self._airports[i] = airport + else: + self.add_airport(airport) + return i + + def set_airports(self, airports): + for airport in airports: + self.set_airport(airport) + + def remove_airport(self, icao): + return self._airports.pop(self._get_airport_index(icao)) + + def remove_airports(self, icaos): + for icao in icaos: + yield self._airports.pop(self._get_airport_index(icao)) + + def read(self, path): + exists = files.check_exists(path, exit=False) + if exists == 1: + pass + elif exists == 2: + self.read_multiple(os.listdir(path)) + else: + logging.fatal(f"Path {path} does not exist - exiting !") + + + def read_multiple(self, paths): + for path in paths: + self.read(path) + + # Write apt.dat files into output + # @param output -> str Path to put apt.dat files into + # @param merge -> bool Whether to merge all airports into one apt.dat file or write one file per airport + # @param overwrite -> bool Whether to overwrite an apt.dat file when it already exists + # @param overwrite_func -> callable Function whose return value replces overwrite - will get passed + # output_path as positional argument, will be called only if path actually + # exists and is a file + # @return bool 0 on success, 1 if the apt.dat file already exists and overwrite == False + def write(self, output, merge=False, overwrite=False, overwrite_func=None): + if len(self._airports) == 0: + print("ReaderWriterAptDat has no airports - not writing anything !") + return 1 + if merge: + exists = files.check_exists(output, type="file", exit=False) + if exists == 2: + output = os.path.join(output, "apt.dat") + + exists = files.check_exists(output, type="file", exit=False) + if exists == 1: + if callable(overwrite_func): + overwrite = overwrite_func(path) + if not overwrite: + print(f"Output file {output} exists already - not writing any airports !") + return 1 + elif exists == 2: + print(f"Output path {path} for airport is a directory - skipping", end=" " * 100 + "\n") + return 1 + + with open(output, "w") as f: + self._write_header(f) + i = 0 + total = len(airports) + for airport in self._airports: + print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done)", end="\r") + i += 1 + airport.write(f) + print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done)") + self._write_footer(f) + else: + files.check_exists(output, type="dir") + i = 0 + total = len(self._airports) + skipped = 0 + for airport in self._airports: + print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done, {skipped} skipped)", end="\r") + i += 1 + path = os.path.join(output, airport.icao + ".dat") + exists = files.check_exists(path, type="file", exit=False, create=True) + if exists == 1: + if callable(overwrite_func): + overwrite = overwrite_func(path) + if not overwrite: + print(f"Output file {path} for airport exists already - skipping", end=" " * 100 + "\n") + skipped += 1 + continue + elif exists == 2: + print(f"Output path {path} for airport is a directory - skipping", end=" " * 100 + "\n") + continue + with open(path, "w") as f: + self._write_header(f) + airport.write(f) + self._write_footer(f) + print(f"Writing airports … {i / total * 100}% ({i} of {total} airports done, {skipped} skipped)", end=" " * 100 + "\n") + return 0 + + def _write_header(self, f): + f.write("I\n") + f.write(f"1130 {self.file_header}\n") + + def _write_footer(self, f): + f.write("99\n") diff --git a/dsf2stg_lookup.py b/fgtools/dsf2stg_lookup.py old mode 100755 new mode 100644 similarity index 100% rename from dsf2stg_lookup.py rename to fgtools/dsf2stg_lookup.py diff --git a/fgtools/fgelev.py b/fgtools/fgelev.py new file mode 100644 index 0000000..2d3eff1 --- /dev/null +++ b/fgtools/fgelev.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import subprocess + +class Pipe: + def __init__(self, fgelev, fgscenery, fgdata): + print("Creating pipe to fgelev … ", end="") + sys.stdout.flush() + self.env = os.environ.copy() + self.env["FG_SCENERY"] = os.pathsep.join(fgscenery) + self.env["FG_ROOT"] = fgdata + self.pipe = subprocess.Popen(args=[fgelev, "--expire", "1"], env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.pipe.stdout.flush() + self.pipe.stdout.readline() + self.pipe.stdin.flush() + self.pipe.stdin.flush() + print("done") + + def get_elevation(self, lon, lat): + elevpipe.stdin.write(f"_ {lon} {lat}\n".encode("utf-8")) + elevpipe.stdin.flush() + elevout = elevpipe.stdout.readline().split() + if len(elevout) == 2: + return float(elevout[1]) + else + diff --git a/fgtools/geo/__init__.py b/fgtools/geo/__init__.py new file mode 100644 index 0000000..210bca5 --- /dev/null +++ b/fgtools/geo/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +EARTH_RADIUS = 6378138.12 + diff --git a/fgtools/geo/coord.py b/fgtools/geo/coord.py new file mode 100644 index 0000000..26ec393 --- /dev/null +++ b/fgtools/geo/coord.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import math + +from fgtools.utils import unit_convert, wrap_period +from fgtools.geo import EARTH_RADIUS + +class Coord: + def __init__(self, lon, lat): + self.lon = lon + self.lat = lat + + def distance_m(self, other): + lon1, lat1, lon2, lat2 = map(math.radians, (self.lon, self.lat, other.lon, other.lat)) + return abs(EARTH_RADIUS * math.acos(round(math.sin(lat1) * math.sin(lat2) + math.cos(lat1) * math.cos(lat2) * math.cos(lon1 - lon2), 14))) + + def distance_km(self, other): + return self.distance_m(other) / 1000 + + def distance_ft(self, other): + return unit_convert.m2ft(self.distance_m(other)) + + def angle(self, other): + dlon = (math.radians(other.lon) - math.radians(self.lon)) + x = math.sin(dlon) * math.cos(math.radians(other.lat)) + y = (math.cos(math.radians(self.lat)) * math.sin(math.radians(other.lat)) - + math.sin(math.radians(self.lat)) * math.cos(math.radians(other.lat)) * math.cos(dlon)) + angle = math.degrees(math.fmod(math.atan2(x, y), 2 * math.pi)) + return wrap_period(angle, 0, 360) + + def apply_angle_distance_m(self, angle, distance): + lon = math.radians(self.lon) + lat = math.radians(self.lat) + heading = math.radians(angle) + distance /= EARTH_RADIUS + + if distance < 0: + distance = abs(distance) + heading -= math.pi + + lat = math.asin(math.sin(lat) * math.cos(distance) + math.cos(lat) * math.sin(distance) * math.cos(heading)) + + if math.cos(lat) > 1e-15: + lon = math.pi - math.fmod(math.pi - lon - math.asin(math.sin(heading) * math.sin(distance) / math.cos(lat)), (2 * math.pi)) + + lon = math.degrees(lon) + lat = math.degrees(lat) + if lon > 180: + lon -= 360 + elif lon < -180: + lon += 360 + + return Coord(lon, lat) + diff --git a/fgtools/geo/rectangle.py b/fgtools/geo/rectangle.py new file mode 100644 index 0000000..36372de --- /dev/null +++ b/fgtools/geo/rectangle.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +from fgtools.geo import coord + +class Rectangle: + def __init__(self, ll, ur): + if not (isinstance(ll, coord.Coord) and isinstance(ur, coord.Coord)): + raise TypeError("loer left or upper right coordinate is not of type fgtools.geo.coord.Coord") + self.ll = ll + self.ur = ur + + def midpoint(self): + return coord.Coord((self.ll.lon + self.ur.lon) / 2, (self.ll.lat + self.ur.lat) / 2) + + def is_inside(self, coord): + return self.ll.lon <= coord.lon <= self.ur.lon and self.ll.lat <= coord.lat <= self.ur.lat + + def diagonal_m(self): + return self.ll.distance_m(self.ur) + + def length_m(self): + return self.ur.distance_m(Coord(self.ur.lon, self.ll.lat)) + + def width_m(self): + return self.ll.distance_m(Coord(self.ur.lon, self.ll.lat)) + diff --git a/fgtools/math/__init__.py b/fgtools/math/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fgtools/math/coord.py b/fgtools/math/coord.py new file mode 100644 index 0000000..90e841a --- /dev/null +++ b/fgtools/math/coord.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +class Coord: + def __init__(self, x, y): + self.x = x + self.y = y + + def distance(self, other): + return math.sqrt((other.x - self.x) ** 2 + (other.y - self.y) ** 2) + + def angle(self, other): + return math.degrees(math.atan2(other.x - self.x, other.y - self.y)) + + def apply_angle_distance(self, angle, distance): + angle_rad = math.pi / 2 - math.radians(angle) + return Coord(self.x + distance * math.cos(angle_rad), self.y + distance * math.sin(angle_rad)) diff --git a/fgtools/math/rectangle.py b/fgtools/math/rectangle.py new file mode 100644 index 0000000..c492d23 --- /dev/null +++ b/fgtools/math/rectangle.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +from fgtools.math import coord + +class Rectangle: + def __init__(self, ll, ur): + if not (isinstance(ll, coord.Coord) and isinstance(ur, coord.Coord): + raise TypeError("loer left or upper right coordinate is not of type fgtools.math.coord.Coord") + self.ll = ll + self.ur = ur + + def midpoint(self): + return coord.Coord((self.ll.x + self.ur.x) / 2, (self.ll.y + self.ur.y) / 2) + +- def is_inside(self, coord): + return self.ll.x <= coord.x <= self.ur.x and self.ll.y <= coord.y <= self.ur.y + + def diagonal(self): + return self.ll.distance(self.ur) + + def length(self): + return self.ur.distance(Coord(self.ur.x, self.ll.y)) + + def width(self): + return self.ll.distance(Coord(self.ur.x, self.ll.y)) + diff --git a/utils/__init__.py b/fgtools/utils/__init__.py similarity index 100% rename from utils/__init__.py rename to fgtools/utils/__init__.py diff --git a/utils/constants.py b/fgtools/utils/constants.py similarity index 54% rename from utils/constants.py rename to fgtools/utils/constants.py index 9e14e6c..4db0bd8 100644 --- a/utils/constants.py +++ b/fgtools/utils/constants.py @@ -2,9 +2,11 @@ #-*- coding:utf-8 -*- import os -import sys +from appdirs import user_cache_dir HOME = os.environ.get("HOME", os.path.expanduser("~")) +CACHEDIR = os.environ.get("FGTOOLS_CACHEDIR", user_cache_dir("fgtools", "TheEagle")) +os.makedirs(CACHEDIR, exist_ok=True) __version__ = (1, 0, 0) __versionstr__ = ".".join(map(str, __version__)) diff --git a/fgtools/utils/files.py b/fgtools/utils/files.py new file mode 100644 index 0000000..b22eef4 --- /dev/null +++ b/fgtools/utils/files.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import os +import logging + +from fgtools.utils import isiterable + +def find_input_files(paths, prefix="", suffix=""): + if not isiterable(paths): + if isinstance(paths, str): + paths = [paths] + else: + raise TypeError("paths is not iterable / not a string") + + files = [] + for path in paths: + if os.path.isfile(path) and os.path.split(path)[-1].startswith(prefix) and path.endswith(suffix): + files.append(path) + elif os.path.isdir(path): + files += find_input_files([os.path.join(path, s) for s in os.listdir(path)]) + else: + print(f"Input file / directory {path} does not exist - skipping") + + return files + +def write_xml_header(f): + f.write('\n') + +def check_exists(path, exit=True, type="file", create=True): + if type not in ("file", "dir"): + raise ValueError(f'check_exists() got an unrecognized value {type} for argument "type" - must be either "file" or "dir" !') + + if not path: + raise ValueError(f'check_exists() got an empty path !') + + action = ("skipping", "exiting")[exit] + # logging.fatal returns nothing so this will always succeed + func = (logging.warn, lambda s: logging.fatal(s) or sys.exit(1))[exit] + if os.path.isfile(path): + if type == "file": + return 1 + else: + func(f"Path {path} is not a file - {action} !") + return 2 + elif os.path.isdir(path): + if type == "dir": + return 1 + else: + func(f"Path {path} is not a directory - {action} !") + return 2 + else: + if create: + logging.info(f"Creating non-existent path {path}") + if type == "file": + parts = os.path.split(path) + if len(parts) > 1: + os.makedirs(os.path.join(*parts[:-1]), exist_ok=True) + else: + os.makedirs(".", exist_ok=True) + with open(path, "a") as f: + pass + else: + os.makedirs(path, exist_ok=True) + return 3 + else: + func(f"Path {path} does not exist - {action} !") + return 0 diff --git a/utils/interpolator.py b/fgtools/utils/interpolator.py similarity index 100% rename from utils/interpolator.py rename to fgtools/utils/interpolator.py diff --git a/utils/unit_convert.py b/fgtools/utils/unit_convert.py similarity index 100% rename from utils/unit_convert.py rename to fgtools/utils/unit_convert.py diff --git a/scenery/aptdat2airportsxml.py b/scenery/aptdat2airportsxml.py index e20e8a7..09ad3a9 100644 --- a/scenery/aptdat2airportsxml.py +++ b/scenery/aptdat2airportsxml.py @@ -8,7 +8,7 @@ import statistics from fgtools.utils.files import find_input_files from fgtools import utils -from fgtools.utils import geo +from fgtools.geo import coord from fgtools.utils import unit_convert def format_coord(coord, lonlat): @@ -19,10 +19,7 @@ def format_coord(coord, lonlat): return f"{prefix}{i} {f * 60:.8f}" def get_icao_xml_path(icao, what): - if len(icao) == 3: - return f"{icao[0]}/{icao[1]}/{icao}.{what}.xml" - else: - return f"{icao[0]}/{icao[1]}/{icao[2]}/{icao}.{what}.xml" + return f"{icao[0]}/{icao[1]}/{icao[2]}/{icao}.{what}.xml" class Parking: def __init__(self, index, type, name, lon, lat, hdg, radius=7.5, pushback_route=-1, airline_codes=[]): @@ -42,7 +39,7 @@ class Parking: def __repr__(self): s = (f' -1: s += f' pushBackRoute="{self.pushback_route}"' @@ -63,66 +60,69 @@ class TaxiNode: return self.on_runway != None def __repr__(self): - return (f' \n') + return (f' \n') class TaxiEdge: - def __init__(self, begin, end, is_on_runway, name): + def __init__(self, begin, end, bidirectional, is_on_runway, name): self.begin = begin self.end = end self.name = name + self.bidirectional = bidirectional self.is_on_runway = is_on_runway self.is_pushback_route = False def __bool__(self): return self.is_pushback_route != None + def __contains__(self, node): + return node.index in (self.begin, self.end) + def __repr__(self): - return (f' \n') + if self.bidirectional: + s += (f' \n') + return s class Runway: def __init__(self, id1, lon1, lat1, displ1, stopway1, id2, lon2, lat2, displ2, stopway2): - self.lon1 = lon1 - self.lat1 = lat1 + self.coord1 = coord.Coord(lon1, lat1) self.id1 = id1 self.displ1 = displ1 self.stopway1 = stopway1 - self.lon2 = lon2 - self.lat2 = lat2 + self.coord2 = coord.Coord(lon2, lat2) self.id2 = id2 self.displ2 = displ2 self.stopway2 = stopway2 def get_length_m(self): - return geo.great_circle_distance_m(self.lon1, self.lat1, self.lon2, self.lat2) + return self.coord1.distance_m(self.coord2) def get_length_ft(self): return unit_convert.m2ft(self.get_length_m()) def get_heading1_deg(self): - return geo.get_bearing_deg(self.lon1, self.lat1, self.lon2, self.lat2) + return self.coord2.angle(self.coord1) def get_heading2_deg(self): - brg = self.get_heading1_deg() + 180 - while brg >= 360: - brg -= 360 - return brg + return self.coord1.angle(self.coord2) def __repr__(self): return f""" - {self.lon1} - {self.lat1} + {self.coord1.lat} + {self.coord1.lon} {self.id1} {self.get_heading1_deg():.2f} {self.displ1} {self.stopway1} - {self.lon2} - {self.lat2} + {self.coord2.lat} + {self.coord2.lon} {self.id2} {self.get_heading2_deg():.2f} {self.displ2} @@ -144,8 +144,8 @@ class Tower: def __repr__(self): return f""" - {self.lon} {self.lat} + {self.lon} {self.agl} @@ -185,8 +185,8 @@ class ILS: if None not in (self.lon1, self.lat1, self.rwy1, self.hdg1, self.elev1, self.ident1): s += f""" - {self.lon1} {self.lat1} + {self.lon1} {self.rwy1} {self.hdg1:.2f} {self.elev1} @@ -195,8 +195,8 @@ class ILS: """ if None not in (self.lon2, self.lat2, self.rwy2, self.hdg2, self.elev2, self.ident2): s += """ - {self.lon2} {self.lat2} + {self.lon2} {self.rwy2} {self.hdg2:.2f} {self.elev2} @@ -283,25 +283,26 @@ def parse_aptdat_files(files, nav_dat, print_runway_lengths): elif line[0] == 14: towers[icao] = Tower(float(line[2]), float(line[1]), float(line[3])) elif line[0] == 1201: # taxi node - taxi_nodes[icao].append(TaxiNode(float(line[2]), float(line[1]), len(taxi_nodes[icao]))) + taxi_nodes[icao].append(TaxiNode(float(line[2]), float(line[1]), int(line[4]) + len(parkings[icao]))) elif line[0] == 1202: # taxi edge if len(line) == 6: - taxi_edges[icao].append(TaxiEdge(int(line[1]), int(line[2]), line[4] == "runway", line[5])) + edge = TaxiEdge(int(line[1]) + len(parkings[icao]), int(line[2]) + len(parkings[icao]), line[3] == "twoway", line[4] == "runway", line[5]) + if edge.begin != edge.end: + taxi_edges[icao].append(edge) if not icao in towers and len(runways[icao]) > 0: runway_lons = [] runway_lats = [] runway_hdgs = [] for runway in runways[icao]: - runway_lons += [runway.lon1, runway.lon2] - runway_lats += [runway.lat1, runway.lat2] + runway_lons += [runway.coord1.lon, runway.coord2.lon] + runway_lats += [runway.coord1.lat, runway.coord2.lat] runway_hdgs.append(runway.get_heading1_deg()) - tower_lon, tower_lat = geo.apply_heading_distance(statistics.median(runway_lons), - statistics.median(runway_lats), - statistics.median(runway_hdgs) + 90, 50) - towers[icao] = Tower(tower_lon, tower_lat, 15) + tower_pos = (coord.Coord(statistics.median(runway_lons), statistics.median(runway_lats)) + .apply_angle_distance_m(statistics.median(runway_hdgs) + 90, 200)) + towers[icao] = Tower(tower_pos.lon, tower_pos.lat, 15) if not parkings[icao]: del parkings[icao] @@ -352,10 +353,11 @@ def write_groundnet_files(parkings, taxi_nodes, taxi_edges, output, overwrite): if len(taxi_nodes[icao]) > 0 and len(taxi_edges[icao]) > 0: f.write(" \n") for edge in taxi_edges[icao]: - taxi_nodes[icao][edge.begin].is_on_runway = edge.is_on_runway - taxi_nodes[icao][edge.end].is_on_runway = edge.is_on_runway + taxi_nodes[icao][edge.begin - len(parkings[icao])].is_on_runway = edge.is_on_runway + taxi_nodes[icao][edge.end - len(parkings[icao])].is_on_runway = edge.is_on_runway for node in taxi_nodes[icao]: - f.write(repr(node)) + if any(node in edge for edge in taxi_edges[icao]): + f.write(repr(node)) f.write(" \n") f.write(" \n") diff --git a/scenery/fix-aptdat-icaos.py b/scenery/fix-aptdat-icaos.py new file mode 100644 index 0000000..07acd25 --- /dev/null +++ b/scenery/fix-aptdat-icaos.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import os +import sys +import csv +import requests +import argparse +import shutil + +from fgtools.utils.files import find_input_files +from fgtools.geo.coord import Coord +from fgtools.utils import constants + +def _get_ourairports_csv(what): + path = os.path.join(constants.CACHEDIR, what + ".csv") + if not os.path.isfile(path): + with open(path, "w") as f: + f.write(requests.get(f"https://davidmegginson.github.io/ourairports-data/{what}.csv").content.decode()) + f = open(path, "r", newline="") + return list(csv.DictReader(f))[1:] + +def get_ourairports_icao(airport, csv): + c = Coord(airport["lon"], airport["lat"]) + matches = [] + for line in csv: + d = c.distance_km(Coord(float(line["longitude_deg"]), float(line["latitude_deg"]))) + if d <= 10: + matches.append({"distance": d, "icao": line["gps_code"] or line["local_code"] or line["ident"]}) + matches.sort(key=lambda m: m["distance"]) + if len(matches) and matches[0]["icao"]: + if len(airport["icao"]) != 4: + airport["newicao"] = matches[0]["icao"] + else: + print(f"No matching airport found for {airport['icao']} - skipping", end=" " * 100 + "\n") + return airport + +def process(files, output): + csv = _get_ourairports_csv("airports") + i = 0 + n = 0 + skipped = 0 + total = len(files) + files_d = {} + for p in files: + print(f"Parsing files … {i / total * 100:.1f}% ({i} of {total} done, found {n} airports)", end="\r") + i += 1 + file_d = {"lines": [], "airports": {}} + with open(p, "r") as f: + file_d["lines"] = list(map(lambda l: list(filter(None, l)), map(lambda s: s.split(" "), filter(None, map(str.strip, f.readlines()))))) + curicao = "" + for line in file_d["lines"]: + if line[0] in ("1", "16", "17"): + curicao = line[4] + skip = False + file_d["airports"][curicao] = {"icao": curicao} + n += 1 + elif line[0] == "1302": + if line[1] == "datum_lon": + if len(line) < 3: + continue + file_d["airports"][curicao]["lon"] = float(line[2]) + if line[1] == "datum_lat": + if len(line) < 3: + continue + file_d["airports"][curicao]["lat"] = float(line[2]) + elif line[0] in ("100", "101", "102") and not ("lon" in file_d["airports"][curicao] and "lat" in file_d["airports"][curicao]): + # no datum_lon / datum_lat found, approximate airport position from first runway / helipad found + if line[0] == "100": # land runway + lon = (float(line[10]) + float(line[19])) / 2 + lat = (float(line[9]) + float(line[18])) / 2 + elif line[0] == "101": # water runway + lon = (float(line[5]) + float(line[8])) / 2 + lat = (float(line[4]) + float(line[7])) / 2 + else: # helipad + lon = float(line[3]) + lat = float(line[2]) + file_d["airports"][curicao]["lon"] = lon + file_d["airports"][curicao]["lat"] = lat + + for icao in list(file_d["airports"].keys()): + if not ("lon" in file_d["airports"][icao] and "lat" in file_d["airports"][icao]): + print(f"Unable to get longitude / latitude of airport {curicao} in file {p} - skipping", end=" " * 100 + "\n") + del file_d["airports"][icao] + n -= 1 + skipped += 1 + + files_d[p] = file_d + print(f"Parsing files … {i / total * 100:.1f}% ({i} of {total} done, found {n} airports, skipped {skipped})", end=" " * 100 + "\n") + + i = 0 + total = n + for p in files_d: + for icao in files_d[p]["airports"]: + print(f"Getting ICAOs for airports … {i / total * 100:.1f}% ({i} of {total} done)", end="\r") + i += 1 + files_d[p]["airports"][icao] = get_ourairports_icao(files_d[p]["airports"][icao], csv) + print(f"Getting ICAOs for airports … {i / total * 100:.1f}% ({i} of {total} done)", end=" " * 100 + "\n") + + i = 0 + total = len(files_d) + for p in files_d: + print(f"Writing new apt.dat files … {i / total * 100:.1f}% ({i} of {total} done)", end="\r") + i += 1 + if output == None: + outp = p + else: + outp = os.path.join(output, os.path.split(p)[-1]) + + parts = os.path.split(outp) + prefix, newname = os.path.join(*parts[:-1]), parts[-1] + if len(files_d[p]["airports"]) > 0 and newname != "apt.dat": + firsticao = list(files_d[p]["airports"].keys())[0] + if "newicao" in files_d[p]["airports"][firsticao]: + newname = files_d[p]["airports"][firsticao]["newicao"] + ".dat" + newoutp = os.path.join(prefix, newname) + + with open(outp, "w") as f: + for line in files_d[p]["lines"]: + if line[0] in ("1", "16", "17") and line[4] in files_d[p]["airports"] and "newicao" in files_d[p]["airports"][line[4]]: + line[4] = files_d[p]["airports"][line[4]]["newicao"] + f.write(" ".join(line) + "\n") + + if outp != newoutp: + print(f"Renaming file: {outp} -> {newoutp}", end=" " * 100 + "\n") + shutil.move(outp, newoutp) + print(f"Writing new apt.dat files … {i / total * 100:.1f}% ({i} of {total} done)", end=" " * 100 + "\n") + +if __name__ == "__main__": + argp = argparse.ArgumentParser(description="Fix apt.dat ICAO's - some apt.dat files from the XPlane gateway have them of the form XAY0016 - this script gets the right ICAO from OurAirports data (if the airport is found there)") + + argp.add_argument( + "-i", "--input", + help="Input apt.dat file(s) / folder(s) containing apt.dat files", + required=True, + nargs="+" + ) + + argp.add_argument( + "-o", "--output", + help="Output folder for the modified apt.dat files - omit to edit the files in-place", + default=None + ) + + args = argp.parse_args() + + infiles = find_input_files(args.input) + process(infiles, args.output) + diff --git a/scenery/osm2aptdat.py b/scenery/osm2aptdat.py new file mode 100644 index 0000000..0536f93 --- /dev/null +++ b/scenery/osm2aptdat.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python +#-*- coding:utf-8 -*- + +import os +import sys +import argparse +import requests +import csv +import re +import logging +import math + +from OSMPythonTools import overpass + +from fgtools.geo import coord, rectangle +from fgtools.utils import files +from fgtools import aptdat +from fgtools.utils import constants +from fgtools.utils import unit_convert + +osmapi = overpass.Overpass() + +def parse_runway_id(id): + which, heading = "", 0 + if id[-1] in ("L", "C", "R"): + which = id[-1] + try: + if which: + heading = int(id[:-1]) + else: + heading = int(id) + except ValueError: + heading = 0 + return heading * 10, which + +def parse_surface_type(surface): + if re.search("pem|mac|sealed|bit|asp(h)?(alt)?|tarmac", surface) or surface in ("b"): + surface = "Asphalt" + elif re.search("wood|cement|bri(ck)?|hard|paved|pad|psp|met|c[o0]n(c)?", surface): + surface = "Concrete" + elif re.search("rock|gvl|grvl|gravel|pi(ç|c)", surface): + surface = "Gravel" + elif re.search("tr(ea)?t(e)?d|san(d)?|ter|none|cor|so(ft|d|il)|earth|cop|com|per|ground|silt|cla(y)?|dirt|turf", surface): + surface = "Dirt" + elif re.search("gr(a*)?s|gre", surface) or surface in ("g"): + surface = "Grass" + elif re.search("wat(er)?", surface): + surface = "Water" + elif re.search("sno|ice", surface): + surface = "SnowIce" + else: + surface = "Unknown" + return surface + +def _get_ourairports_csv(what): + path = os.path.join(constants.CACHEDIR, what + ".csv") + if not os.path.isfile(path): + with open(path, "w") as f: + f.write(requests.get(f"https://davidmegginson.github.io/ourairports-data/{what}.csv").content.decode()) + f = open(path, "r", newline="") + return list(csv.DictReader(f))[1:] + +def get_ourairports_airports(bbox=None, icaos=[]): + if not (bbox or icaos): + raise TypeError("both bbox and icaos are None") + csv = _get_ourairports_csv("airports") + airports = [] + print("Creating airports from OurAirports data … ", end="") + for line in csv: + type = aptdat.AirportType.Land + if "sea" in line["type"]: + type = aptdat.AirportType.Sea + elif "heli" in line["type"]: + type = aptdat.AirportType.Heli + + code = line["gps_code"] or line["local_code"] or line["ident"] + if not code: + continue + airport = {"ident": line["ident"], "airport": aptdat.Airport(int(line["elevation_ft"] or 0), code, + line["name"], float(line["longitude_deg"]), + float(line["latitude_deg"]), type=type)} + + if code in icaos: + icaos.remove(code) + airports.append(airport) + elif bbox and bbox.is_inside(coord.Coord(airport["airport"].lon, airport["airport"].lat)): + airports.append(airport) + print(f"done - found {len(airports)} airports for the given bounding box / ICAO") + return airports + +def get_osm_elements_near_airport(airport, what, query, element_type, radius=10000, max_retries=10): + left = coord.Coord(airport.lon, 0).apply_angle_distance_m(-90, radius).lon + right = coord.Coord(airport.lon, 0).apply_angle_distance_m(90, radius).lon + upper = coord.Coord(0, airport.lat).apply_angle_distance_m(0, radius).lat + lower = coord.Coord(0, airport.lat).apply_angle_distance_m(180, radius).lat + query = overpass.overpassQueryBuilder(bbox=[lower, left, upper, right], elementType=element_type, + selector=query, out="center") + result = -1 + retries = 0 + while result == -1 and retries < max_retries: + try: + if element_type == "node": + result = osmapi.query(query, timeout=60).nodes() + elif element_type == "way": + result = osmapi.query(query, timeout=60).ways() + elif element_type == "relations": + result = osmapi.query(query, timeout=60).relations() + else: + result = [] + qresult = osmapi.query(query, timeout=100) + if not qresult: + result = None + break + for etype in element_type: + result += getattr(qresult, etype + "s")() or [] + except Exception as e: + if "timeout" in str(e.args).lower(): + result = -1 + else: + raise e + retries += 1 + if result == -1: + print(f"API query for OSM {what} data for airport {airport.icao} timed out {retries} times - won't retry", end=" " * 100 + "\n") + result = [] + if result == None: + result = [] + print(f"No OSM {what} data found for airport {airport.icao}", end=" " * 100 + "\n") + return result + +def add_osm_runways(airport): + result = get_osm_elements_near_airport(airport["airport"], "runway", '"aeroway"="runway"', "way") + osmways = [] + for way in result: + first, last = way.nodes()[0], way.nodes()[-1] + if first.id() == last.id(): + print("Got a runway mapped as area from OSM - not supported yet", end=" " * 100 + "\n") + continue + first = coord.Coord(first.lon(), first.lat()) + last = coord.Coord(last.lon(), last.lat()) + heading = first.angle(last) + if heading > 180: + first, last = last, first + heading -= 180 + center = rectangle.Rectangle(last, first).midpoint() + distance = coord.Coord(airport["airport"].lon, airport["airport"].lat).distance_m(center) + osmways.append({"distance": distance, "heading": heading, "way": way, "first": first, "last": last}) + osmways.sort(key=lambda r: r["distance"]) + for i, runway in enumerate(airport["runways"]): + osmways_filtered = [] + for osmway in osmways: + if "ref" in osmway["way"].tags(): + # water runways somtimes have N, NE, etc. as identifier instead of 36, 04, etc. + mapping = {"N": "36", "NE": "04", "E": "09", "SE": "13", "S": "18", "SW": "22", "W": "27", "NW": "31"} + if runway["le_ident"] in mapping: + runway["le_ident"] = mapping[runway["le_ident"]] + if runway["he_ident"] in mapping: + runway["he_ident"] = mapping[runway["he_ident"]] + + if osmway["way"].tags()["ref"] == runway["le_ident"] + "/" + runway["he_ident"]: + osmways_filtered = [osmway] + break + else: + heading, which = parse_runway_id(runway["le_ident"]) + if heading <= osmway["heading"] < heading + 10: + if not which: + osmways_filtered = [osmway] + break + else: + osmways_filtered.append(osmway) + if len(osmways_filtered) == 3: + break + if len(osmways_filtered) == 0: + print("No OSM data found for runway", runway["le_ident"], "at airport", airport["airport"].icao, end=" " * 100 + "\n") + elif len(osmways_filtered) == 1: # just one matching runway - nothing left to do + runway["osmway"] = osmways_filtered[0] + elif len(osmways_filtered) == 2: # two parallel runways - sort from left to right and pick the right one + center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat()) + center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat()) + heading, which = parse_runway_id(runway["le_ident"]) + rel_bearing = center1.angle(center2) - heading + index = 0 + if rel_bearing > 0: + index = 0 if which == "L" else 1 + else: + index = 1 if which == "L" else 0 + runway["osmway"] = osmways_filtered[index] + else: # three or more parallel runways - sort the first three from left to right and pick the right one + center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat()) + center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat()) + center3 = coord.Coord(osmways_filtered[2].centerLon(), osmways_filtered[2].centerLat()) + heading, which = parse_runway_id(runway["le_ident"]) + rel_bearing1 = center1.angle(center2) - heading + rel_bearing2 = center2.angle(center3) - heading + index = 0 + if rel_bearing1 > 0 and rel_bearing2 > 0: + index = "LCR".find(which) + elif rel_bearing1 <= 0 and rel_bearing2 > 0: + index = "CLR".find(which) + elif rel_bearing1 > 0 and rel_bearing2 <= 0: + index = "LRC".find(which) + else: + index = "RCL".find(which) + runway["osmway"] = osmways_filtered[index] + + if not runway["le_longitude_deg"] or not runway["he_longitude_deg"]: + if "osmway" in runway: + runway["le_heading_degT"] = runway["osmway"]["heading"] + runway["he_heading_degT"] = runway["osmway"]["heading"] + 180 + runway["le_longitude_deg"] = runway["osmway"]["first"].lon + runway["le_latitude_deg"] = runway["osmway"]["first"].lat + runway["he_longitude_deg"] = runway["osmway"]["last"].lon + runway["he_latitude_deg"] = runway["osmway"]["last"].lat + else: + print("No threshold information found for runway", runway["le_ident"], "at", airport["airport"].icao, "- removing !", end=" " * 100 + "\n") + airport["runways"][i] = None + airport["runways"] = list(filter(None, airport["runways"])) + +def add_osm_helipads(airport): + result = get_osm_elements_near_airport(airport["airport"], "helipad", '"aeroway"="helipad"', ["node", "way"]) + osmhelipads = [] + counter = 0 + for element in result: + if element.type() == "node": + c = coord.Coord(element.lon(), element.lat()) + radius = 0 + else: + lon_sum = lat_sum = 0 + divider = 0 + for node in element.nodes(): + lon_sum += node.lon() + lat_sum += node.lat() + divider += 1 + c = coord.Coord(lon_sum / divider, lat_sum / divider) + dist_sum = 0 + for node in element.nodes(): + dist_sum += c.distance_m(coord.Coord(node.lon(), node.lat())) + radius = dist_sum / divider + surface = "" + if "surface" in element.tags(): + surface = element.tags()["surface"] + lit = None + if "lit" in element.tags(): + lit = element.tags()["lit"] == "yes" + id = f"H{counter}" + counter += 1 + osmhelipads.append({"coord": c, "radius": radius, "surface": surface, "id": id, "lit": None}) + with_lon_lat = [] + without_lon_lat = [] + while len(airport["helipads"]): + helipad = airport["helipads"].pop() + if helipad["le_longitude_deg"] and helipad["le_latitude_deg"]: + with_lon_lat.append(helipad) + else: + without_lon_lat.append(helipad) + for helipad in with_lon_lat: + if len(osmhelipads) > 0: + for osmhelipad in osmhelipads: + osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(float(helipad["le_longitude_deg"]), + float(helipad["le_latitude_deg"]))) + osmhelipads.sort(key=lambda d: d["distance"]) + helipad["osmhelipad"] = osmhelipads.pop(0) + else: + helipad["osmhelipad"] = {} + + for osmhelipad in osmhelipads: + osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(airport["airport"].lon, airport["airport"].lat)) + osmhelipads.sort(key=lambda d: d["distance"]) + for i, helipad in enumerate(without_lon_lat): + if len(osmhelipads) > 0: + helipad["osmhelipad"] = osmhelipads.pop(0) + helipad["le_longitude_deg"] = helipad["osmhelipad"]["coord"].lon + helipad["le_latitude_deg"] = helipad["osmhelipad"]["coord"].lat + else: + without_lon_lat[i] = None + if None in without_lon_lat: + print(f"No position information found for {without_lon_lat.count(None)} helipad(s) at {airport['airport'].icao} - removing", end=" " * 100 + "\n") + without_lon_lat = list(filter(None, without_lon_lat)) + + airport["helipads"] = with_lon_lat + without_lon_lat + + for osmhelipad in osmhelipads: + helipad = {"airport_ident": airport["airport"].icao, "le_longitude_deg": osmhelipad["coord"].lon, + "le_latitude_deg": osmhelipad["coord"].lat, "lighted": osmhelipad["lit"], "surface": osmhelipad["surface"], + "length_ft": 0, "width_ft": 0, "osmhelipad": osmhelipad} + +def add_ourairports_runways(airports): + csv = _get_ourairports_csv("runways") + i = 0 + total = len(airports) + for airport in airports: + print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r") + i += 1 + runways = [] + helipads = [] + for line in csv: + if line["airport_ident"] == airport["ident"]: + if re.match(line["le_ident"], "H[0-9]*"): + helipads.append(line) + else: + runways.append(line) + else: + if runways or helipads: + break + airport["runways"] = runways + airport["helipads"] = helipads + add_osm_runways(airport) + add_osm_helipads(airport) + if len(airport["runways"]) == 0 and len(airport["helipads"]) == 0: + print(f"Removing airport {airport['airport'].icao} since it has no runways / helipads !", end=" " * 100 + "\n") + airports[i - 1] = None + continue + + for runway in airport["runways"]: + surface = parse_surface_type(runway["surface"].lower()) + if surface == "Unknown": + if "osmway" in runway: + if "surface" in runway["osmway"]["way"].tags(): + surface = parse_surface_type(runway["osmway"]["way"].tags()["surface"]) + + if surface == "Unknown": + if (int(runway["length_ft"] or 0) > 1500 and int(runway["width_ft"] or 0) > 30) or int(runway["lighted"]): + surface = "Asphalt" + else: + surface = "Dirt" + print("Unknown surface type:", runway["surface"], "for runway", runway["le_ident"], "at airport", airport["airport"].icao, "- falling back to", surface, end=" " * 100 + "\n") + runway["surface"] = surface + print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r\n") + + airports = list(filter(None, airports)) + + i = 0 + total = len(airports) + for airport in airports: + print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)", end="\r") + i += 1 + for runway in airport["runways"]: + width = runway["width_ft"] + if width != "": + width = float(width) + elif "osmway" in runway and "width" in runway["osmway"]["way"].tags(): + width = round(float(runway["osmway"]["way"].tags()["width"]), 2) + else: + print(f"No width found for runway {runway['le_ident']} at airport {airport['airport'].icao} - guessing from length", end=" " * 100 + "\n") + width = math.sqrt(int(runway["length_ft"] or 0)) + if runway["surface"] == "water": + runway = aptdat.WaterRunway(unit_convert.ft2m(width), + runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]), + runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]), + perimeter_buoys=True) + else: + center_lights = edge_lights = bool(runway["lighted"]) + if center_lights and surface not in ("Asphalt", "Concrete"): + center_lights = edge_lights = False + distance_signs = int(runway["length_ft"] or 0) > 4000 + tdz_lights = runway["surface"] in ("Asphalt", "Concrete") + markings = aptdat.RunwayMarkingCode.Visual + if runway["surface"] not in ("Asphalt", "Concrete"): + markings = aptdat.RunwayMarkingCode.NoMarkings + elif 4000 < int(runway["length_ft"] or 0) < 6000: + markings = aptdat.RunwayMarkingCode.NonPrecision + elif int(runway["length_ft"] or 0) >= 6000: + markings = aptdat.RunwayMarkingCode.Precision + reil_type = aptdat.REILCode.NoREIL + if markings == aptdat.RunwayMarkingCode.NonPrecision: + reil_type = aptdat.REILCode.UnidirREIL + + runway = aptdat.LandRunway(unit_convert.ft2m(width), getattr(aptdat.SurfaceCode, runway["surface"]), + runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]), + runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]), + center_lights=center_lights, edge_lights=edge_lights, distance_signs=distance_signs, + displ_thresh1=float(runway["le_displaced_threshold_ft"] or 0), tdz_lights1=tdz_lights, + markings1=markings, reil_type1=reil_type, + displ_thresh2=float(runway["he_displaced_threshold_ft"] or 0), tdz_lights2=tdz_lights, + markings2=markings, reil_type2=reil_type) + airport["airport"].add_runway(runway) + print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)") + + i = 0 + for airport in airports: + print("Creating helipads … {i / total * 100}% ({i} of {total} airports done)", end="\r") + i += 1 + for helipad in airport["helipads"]: + if helipad["width_ft"]: + width = round(float(helipad["width_ft"]), 2) + elif "radius" in helipad["osmhelipad"]: + width = radius * 2 + else: + width = 50 + print((f"Unable to get width for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" + + f" - setting to {width} ft"), end=" " * 100 + "\n") + if helipad["length_ft"]: + length = round(float(helipad["length_ft"]), 2) + elif "radius" in helipad["osmhelipad"]: + length = radius * 2 + else: + length = 50 + print((f"Unable to get length for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" + + f" - setting to {length} ft"), end=" " * 100 + "\n") + + lighted = bool(int(helipad["lighted"])) + surface = parse_surface_type(helipad["surface"]) + if surface == "Unknown" and "surface" in helipad["osmhelipad"]: + surface = parse_surface_type(helipad["osmhelipad"]["surface"]) + if surface == "Unknown": + if lighted: + surface = "Asphalt" + else: + surface = "Grass" + + print((f"Unknown surface type {helipad['surface']} for helipad {helipad['le_ident']} at" + + f" airport {airport['airport'].icao} - setting to {surface}"), end=" " * 100 + "\n") + + if surface not in ("Concrete", "Asphalt"): + lighted = False + print(helipad) + helipad = aptdat.Helipad(helipad["id"], float(helipad["le_longitude_deg"]), float(helipad["le_latitude_deg"]), 0, + unit_convert.ft2m(length), unit_convert.ft2m(width), surface, edge_lights=lighted) + airport["airport"].add_helipad(helipad) + airports[i - 1] = airport["airport"] + return airports + +def query_airports_by_icaos(icaos): + # remove doubles + icaos = list(set(icaos)) + ourairports = get_ourairports_airports(icaos=icaos) + ourairports = add_ourairports_runways(ourairports) + return ourairports + +def query_airports_by_bbox(left, lower, right, upper): + ourairports = get_ourairports_airports(bbox=rectangle.Rectangle(coord.Coord(left, lower), coord.Coord(right, upper))) + ourairports = add_ourairports_runways(ourairports) + return ourairports + +def check_aptdat_written_by_this(path): + with open(path, "r") as f: + i = 0 + while i < 2: + fl = f.readline() + if fl: + i += 1 + if "osm2aptdat.py" in fl: + return True + return False + +def write_aptdat_files(output, airports, merge=False): + writer = aptdat.ReaderWriterAptDat(file_header="Generated from OSM and OurAirports data by fgtools.osm2aptdat.py") + writer.add_airports(airports) + writer.write(output, merge=merge, overwrite_func=check_aptdat_written_by_this) + +if __name__ == "__main__": + logging.getLogger("OSMPythonTools").setLevel(logging.FATAL) + + argp = argparse.ArgumentParser(description="query airports from OSM and convert the results to apt.dat files") + + bbox_icao_group = argp.add_mutually_exclusive_group(required=True) + bbox_icao_group.add_argument( + "-b", "--bbox", + help="GPS coordinates of the lower left and upper right corners of the bounding box within which all airports should be processed", + nargs=4, + metavar=("LL_LON", "LL_LAT", "UR_LON", "UR_LAT"), + type=float, + ) + + bbox_icao_group.add_argument( + "-i", "--icao", + help="ICAO code(s) of the airport(s) to process", + nargs="+" + ) + + argp.add_argument( + "-m", "--merge", + help="Merge all airports into one big apt.dat file instead of writing one file for each airport", + action="store_true" + ) + + argp.add_argument( + "-o", "--output", + help="directory to put apt.dat files into", + required=True + ) + + args = argp.parse_args() + + if args.icao: + airports = query_airports_by_icaos(args.icao) + else: + airports = query_airports_by_bbox(left=args.bbox[0], lower=args.bbox[1], right=args.bbox[2], upper=args.bbox[3]) + + write_aptdat_files(args.output, airports, merge=args.merge) + diff --git a/utils/files.py b/utils/files.py deleted file mode 100644 index 180588c..0000000 --- a/utils/files.py +++ /dev/null @@ -1,28 +0,0 @@ -#!/usr/bin/env python -#-*- coding:utf-8 -*- - -import os - -from fgtools.utils import isiterable - -def find_input_files(paths, prefix="", suffix=""): - if not isiterable(paths): - if isinstance(paths, str): - paths = [paths] - else: - raise TypeError("paths is not iterable") - - files = [] - for path in paths: - if os.path.isfile(path) and os.path.split(path)[-1].startswith(prefix) and path.endswith(suffix): - files.append(path) - elif os.path.isdir(path): - files += find_input_files([os.path.join(path, s) for s in os.listdir(path)]) - else: - print(f"Input file / directory {path} does not exist - skipping") - - return files - -def write_xml_header(f): - f.write('\n') - diff --git a/utils/geo.py b/utils/geo.py deleted file mode 100644 index a595451..0000000 --- a/utils/geo.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -#-*- coding:utf-8 -*- - -import math - -from fgtools.utils import wrap_period - -EARTH_RADIUS = 6378138.12 - -def great_circle_distance_m(lon1, lat1, lon2, lat2): - lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2)) - return abs(EARTH_RADIUS * math.acos(math.sin(lat1) * math.sin(lat2) + math.cos(lat1) * math.cos(lat2) * math.cos(lon1 - lon2))) - -def great_circle_distance_km(lon1, lat1, lon2, lat2): - return great_circle_distance_m(lon1, lat1, lon2, lat2) / 1000 - -def get_bearing_deg(lon1, lat1, lon2, lat2): - dlon = (lon2 - lon1) - x = math.cos(math.radians(lat2)) * math.sin(math.radians(dlon)) - y = math.cos(math.radians(lat1)) * math.sin(math.radians(lat2)) - math.sin(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.cos(math.radians(dlon)) - brg = math.atan2(x, y) - brg = math.degrees(brg) - - return wrap_period(brg, 0, 360) - -def apply_heading_distance(lon, lat, heading, distance): - lon = math.radians(lon) - lat = math.radians(lat) - heading = math.radians(heading) - distance /= EARTH_RADIUS - - if distance < 0: - distance = abs(distance) - heading -= math.pi - - lat = math.asin(math.sin(lat) * math.cos(distance) + math.cos(lat) * math.sin(distance) * math.cos(heading)) - - if math.cos(lat) > 1e-15: - lon = math.pi - (math.pi - lon - math.asin(math.sin(heading) * math.sin(distance) / math.cos(lat)) % (2 * math.pi)) - - return wrap_period(math.degrees(lon), -180, 180), wrap_period(math.degrees(lat), -90, 90) -