24 Commits
v2.9 ... v2.10

Author SHA1 Message Date
Junzi Sun
5dc74a5548 new release 2022-04-18 11:59:14 +02:00
Junzi Sun
e7edba9e25 Merge pull request #121 from Flyer350/patch-9
Update bds20.py
2022-04-18 11:28:15 +02:00
Junzi Sun
ce5adbafe5 Update bds20.py 2022-04-18 11:27:53 +02:00
Flyer350
b52431ca51 Update bds20.py
#message with empty callsign can still be BDS2,0

Example messages:
a900189f20000000000000d0d739
a100000020000000000000a81785
2022-01-24 10:38:17 +01:00
Junzi Sun
00f04a8886 Merge pull request #119 from Flyer350/patch-8
fix bug in squawk decoding for BDS61
2021-11-22 10:23:32 +01:00
Flyer350
0eb2d1f6a2 Update bds61.py
Fix the Mode A decoder. The ZERO is already part of  the binary data inside the message and does not need to be added.

See ED-102A, 2.2.3.2.7.8.1.2 b. Starting with ―ME‖ bit 12, the code sequence shall be C1, A1, C2, A2, C4, A4, ZERO, B1, D1, B2, D2, B4, D4.
2021-11-18 11:24:42 +01:00
Junzi Sun
26ef5d3ad9 Merge pull request #116 from dforsi/fix/typos
Fix typos in doc strings
2021-11-02 22:10:54 +01:00
Daniele Forsi
b90c2e740d Fix typos in print() strings 2021-10-09 23:17:31 +02:00
Daniele Forsi
e39baf38ed Fix typos 2021-10-09 22:50:28 +02:00
Junzi Sun
b9471d7fcb move functions to py_common 2021-10-09 00:02:33 +02:00
Junzi Sun
a350050e6e Merge pull request #112 from maranov/bugfix/airborne_velocity_trk
Fix UnboundLocalError in airborne_velocity when velocity is zero
2021-07-07 22:47:51 +02:00
Junzi Sun
01574a9f01 Merge pull request #111 from maranov/bugfix/tcpclient_disconnect
Fix invalid socket disconnect call in TcpClient
2021-07-07 22:46:21 +02:00
Junzi Sun
cdb7aef82e Merge pull request #108 from dforsi/fix/typos
Fix typos
2021-07-07 22:45:22 +02:00
Novak, Marek
a0a8c9b2f7 Fix UnboundLocalError in airborne_velocity when velocity is zero 2021-06-28 17:21:33 +02:00
maranov
0b2648bfe0 Fix invalid socket disconnect call in TcpClient
The testing main function now also stops the Client explicitly
2021-06-21 17:26:58 +02:00
Daniele Forsi
69ce39ab39 Fix typos 2021-05-09 14:28:46 +02:00
Junzi Sun
83e22892ba Merge pull request #98 from TimA346/implement_tc29
Decoder for ADS-B TC=29 (target state and status message)
2021-03-12 18:07:19 +01:00
Junzi Sun
7434fc9ed3 Merge pull request #103 from amhirsch/spelling-magnetic
Fixed spelling of "magnetic_north" in docstring
2021-03-12 13:37:27 +01:00
Junzi Sun
7260bff7e9 Merge pull request #101 from Flyer350/patch-7
Update uplink.py
2021-03-12 13:34:19 +01:00
Xander Hirsch
dd1fd596f8 Fixed spelling of "magnetic_north" in docstring 2021-03-07 23:27:08 -08:00
Flyer350
b9089d55d2 Update uplink.py 2021-02-24 15:28:59 +01:00
TimA346
6ab147bffe Added test for target state and status message subtype 1 2021-01-23 11:51:31 +01:00
TimA346
a308b9a7e0 implemented tc29
BDS6,2
2021-01-23 03:05:48 +01:00
TimA346
96f49a00e4 Merge pull request #1 from junzis/master
Update
2021-01-23 02:52:40 +01:00
24 changed files with 697 additions and 121 deletions

View File

