491 lines
19 KiB
Python
491 lines
19 KiB
Python
#!/usr/bin/env python
|
|
#-*- coding:utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import requests
|
|
import csv
|
|
import re
|
|
import logging
|
|
import math
|
|
|
|
from OSMPythonTools import overpass
|
|
|
|
from fgtools.geo import coord, rectangle
|
|
from fgtools.utils import files
|
|
from fgtools import aptdat
|
|
from fgtools.utils import constants
|
|
from fgtools.utils import unit_convert
|
|
|
|
osmapi = overpass.Overpass()
|
|
|
|
def parse_runway_id(id):
|
|
which, heading = "", 0
|
|
if id[-1] in ("L", "C", "R"):
|
|
which = id[-1]
|
|
try:
|
|
if which:
|
|
heading = int(id[:-1])
|
|
else:
|
|
heading = int(id)
|
|
except ValueError:
|
|
heading = 0
|
|
return heading * 10, which
|
|
|
|
def parse_surface_type(surface):
|
|
if re.search("pem|mac|sealed|bit|asp(h)?(alt)?|tarmac", surface) or surface in ("b"):
|
|
surface = "Asphalt"
|
|
elif re.search("wood|cement|bri(ck)?|hard|paved|pad|psp|met|c[o0]n(c)?", surface):
|
|
surface = "Concrete"
|
|
elif re.search("rock|gvl|grvl|gravel|pi(ç|c)", surface):
|
|
surface = "Gravel"
|
|
elif re.search("tr(ea)?t(e)?d|san(d)?|ter|none|cor|so(ft|d|il)|earth|cop|com|per|ground|silt|cla(y)?|dirt|turf", surface):
|
|
surface = "Dirt"
|
|
elif re.search("gr(a*)?s|gre", surface) or surface in ("g"):
|
|
surface = "Grass"
|
|
elif re.search("wat(er)?", surface):
|
|
surface = "Water"
|
|
elif re.search("sno|ice", surface):
|
|
surface = "SnowIce"
|
|
else:
|
|
surface = "Unknown"
|
|
return surface
|
|
|
|
def _get_ourairports_csv(what):
|
|
path = os.path.join(constants.CACHEDIR, what + ".csv")
|
|
if not os.path.isfile(path):
|
|
with open(path, "w") as f:
|
|
f.write(requests.get(f"https://davidmegginson.github.io/ourairports-data/{what}.csv").content.decode())
|
|
f = open(path, "r", newline="")
|
|
return list(csv.DictReader(f))[1:]
|
|
|
|
def get_ourairports_airports(bbox=None, icaos=[]):
|
|
if not (bbox or icaos):
|
|
raise TypeError("both bbox and icaos are None")
|
|
csv = _get_ourairports_csv("airports")
|
|
airports = []
|
|
print("Creating airports from OurAirports data … ", end="")
|
|
for line in csv:
|
|
type = aptdat.AirportType.Land
|
|
if "sea" in line["type"]:
|
|
type = aptdat.AirportType.Sea
|
|
elif "heli" in line["type"]:
|
|
type = aptdat.AirportType.Heli
|
|
|
|
code = line["gps_code"] or line["local_code"] or line["ident"]
|
|
if not code:
|
|
continue
|
|
airport = {"ident": line["ident"], "airport": aptdat.Airport(int(line["elevation_ft"] or 0), code,
|
|
line["name"], float(line["longitude_deg"]),
|
|
float(line["latitude_deg"]), type=type)}
|
|
|
|
if code in icaos:
|
|
icaos.remove(code)
|
|
airports.append(airport)
|
|
elif bbox and bbox.is_inside(coord.Coord(airport["airport"].lon, airport["airport"].lat)):
|
|
airports.append(airport)
|
|
print(f"done - found {len(airports)} airports for the given bounding box / ICAO")
|
|
return airports
|
|
|
|
def get_osm_elements_near_airport(airport, what, query, element_type, radius=10000, max_retries=10):
|
|
left = coord.Coord(airport.lon, 0).apply_angle_distance_m(-90, radius).lon
|
|
right = coord.Coord(airport.lon, 0).apply_angle_distance_m(90, radius).lon
|
|
upper = coord.Coord(0, airport.lat).apply_angle_distance_m(0, radius).lat
|
|
lower = coord.Coord(0, airport.lat).apply_angle_distance_m(180, radius).lat
|
|
query = overpass.overpassQueryBuilder(bbox=[lower, left, upper, right], elementType=element_type,
|
|
selector=query, out="center")
|
|
result = -1
|
|
retries = 0
|
|
while result == -1 and retries < max_retries:
|
|
try:
|
|
if element_type == "node":
|
|
result = osmapi.query(query, timeout=60).nodes()
|
|
elif element_type == "way":
|
|
result = osmapi.query(query, timeout=60).ways()
|
|
elif element_type == "relations":
|
|
result = osmapi.query(query, timeout=60).relations()
|
|
else:
|
|
result = []
|
|
qresult = osmapi.query(query, timeout=100)
|
|
if not qresult:
|
|
result = None
|
|
break
|
|
for etype in element_type:
|
|
result += getattr(qresult, etype + "s")() or []
|
|
except Exception as e:
|
|
if "timeout" in str(e.args).lower():
|
|
result = -1
|
|
else:
|
|
raise e
|
|
retries += 1
|
|
if result == -1:
|
|
print(f"API query for OSM {what} data for airport {airport.icao} timed out {retries} times - won't retry", end=" " * 100 + "\n")
|
|
result = []
|
|
if result == None:
|
|
result = []
|
|
print(f"No OSM {what} data found for airport {airport.icao}", end=" " * 100 + "\n")
|
|
return result
|
|
|
|
def add_osm_runways(airport):
|
|
result = get_osm_elements_near_airport(airport["airport"], "runway", '"aeroway"="runway"', "way")
|
|
osmways = []
|
|
for way in result:
|
|
first, last = way.nodes()[0], way.nodes()[-1]
|
|
if first.id() == last.id():
|
|
print("Got a runway mapped as area from OSM - not supported yet", end=" " * 100 + "\n")
|
|
continue
|
|
first = coord.Coord(first.lon(), first.lat())
|
|
last = coord.Coord(last.lon(), last.lat())
|
|
heading = first.angle(last)
|
|
if heading > 180:
|
|
first, last = last, first
|
|
heading -= 180
|
|
center = rectangle.Rectangle(last, first).midpoint()
|
|
distance = coord.Coord(airport["airport"].lon, airport["airport"].lat).distance_m(center)
|
|
osmways.append({"distance": distance, "heading": heading, "way": way, "first": first, "last": last})
|
|
osmways.sort(key=lambda r: r["distance"])
|
|
for i, runway in enumerate(airport["runways"]):
|
|
osmways_filtered = []
|
|
for osmway in osmways:
|
|
if "ref" in osmway["way"].tags():
|
|
# water runways somtimes have N, NE, etc. as identifier instead of 36, 04, etc.
|
|
mapping = {"N": "36", "NE": "04", "E": "09", "SE": "13", "S": "18", "SW": "22", "W": "27", "NW": "31"}
|
|
if runway["le_ident"] in mapping:
|
|
runway["le_ident"] = mapping[runway["le_ident"]]
|
|
if runway["he_ident"] in mapping:
|
|
runway["he_ident"] = mapping[runway["he_ident"]]
|
|
|
|
if osmway["way"].tags()["ref"] == runway["le_ident"] + "/" + runway["he_ident"]:
|
|
osmways_filtered = [osmway]
|
|
break
|
|
else:
|
|
heading, which = parse_runway_id(runway["le_ident"])
|
|
if heading <= osmway["heading"] < heading + 10:
|
|
if not which:
|
|
osmways_filtered = [osmway]
|
|
break
|
|
else:
|
|
osmways_filtered.append(osmway)
|
|
if len(osmways_filtered) == 3:
|
|
break
|
|
if len(osmways_filtered) == 0:
|
|
print("No OSM data found for runway", runway["le_ident"], "at airport", airport["airport"].icao, end=" " * 100 + "\n")
|
|
elif len(osmways_filtered) == 1: # just one matching runway - nothing left to do
|
|
runway["osmway"] = osmways_filtered[0]
|
|
elif len(osmways_filtered) == 2: # two parallel runways - sort from left to right and pick the right one
|
|
center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat())
|
|
center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat())
|
|
heading, which = parse_runway_id(runway["le_ident"])
|
|
rel_bearing = center1.angle(center2) - heading
|
|
index = 0
|
|
if rel_bearing > 0:
|
|
index = 0 if which == "L" else 1
|
|
else:
|
|
index = 1 if which == "L" else 0
|
|
runway["osmway"] = osmways_filtered[index]
|
|
else: # three or more parallel runways - sort the first three from left to right and pick the right one
|
|
center1 = coord.Coord(osmways_filtered[0].centerLon(), osmways_filtered[0].centerLat())
|
|
center2 = coord.Coord(osmways_filtered[1].centerLon(), osmways_filtered[1].centerLat())
|
|
center3 = coord.Coord(osmways_filtered[2].centerLon(), osmways_filtered[2].centerLat())
|
|
heading, which = parse_runway_id(runway["le_ident"])
|
|
rel_bearing1 = center1.angle(center2) - heading
|
|
rel_bearing2 = center2.angle(center3) - heading
|
|
index = 0
|
|
if rel_bearing1 > 0 and rel_bearing2 > 0:
|
|
index = "LCR".find(which)
|
|
elif rel_bearing1 <= 0 and rel_bearing2 > 0:
|
|
index = "CLR".find(which)
|
|
elif rel_bearing1 > 0 and rel_bearing2 <= 0:
|
|
index = "LRC".find(which)
|
|
else:
|
|
index = "RCL".find(which)
|
|
runway["osmway"] = osmways_filtered[index]
|
|
|
|
if not runway["le_longitude_deg"] or not runway["he_longitude_deg"]:
|
|
if "osmway" in runway:
|
|
runway["le_heading_degT"] = runway["osmway"]["heading"]
|
|
runway["he_heading_degT"] = runway["osmway"]["heading"] + 180
|
|
runway["le_longitude_deg"] = runway["osmway"]["first"].lon
|
|
runway["le_latitude_deg"] = runway["osmway"]["first"].lat
|
|
runway["he_longitude_deg"] = runway["osmway"]["last"].lon
|
|
runway["he_latitude_deg"] = runway["osmway"]["last"].lat
|
|
else:
|
|
print("No threshold information found for runway", runway["le_ident"], "at", airport["airport"].icao, "- removing !", end=" " * 100 + "\n")
|
|
airport["runways"][i] = None
|
|
airport["runways"] = list(filter(None, airport["runways"]))
|
|
|
|
def add_osm_helipads(airport):
|
|
result = get_osm_elements_near_airport(airport["airport"], "helipad", '"aeroway"="helipad"', ["node", "way"])
|
|
osmhelipads = []
|
|
counter = 0
|
|
for element in result:
|
|
if element.type() == "node":
|
|
c = coord.Coord(element.lon(), element.lat())
|
|
radius = 0
|
|
else:
|
|
lon_sum = lat_sum = 0
|
|
divider = 0
|
|
for node in element.nodes():
|
|
lon_sum += node.lon()
|
|
lat_sum += node.lat()
|
|
divider += 1
|
|
c = coord.Coord(lon_sum / divider, lat_sum / divider)
|
|
dist_sum = 0
|
|
for node in element.nodes():
|
|
dist_sum += c.distance_m(coord.Coord(node.lon(), node.lat()))
|
|
radius = dist_sum / divider
|
|
surface = ""
|
|
if "surface" in element.tags():
|
|
surface = element.tags()["surface"]
|
|
lit = None
|
|
if "lit" in element.tags():
|
|
lit = element.tags()["lit"] == "yes"
|
|
id = f"H{counter}"
|
|
counter += 1
|
|
osmhelipads.append({"coord": c, "radius": radius, "surface": surface, "id": id, "lit": None})
|
|
with_lon_lat = []
|
|
without_lon_lat = []
|
|
while len(airport["helipads"]):
|
|
helipad = airport["helipads"].pop()
|
|
if helipad["le_longitude_deg"] and helipad["le_latitude_deg"]:
|
|
with_lon_lat.append(helipad)
|
|
else:
|
|
without_lon_lat.append(helipad)
|
|
for helipad in with_lon_lat:
|
|
if len(osmhelipads) > 0:
|
|
for osmhelipad in osmhelipads:
|
|
osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(float(helipad["le_longitude_deg"]),
|
|
float(helipad["le_latitude_deg"])))
|
|
osmhelipads.sort(key=lambda d: d["distance"])
|
|
helipad["osmhelipad"] = osmhelipads.pop(0)
|
|
else:
|
|
helipad["osmhelipad"] = {}
|
|
|
|
for osmhelipad in osmhelipads:
|
|
osmhelipad["distance"] = osmhelipad["coord"].distance_m(coord.Coord(airport["airport"].lon, airport["airport"].lat))
|
|
osmhelipads.sort(key=lambda d: d["distance"])
|
|
for i, helipad in enumerate(without_lon_lat):
|
|
if len(osmhelipads) > 0:
|
|
helipad["osmhelipad"] = osmhelipads.pop(0)
|
|
helipad["le_longitude_deg"] = helipad["osmhelipad"]["coord"].lon
|
|
helipad["le_latitude_deg"] = helipad["osmhelipad"]["coord"].lat
|
|
else:
|
|
without_lon_lat[i] = None
|
|
if None in without_lon_lat:
|
|
print(f"No position information found for {without_lon_lat.count(None)} helipad(s) at {airport['airport'].icao} - removing", end=" " * 100 + "\n")
|
|
without_lon_lat = list(filter(None, without_lon_lat))
|
|
|
|
airport["helipads"] = with_lon_lat + without_lon_lat
|
|
|
|
for osmhelipad in osmhelipads:
|
|
helipad = {"airport_ident": airport["airport"].icao, "le_longitude_deg": osmhelipad["coord"].lon,
|
|
"le_latitude_deg": osmhelipad["coord"].lat, "lighted": osmhelipad["lit"], "surface": osmhelipad["surface"],
|
|
"length_ft": 0, "width_ft": 0, "osmhelipad": osmhelipad}
|
|
|
|
def add_ourairports_runways(airports):
|
|
csv = _get_ourairports_csv("runways")
|
|
i = 0
|
|
total = len(airports)
|
|
for airport in airports:
|
|
print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r")
|
|
i += 1
|
|
runways = []
|
|
helipads = []
|
|
for line in csv:
|
|
if line["airport_ident"] == airport["ident"]:
|
|
if re.match(line["le_ident"], "H[0-9]*"):
|
|
helipads.append(line)
|
|
else:
|
|
runways.append(line)
|
|
else:
|
|
if runways or helipads:
|
|
break
|
|
airport["runways"] = runways
|
|
airport["helipads"] = helipads
|
|
add_osm_runways(airport)
|
|
add_osm_helipads(airport)
|
|
if len(airport["runways"]) == 0 and len(airport["helipads"]) == 0:
|
|
print(f"Removing airport {airport['airport'].icao} since it has no runways / helipads !", end=" " * 100 + "\n")
|
|
airports[i - 1] = None
|
|
continue
|
|
|
|
for runway in airport["runways"]:
|
|
surface = parse_surface_type(runway["surface"].lower())
|
|
if surface == "Unknown":
|
|
if "osmway" in runway:
|
|
if "surface" in runway["osmway"]["way"].tags():
|
|
surface = parse_surface_type(runway["osmway"]["way"].tags()["surface"])
|
|
|
|
if surface == "Unknown":
|
|
if (int(runway["length_ft"] or 0) > 1500 and int(runway["width_ft"] or 0) > 30) or int(runway["lighted"]):
|
|
surface = "Asphalt"
|
|
else:
|
|
surface = "Dirt"
|
|
print("Unknown surface type:", runway["surface"], "for runway", runway["le_ident"], "at airport", airport["airport"].icao, "- falling back to", surface, end=" " * 100 + "\n")
|
|
runway["surface"] = surface
|
|
print(f"Extracting runways from OurAirports data … {i / total * 100:.1f}% ({i} of {total} airports done)", end="\r\n")
|
|
|
|
airports = list(filter(None, airports))
|
|
|
|
i = 0
|
|
total = len(airports)
|
|
for airport in airports:
|
|
print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)", end="\r")
|
|
i += 1
|
|
for runway in airport["runways"]:
|
|
width = runway["width_ft"]
|
|
if width != "":
|
|
width = float(width)
|
|
elif "osmway" in runway and "width" in runway["osmway"]["way"].tags():
|
|
width = round(float(runway["osmway"]["way"].tags()["width"]), 2)
|
|
else:
|
|
print(f"No width found for runway {runway['le_ident']} at airport {airport['airport'].icao} - guessing from length", end=" " * 100 + "\n")
|
|
width = math.sqrt(int(runway["length_ft"] or 0))
|
|
if runway["surface"] == "water":
|
|
runway = aptdat.WaterRunway(unit_convert.ft2m(width),
|
|
runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]),
|
|
runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]),
|
|
perimeter_buoys=True)
|
|
else:
|
|
center_lights = edge_lights = bool(runway["lighted"])
|
|
if center_lights and surface not in ("Asphalt", "Concrete"):
|
|
center_lights = edge_lights = False
|
|
distance_signs = int(runway["length_ft"] or 0) > 4000
|
|
tdz_lights = runway["surface"] in ("Asphalt", "Concrete")
|
|
markings = aptdat.RunwayMarkingCode.Visual
|
|
if runway["surface"] not in ("Asphalt", "Concrete"):
|
|
markings = aptdat.RunwayMarkingCode.NoMarkings
|
|
elif 4000 < int(runway["length_ft"] or 0) < 6000:
|
|
markings = aptdat.RunwayMarkingCode.NonPrecision
|
|
elif int(runway["length_ft"] or 0) >= 6000:
|
|
markings = aptdat.RunwayMarkingCode.Precision
|
|
reil_type = aptdat.REILCode.NoREIL
|
|
if markings == aptdat.RunwayMarkingCode.NonPrecision:
|
|
reil_type = aptdat.REILCode.UnidirREIL
|
|
|
|
runway = aptdat.LandRunway(unit_convert.ft2m(width), getattr(aptdat.SurfaceCode, runway["surface"]),
|
|
runway["le_ident"], float(runway["le_longitude_deg"]), float(runway["le_latitude_deg"]),
|
|
runway["he_ident"], float(runway["he_longitude_deg"]), float(runway["he_latitude_deg"]),
|
|
center_lights=center_lights, edge_lights=edge_lights, distance_signs=distance_signs,
|
|
displ_thresh1=float(runway["le_displaced_threshold_ft"] or 0), tdz_lights1=tdz_lights,
|
|
markings1=markings, reil_type1=reil_type,
|
|
displ_thresh2=float(runway["he_displaced_threshold_ft"] or 0), tdz_lights2=tdz_lights,
|
|
markings2=markings, reil_type2=reil_type)
|
|
airport["airport"].add_runway(runway)
|
|
print(f"Creating runways … {i / total * 100}% ({i} of {total} airports done)")
|
|
|
|
i = 0
|
|
for airport in airports:
|
|
print("Creating helipads … {i / total * 100}% ({i} of {total} airports done)", end="\r")
|
|
i += 1
|
|
for helipad in airport["helipads"]:
|
|
if helipad["width_ft"]:
|
|
width = round(float(helipad["width_ft"]), 2)
|
|
elif "radius" in helipad["osmhelipad"]:
|
|
width = radius * 2
|
|
else:
|
|
width = 50
|
|
print((f"Unable to get width for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" +
|
|
f" - setting to {width} ft"), end=" " * 100 + "\n")
|
|
if helipad["length_ft"]:
|
|
length = round(float(helipad["length_ft"]), 2)
|
|
elif "radius" in helipad["osmhelipad"]:
|
|
length = radius * 2
|
|
else:
|
|
length = 50
|
|
print((f"Unable to get length for for helipad {helipad['le_ident']} at airport {airport['airport'].icao}" +
|
|
f" - setting to {length} ft"), end=" " * 100 + "\n")
|
|
|
|
lighted = bool(int(helipad["lighted"]))
|
|
surface = parse_surface_type(helipad["surface"])
|
|
if surface == "Unknown" and "surface" in helipad["osmhelipad"]:
|
|
surface = parse_surface_type(helipad["osmhelipad"]["surface"])
|
|
if surface == "Unknown":
|
|
if lighted:
|
|
surface = "Asphalt"
|
|
else:
|
|
surface = "Grass"
|
|
|
|
print((f"Unknown surface type {helipad['surface']} for helipad {helipad['le_ident']} at" +
|
|
f" airport {airport['airport'].icao} - setting to {surface}"), end=" " * 100 + "\n")
|
|
|
|
if surface not in ("Concrete", "Asphalt"):
|
|
lighted = False
|
|
print(helipad)
|
|
helipad = aptdat.Helipad(helipad["id"], float(helipad["le_longitude_deg"]), float(helipad["le_latitude_deg"]), 0,
|
|
unit_convert.ft2m(length), unit_convert.ft2m(width), surface, edge_lights=lighted)
|
|
airport["airport"].add_helipad(helipad)
|
|
airports[i - 1] = airport["airport"]
|
|
return airports
|
|
|
|
def query_airports_by_icaos(icaos):
|
|
# remove doubles
|
|
icaos = list(set(icaos))
|
|
ourairports = get_ourairports_airports(icaos=icaos)
|
|
ourairports = add_ourairports_runways(ourairports)
|
|
return ourairports
|
|
|
|
def query_airports_by_bbox(left, lower, right, upper):
|
|
ourairports = get_ourairports_airports(bbox=rectangle.Rectangle(coord.Coord(left, lower), coord.Coord(right, upper)))
|
|
ourairports = add_ourairports_runways(ourairports)
|
|
return ourairports
|
|
|
|
def check_aptdat_written_by_this(path):
|
|
with open(path, "r") as f:
|
|
i = 0
|
|
while i < 2:
|
|
fl = f.readline()
|
|
if fl:
|
|
i += 1
|
|
if "osm2aptdat.py" in fl:
|
|
return True
|
|
return False
|
|
|
|
def write_aptdat_files(output, airports, merge=False):
|
|
writer = aptdat.ReaderWriterAptDat(file_header="Generated from OSM and OurAirports data by fgtools.osm2aptdat.py")
|
|
writer.add_airports(airports)
|
|
writer.write(output, merge=merge, overwrite_func=check_aptdat_written_by_this)
|
|
|
|
if __name__ == "__main__":
|
|
logging.getLogger("OSMPythonTools").setLevel(logging.FATAL)
|
|
|
|
argp = argparse.ArgumentParser(description="query airports from OSM and convert the results to apt.dat files")
|
|
|
|
bbox_icao_group = argp.add_mutually_exclusive_group(required=True)
|
|
bbox_icao_group.add_argument(
|
|
"-b", "--bbox",
|
|
help="GPS coordinates of the lower left and upper right corners of the bounding box within which all airports should be processed",
|
|
nargs=4,
|
|
metavar=("LL_LON", "LL_LAT", "UR_LON", "UR_LAT"),
|
|
type=float,
|
|
)
|
|
|
|
bbox_icao_group.add_argument(
|
|
"-i", "--icao",
|
|
help="ICAO code(s) of the airport(s) to process",
|
|
nargs="+"
|
|
)
|
|
|
|
argp.add_argument(
|
|
"-m", "--merge",
|
|
help="Merge all airports into one big apt.dat file instead of writing one file for each airport",
|
|
action="store_true"
|
|
)
|
|
|
|
argp.add_argument(
|
|
"-o", "--output",
|
|
help="directory to put apt.dat files into",
|
|
required=True
|
|
)
|
|
|
|
args = argp.parse_args()
|
|
|
|
if args.icao:
|
|
airports = query_airports_by_icaos(args.icao)
|
|
else:
|
|
airports = query_airports_by_bbox(left=args.bbox[0], lower=args.bbox[1], right=args.bbox[2], upper=args.bbox[3])
|
|
|
|
write_aptdat_files(args.output, airports, merge=args.merge)
|
|
|