@@ -1,7 +1,7 @@
The Python ADS-B/Mode-S Decoder
===============================
PyModeS is a Python library designed to decode Mode-S (including ADS-B) message. It can be imported to your python project or used as a standalone tool to view and save live traffic data.
PyModeS is a Python library designed to decode Mode-S (including ADS-B) messages. It can be imported to your python project or used as a standalone tool to view and save live traffic data.
This is a project created by Junzi Sun, who works at `TU Delft <https://www.tudelft.nl/en/>`_, `Aerospace Engineering Faculty <https://www.tudelft.nl/en/ae/>`_, `CNS/ATM research group <http://cs.lr.tudelft.nl/atm/>`_. It is supported by many `contributors <https://github.com/junzis/pyModeS/graphs/contributors>`_ from different institutions.

View File

@@ -25,7 +25,7 @@ cdef unsigned char int_to_char(unsigned char i):
@cython.boundscheck(False)
@cython.overflowcheck(False)
cpdef str hex2bin(str hexstr):
"""Convert a hexdecimal string to binary string, with zero fillings."""
"""Convert a hexadecimal string to binary string, with zero fillings."""
# num_of_bits = len(hexstr) * 4
cdef hexbytes = bytes(hexstr.encode())
cdef Py_ssize_t len_hexstr = PyBytes_GET_SIZE(hexbytes)
@@ -73,7 +73,7 @@ cpdef str bin2hex(str binstr):
@cython.boundscheck(False)
cpdef unsigned char df(str msg):
"""Decode Downlink Format vaule, bits 1 to 5."""
"""Decode Downlink Format value, bits 1 to 5."""
cdef str dfbin = hex2bin(msg[:2])
# return min(bin2int(dfbin[0:5]), 24)
cdef long df = bin2int(dfbin[0:5])

View File

@@ -22,7 +22,7 @@ def tell(msg: str) -> None:
tc = common.typecode(msg)
if 1 <= tc <= 4: # callsign
callsign = adsb.callsign(msg)
_print("Type", "Identitification and category")
_print("Type", "Identification and category")
_print("Callsign:", callsign)
if 5 <= tc <= 8: # surface position
@@ -71,6 +71,68 @@ def tell(msg: str) -> None:
_print("CPR Longitude", cprlon)
_print("Altitude", alt, "feet")
if tc == 29: # target state and status
_print("Type", "Target State and Status")
subtype = common.bin2int((common.hex2bin(msg)[32:])[5:7])
_print("Subtype", subtype)
tcas_operational = adsb.tcas_operational(msg)
types = {0: "Not Engaged", 1: "Engaged"}
tcas_operational_types = {0: "Not Operational", 1: "Operational"}
if subtype == 0:
emergency_types = {
0: "No emergency",
1: "General emergency",
2: "Lifeguard/medical emergency",
3: "Minimum fuel",
4: "No communications",
5: "Unlawful interference",
6: "Downed aircraft",
7: "Reserved"
}
vertical_horizontal_types = {
1: "Acquiring mode",
2: "Capturing/Maintaining mode"
}
tcas_ra_types = {0: "Not active", 1: "Active"}
alt, alt_source, alt_ref = adsb.target_altitude(msg)
angle, angle_type, angle_source = adsb.target_angle(msg)
vertical_mode = adsb.vertical_mode(msg)
horizontal_mode = adsb.horizontal_mode(msg)
tcas_ra = adsb.tcas_ra(msg)
emergency_status = adsb.emergency_status(msg)
_print("Target altitude", alt, "feet")
_print("Altitude source", alt_source)
_print("Altitude reference", alt_ref)
_print("Angle", angle, "°")
_print("Angle Type", angle_type)
_print("Angle Source", angle_source)
_print("Vertical mode", vertical_horizontal_types[vertical_mode])
_print("Horizontal mode", vertical_horizontal_types[horizontal_mode])
_print("TCAS/ACAS", tcas_operational_types[tcas_operational])
_print("TCAS/ACAS RA", tcas_ra_types[tcas_ra])
_print("Emergency status", emergency_types[emergency_status])
else:
alt, alt_source = adsb.selected_altitude(msg)
baro = adsb.baro_pressure_setting(msg)
hdg = adsb.selected_heading(msg)
autopilot = adsb.autopilot(msg)
vnav = adsb.vnav_mode(msg)
alt_hold = adsb.altitude_hold_mode(msg)
app = adsb.approach_mode(msg)
lnav = adsb.lnav_mode(msg)
_print("Selected altitude", alt, "feet")
_print("Altitude source", alt_source)
_print("Barometric pressure setting", baro, "millibars")
_print("Selected Heading", hdg, "°")
if not(common.bin2int((common.hex2bin(msg)[32:])[46]) == 0):
_print("Autopilot", types[autopilot])
_print("VNAV mode", types[vnav])
_print("Altitude hold mode", types[alt_hold])
_print("Approach mode", types[app])
_print("TCAS/ACAS", tcas_operational_types[tcas_operational])
_print("LNAV mode", types[lnav])
if df == 20:
_print("Protocol", "Mode-S Comm-B altitude reply")
_print("Altitude", common.altcode(msg), "feet")

View File

@@ -29,6 +29,23 @@ from pyModeS.decoder.bds.bds06 import (
from pyModeS.decoder.bds.bds08 import category, callsign
from pyModeS.decoder.bds.bds09 import airborne_velocity, altitude_diff
from pyModeS.decoder.bds.bds61 import is_emergency, emergency_state, emergency_squawk
from pyModeS.decoder.bds.bds62 import (
selected_altitude,
selected_heading,
target_altitude,
target_angle,
tcas_operational,
tcas_ra,
baro_pressure_setting,
vertical_mode,
horizontal_mode,
vnav_mode,
lnav_mode,
autopilot,
altitude_hold_mode,
approach_mode,
emergency_status
)
def df(msg):
@@ -92,7 +109,7 @@ def position_with_ref(msg, lat_ref, lon_ref):
A reference position is required, which can be previously
calculated location, ground station, or airport location.
The function works with both airborne and surface position messages.
The reference position shall be with in 180NM (airborne) or 45NM (surface)
The reference position shall be within 180NM (airborne) or 45NM (surface)
of the true position.
Args:
@@ -146,15 +163,15 @@ def velocity(msg, source=False):
Args:
msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False.
If set to True, the function will return six value instead of four.
If set to True, the function will return six values instead of four.
Returns:
int, float, int, string, [string], [string]: Four or six parameters, including:
(int, float, int, string, [string], [string]): Four or six parameters, including:
- Speed (kt)
- Angle (degree), either ground track or heading
- Vertical rate (ft/min)
- Speed type ('GS' for ground speed, 'AS' for airspeed)
- [Optional] Direction source ('TRUE_NORTH' or 'MAGENTIC_NORTH')
- [Optional] Direction source ('TRUE_NORTH' or 'MAGNETIC_NORTH')
- [Optional] Vertical rate source ('BARO' or 'GNSS')
For surface messages, vertical rate and its respective sources are set to None.
@@ -324,7 +341,7 @@ def nic_v2(msg, NICa, NICbc):
Args:
msg (str): 28 hexdigits string
NICa (int or string): NIC supplement - A
NICbc (int or srting): NIC supplement - B or C
NICbc (int or string): NIC supplement - B or C
Returns:
int or string: Horizontal Radius of Containment

View File

@@ -1,5 +1,5 @@
"""
Decode all-call reply messages, with dowlink format 11
Decode all-call reply messages, with downlink format 11
"""
from pyModeS import common

View File

@@ -38,6 +38,7 @@ from pyModeS.decoder.bds import (
bds50,
bds53,
bds60,
bds62
)

View File

@@ -82,7 +82,7 @@ def airborne_position_with_ref(msg, lat_ref, lon_ref):
"""Decode airborne position with only one message,
knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall
be with in 180NM of the true position.
be within 180NM of the true position.
Args:
msg (str): even message (28 hexdigits)

View File

@@ -1,7 +1,7 @@
# ------------------------------------------
# BDS 0,6
# ADS-B TC=5-8
# Surface movment
# Surface movement
# ------------------------------------------
from pyModeS import common
@@ -86,7 +86,7 @@ def surface_position_with_ref(msg, lat_ref, lon_ref):
"""Decode surface position with only one message,
knowing reference nearby location, such as previously calculated location,
ground station, or airport location, etc. The reference position shall
be with in 45NM of the true position.
be within 45NM of the true position.
Args:
msg (str): even message (28 hexdigits)
@@ -133,7 +133,7 @@ def surface_velocity(msg, source=False):
Args:
msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False.
If set to True, the function will return six value instead of four.
If set to True, the function will return six values instead of four.
Returns:
int, float, int, string, [string], [string]: Four or six parameters, including:

View File

@@ -1,7 +1,7 @@
# ------------------------------------------
# BDS 0,8
# ADS-B TC=1-4
# Aircraft identitification and category
# Aircraft identification and category
# ------------------------------------------
from pyModeS import common

View File

@@ -16,7 +16,7 @@ def airborne_velocity(msg, source=False):
Args:
msg (str): 28 hexdigits string
source (boolean): Include direction and vertical rate sources in return. Default to False.
If set to True, the function will return six value instead of four.
If set to True, the function will return six values instead of four.
Returns:
int, float, int, string, [string], [string]: Four or six parameters, including:
@@ -65,8 +65,9 @@ def airborne_velocity(msg, source=False):
trk = math.degrees(trk) # convert to degrees
trk = trk if trk >= 0 else trk + 360 # no negative val
trk_or_hdg = round(trk, 2)
spd_type = "GS"
trk_or_hdg = round(trk, 2)
dir_type = "TRUE_NORTH"
else:

View File

@@ -45,7 +45,7 @@ def cap17(msg):
msg (str): 28 hexdigits string
Returns:
list: list of support BDS codes
list: list of supported BDS codes
"""
allbds = [
"05",

View File

@@ -24,9 +24,11 @@ def is20(msg):
if d[0:8] != "00100000":
return False
cs = cs20(msg)
# allow empty callsign
if common.bin2int(d[8:56]) == 0
return True
if "#" in cs:
if "#" in cs20(msg):
return False
return True

View File

@@ -142,7 +142,7 @@ def hum44(msg):
def turb44(msg):
"""Turblence.
"""Turbulence.
Args:
msg (str): 28 hexdigits string

View File

@@ -158,7 +158,7 @@ def vr60baro(msg):
def vr60ins(msg):
"""Vertical rate measurd by onbard equiments (IRS, AHRS)
"""Vertical rate measured by onboard equipment (IRS, AHRS)
Args:
msg (str): 28 hexdigits string

View File

@@ -77,7 +77,7 @@ def emergency_squawk(msg: str) -> str:
msgbin = common.hex2bin(msg)
# construct the 13 bits Mode A ID code
idcode = msgbin[43:49] + "0" + msgbin[49:55]
idcode = msgbin[43:56]
squawk = common.squawk(idcode)
return squawk

View File

@@ -0,0 +1,474 @@
# ------------------------------------------
# BDS 6,2
# ADS-B TC=29
# Target State and Status
# ------------------------------------------
from pyModeS import common
def selected_altitude(msg):
"""Decode selected altitude.
Args:
msg (str): 28 hexdigits string
Returns:
int: Selected altitude (ft)
string: Source ('MCP/FCU' or 'FMS')
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain selected altitude, use target altitude instead" % msg)
alt = common.bin2int(mb[9:20])
alt = None if alt == 0 else (alt - 1) * 32
alt_source = "MCP/FCU" if int(mb[8]) == 0 else "FMS"
return alt, alt_source
def target_altitude(msg):
"""Decode target altitude.
Args:
msg (str): 28 hexdigits string
Returns:
int: Target altitude (ft)
string: Source ('MCP/FCU', 'Holding mode' or 'FMS/RNAV')
string: Altitude reference, either pressure altitude or barometric corrected altitude ('FL' or 'MSL')
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain target altitude, use selected altitude instead" % msg)
alt_avail = common.bin2int(mb[7:9])
if alt_avail == 0:
return None
elif alt_avail == 1:
alt_source = "MCP/FCU"
elif alt_avail == 2:
alt_source = "Holding mode"
else:
alt_source = "FMS/RNAV"
alt_ref = "FL" if int(mb[9]) == 0 else "MSL"
alt = -1000 + common.bin2int(mb[15:25]) * 100
return alt, alt_source, alt_ref
def vertical_mode(msg):
"""Decode vertical mode.
Value Meaning
----- -----------------------
1 "Acquiring" mode
2 "Capturing" or "Maintaining" mode
3 Reserved
Args:
msg (str): 28 hexdigits string
Returns:
int: Vertical mode
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain vertical mode, use vnav mode instead" % msg)
vertical_mode = common.bin2int(mb[13:15])
if vertical_mode == 0:
return None
return vertical_mode
def horizontal_mode(msg):
"""Decode horizontal mode.
Value Meaning
----- -----------------------
1 "Acquiring" mode
2 "Capturing" or "Maintaining" mode
3 Reserved
Args:
msg (str): 28 hexdigits string
Returns:
int: Horizontal mode
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain horizontal mode, use lnav mode instead" % msg)
horizontal_mode = common.bin2int(mb[25:27])
if horizontal_mode == 0:
return None
return horizontal_mode
def selected_heading(msg):
"""Decode selected heading.
Args:
msg (str): 28 bytes hexadecimal message string
Returns:
float: Selected heading (degree)
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain selected heading, use target angle instead" % msg)
if int(mb[29]) == 0:
hdg = None
else:
hdg_sign = int(mb[30])
hdg = (hdg_sign+1) * common.bin2int(mb[31:39]) * (180/256)
hdg = round(hdg, 2)
return hdg
def target_angle(msg):
"""Decode target heading/track angle.
Args:
msg (str): 28 bytes hexadecimal message string
Returns:
int: Target angle (degree)
string: Angle type ('Heading' or 'Track')
string: Source ('MCP/FCU', 'Autopilot Mode' or 'FMS/RNAV')
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain target angle, use selected heading instead" % msg)
angle_avail = common.bin2int(mb[25:27])
if angle_avail == 0:
angle = None
else:
angle = common.bin2int(mb[27:36])
if angle_avail == 1:
angle_source = "MCP/FCU"
elif angle_avail == 2:
angle_source = "Autopilot mode"
else:
angle_source = "FMS/RNAV"
angle_type = "Heading" if int(mb[36]) else "Track"
return angle, angle_type, angle_source
def baro_pressure_setting(msg):
"""Decode barometric pressure setting.
Args:
msg (str): 28 hexdigits string
Returns:
float: Barometric pressure setting (millibars)
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain barometric pressure setting" % msg)
baro = common.bin2int(mb[20:29])
baro = None if baro == 0 else 800 + (baro - 1) * 0.8
baro = round(baro, 1)
return baro
def autopilot(msg) -> bool:
"""Decode autopilot engagement.
Args:
msg (str): 28 hexdigits string
Returns:
bool: Autopilot engaged
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain autopilot engagement" % msg)
if int(mb[46]) == 0:
return None
autopilot = True if int(mb[47]) == 1 else False
return autopilot
def vnav_mode(msg) -> bool:
"""Decode VNAV mode.
Args:
msg (str): 28 hexdigits string
Returns:
bool: VNAV mode engaged
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain vnav mode, use vertical mode instead" % msg)
if int(mb[46]) == 0:
return None
vnav_mode = True if int(mb[48]) == 1 else False
return vnav_mode
def altitude_hold_mode(msg) -> bool:
"""Decode altitude hold mode.
Args:
msg (str): 28 hexdigits string
Returns:
bool: Altitude hold mode engaged
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain altitude hold mode" % msg)
if int(mb[46]) == 0:
return None
alt_hold_mode = True if int(mb[49]) == 1 else False
return alt_hold_mode
def approach_mode(msg) -> bool:
"""Decode approach mode.
Args:
msg (str): 28 hexdigits string
Returns:
bool: Approach mode engaged
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain approach mode" % msg)
if int(mb[46]) == 0:
return None
app_mode = True if int(mb[51]) == 1 else False
return app_mode
def lnav_mode(msg) -> bool:
"""Decode LNAV mode.
Args:
msg (str): 28 hexdigits string
Returns:
bool: LNAV mode engaged
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
raise RuntimeError("%s: ADS-B version 1 target state and status message does not contain lnav mode, use horizontal mode instead" % msg)
if int(mb[46]) == 0:
return None
lnav_mode = True if int(mb[53]) == 1 else False
return lnav_mode
def tcas_operational(msg) -> bool:
"""Decode TCAS/ACAS operational.
Args:
msg (str): 28 bytes hexadecimal message string
Returns:
bool: TCAS/ACAS operational
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 0:
tcas = True if int(mb[51]) == 0 else False
else:
tcas = True if int(mb[52]) == 1 else False
return tcas
def tcas_ra(msg) -> bool:
"""Decode TCAS/ACAS Resolution advisory.
Args:
msg (str): 28 bytes hexadecimal message string
Returns:
bool: TCAS/ACAS Resolution advisory active
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain TCAS/ACAS RA" % msg)
tcas_ra = True if int(mb[52]) == 1 else False
return tcas_ra
def emergency_status(msg) -> int:
"""Decode aircraft emergency status.
Value Meaning
----- -----------------------
0 No emergency
1 General emergency
2 Lifeguard/medical emergency
3 Minimum fuel
4 No communications
5 Unlawful interference
6 Downed aircraft
7 Reserved
Args:
msg (str): 28 bytes hexadecimal message string
Returns:
int: Emergency status
"""
if common.typecode(msg) != 29:
raise RuntimeError("%s: Not a target state and status message, expecting TC=29" % msg)
mb = common.hex2bin(msg)[32:]
subtype = common.bin2int(mb[5:7])
if subtype == 1:
raise RuntimeError("%s: ADS-B version 2 target state and status message does not contain emergency status" % msg)
return common.bin2int(mb[53:56])

View File

@@ -36,3 +36,5 @@ from pyModeS.decoder.bds.bds60 import *
# MRAR and MHR
from pyModeS.decoder.bds.bds44 import *
from pyModeS.decoder.bds.bds45 import *
from pyModeS.py_common import fs, dr, um

View File

@@ -3,6 +3,7 @@ Decode short roll call surveillance replies, with downlink format 4 or 5
"""
from pyModeS import common
from pyModeS.py_common import fs, dr, um
def _checkdf(func):
@@ -19,91 +20,6 @@ def _checkdf(func):
return wrapper
@_checkdf
def fs(msg):
"""Decode flight status.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: flight status, description
"""
msgbin = common.hex2bin(msg)
fs = common.bin2int(msgbin[5:8])
text = None
if fs == 0:
text = "no alert, no SPI, aircraft is airborne"
elif fs == 1:
text = "no alert, no SPI, aircraft is on-ground"
elif fs == 2:
text = "alert, no SPI, aircraft is airborne"
elif fs == 3:
text = "alert, no SPI, aircraft is on-ground"
elif fs == 4:
text = "alert, SPI, aircraft is airborne or on-ground"
elif fs == 5:
text = "no alert, SPI, aircraft is airborne or on-ground"
return fs, text
@_checkdf
def dr(msg):
"""Decode downlink request.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: downlink request, description
"""
msgbin = common.hex2bin(msg)
dr = common.bin2int(msgbin[8:13])
text = None
if dr == 0:
text = "no downlink request"
elif dr == 1:
text = "request to send Comm-B message"
elif dr == 4:
text = "Comm-B broadcast 1 available"
elif dr == 5:
text = "Comm-B broadcast 2 available"
elif dr >= 16:
text = "ELM downlink segments available: {}".format(dr - 15)
return dr, text
@_checkdf
def um(msg):
"""Decode utility message.
Utility message contains interrogator identifier and reservation type.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: interrogator identifier code that triggered the reply, and
reservation type made by the interrogator
"""
msgbin = common.hex2bin(msg)
iis = common.bin2int(msgbin[13:17])
ids = common.bin2int(msgbin[17:19])
if ids == 0:
ids_text = None
if ids == 1:
ids_text = "Comm-B interrogator identifier code"
if ids == 2:
ids_text = "Comm-C interrogator identifier code"
if ids == 3:
ids_text = "Comm-D interrogator identifier code"
return iis, ids, ids_text
@_checkdf
def altitude(msg):
"""Decode altitude.

View File

@@ -43,12 +43,12 @@ def bds(msg):
RRS = mbytes[2] & 0x0F
BDS2 = RRS
elif di == 3:
RRS = ((mbytes[2] & 0x1) << 4) | ((mbytes[3] & 0xE0) >> 5)
RRS = ((mbytes[2] & 0x1) << 3) | ((mbytes[3] & 0xE0) >> 5)
BDS2 = RRS
else:
BDS2 = 0 # for other values of DI, the BDS2 is assumed 0 (as per ICAO Annex 10 Vol IV)
return str(BDS1) + str(BDS2)
return str(format(BDS1,"X")) + str(format(BDS2,"X"))
else:
return None
else:
@@ -154,8 +154,7 @@ def uplink_fields(msg):
di = ""
RR = ""
RRS = ""
BDS1 = ""
BDS2 = ""
BDS = ""
if uf(msg) == 11:
@@ -208,8 +207,11 @@ def uplink_fields(msg):
# SI
SI = (mbytes[2] >> 2) & 0x3F
IC = "SI" + str(SI)
RRS = ((mbytes[2] & 0x1) << 4) | ((mbytes[3] & 0xE0) >> 5)
RRS = ((mbytes[2] & 0x1) << 3) | ((mbytes[3] & 0xE0) >> 5)
BDS2 = RRS
if RR > 15:
BDS = str(format(BDS1,"X")) + str(format(BDS2,"X"))
return {
"DI": di,
"IC": IC,
@@ -217,5 +219,5 @@ def uplink_fields(msg):
"PR": PR,
"RR": RR,
"RRS": RRS,
"BDS": str(BDS1) + str(BDS2),
"BDS": BDS,
}

View File

@@ -7,7 +7,7 @@ try:
import rtlsdr
except:
print("------------------------------------------------------------------------")
print("! Warining: pyrtlsdr not installed (required for using RTL-SDR devices) ")
print("! Warning: pyrtlsdr not installed (required for using RTL-SDR devices) !")
print("------------------------------------------------------------------------")
sampling_rate = 2e6

View File

@@ -32,7 +32,7 @@ class TcpClient(object):
self.socket.connect("tcp://%s:%s" % (self.host, self.port))
def stop(self):
self.socket.disconnect()
self.socket.close()
def read_raw_buffer(self):
""" Read raw ADS-B data type.
@@ -292,4 +292,7 @@ if __name__ == "__main__":
port = int(sys.argv[2])
datatype = sys.argv[3]
client = TcpClient(host=host, port=port, datatype=datatype)
client.run()
try:
client.run()
finally:
client.stop()

View File

@@ -5,14 +5,14 @@ from textwrap import wrap
def hex2bin(hexstr: str) -> str:
"""Convert a hexdecimal string to binary string, with zero fillings."""
"""Convert a hexadecimal string to binary string, with zero fillings."""
num_of_bits = len(hexstr) * 4
binstr = bin(int(hexstr, 16))[2:].zfill(int(num_of_bits))
return binstr
def hex2int(hexstr: str) -> int:
"""Convert a hexdecimal string to integer."""
"""Convert a hexadecimal string to integer."""
return int(hexstr, 16)
@@ -22,7 +22,7 @@ def bin2int(binstr: str) -> int:
def bin2hex(binstr: str) -> str:
"""Convert a binary string to hexdecimal string."""
"""Convert a binary string to hexadecimal string."""
return "{0:X}".format(int(binstr, 2))
@@ -404,3 +404,85 @@ def wrongstatus(data: str, sb: int, msb: int, lsb: int) -> bool:
return True
return False
def fs(msg):
"""Decode flight status for DF 4, 5, 20, and 21.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: flight status, description
"""
msgbin = hex2bin(msg)
fs = bin2int(msgbin[5:8])
text = None
if fs == 0:
text = "no alert, no SPI, aircraft is airborne"
elif fs == 1:
text = "no alert, no SPI, aircraft is on-ground"
elif fs == 2:
text = "alert, no SPI, aircraft is airborne"
elif fs == 3:
text = "alert, no SPI, aircraft is on-ground"
elif fs == 4:
text = "alert, SPI, aircraft is airborne or on-ground"
elif fs == 5:
text = "no alert, SPI, aircraft is airborne or on-ground"
return fs, text
def dr(msg):
"""Decode downlink request for DF 4, 5, 20, and 21.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: downlink request, description
"""
msgbin = hex2bin(msg)
dr = bin2int(msgbin[8:13])
text = None
if dr == 0:
text = "no downlink request"
elif dr == 1:
text = "request to send Comm-B message"
elif dr == 4:
text = "Comm-B broadcast 1 available"
elif dr == 5:
text = "Comm-B broadcast 2 available"
elif dr >= 16:
text = "ELM downlink segments available: {}".format(dr - 15)
return dr, text
def um(msg):
"""Decode utility message for DF 4, 5, 20, and 21.
Utility message contains interrogator identifier and reservation type.
Args:
msg (str): 14 hexdigits string
Returns:
int, str: interrogator identifier code that triggered the reply, and
reservation type made by the interrogator
"""
msgbin = hex2bin(msg)
iis = bin2int(msgbin[13:17])
ids = bin2int(msgbin[17:19])
if ids == 0:
ids_text = None
if ids == 1:
ids_text = "Comm-B interrogator identifier code"
if ids == 2:
ids_text = "Comm-C interrogator identifier code"
if ids == 3:
ids_text = "Comm-D interrogator identifier code"
return iis, ids, ids_text

View File

@@ -27,7 +27,7 @@ with open(path.join(here, "README.rst"), encoding="utf-8") as f:
details = dict(
name="pyModeS",
version="2.9",
version="2.10",
description="Python Mode-S and ADS-B Decoder",
long_description=long_description,
url="https://github.com/junzis/pyModeS",

View File

@@ -86,6 +86,20 @@ def test_adsb_emergency():
assert adsb.emergency_squawk("8DA2C1B6E112B600000000760759") == "6615"
def test_adsb_target_state_status():
sel_alt = adsb.selected_altitude("8DA05629EA21485CBF3F8CADAEEB")
assert sel_alt == (16992, "MCP/FCU")
assert adsb.baro_pressure_setting("8DA05629EA21485CBF3F8CADAEEB") == 1012.8
assert adsb.selected_heading("8DA05629EA21485CBF3F8CADAEEB")== 66.8
assert adsb.autopilot("8DA05629EA21485CBF3F8CADAEEB") == True
assert adsb.vnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True
assert adsb.altitude_hold_mode("8DA05629EA21485CBF3F8CADAEEB") == False
assert adsb.approach_mode("8DA05629EA21485CBF3F8CADAEEB") == False
assert adsb.tcas_operational("8DA05629EA21485CBF3F8CADAEEB") == True
assert adsb.lnav_mode("8DA05629EA21485CBF3F8CADAEEB") == True
# def test_nic():
# assert adsb.nic('8D3C70A390AB11F55B8C57F65FE6') == 0
# assert adsb.nic('8DE1C9738A4A430B427D219C8225') == 1