diff --git a/pyModeS/extra/rtlreader.py b/pyModeS/extra/rtlreader.py index ec3747d..908a103 100644 --- a/pyModeS/extra/rtlreader.py +++ b/pyModeS/extra/rtlreader.py @@ -1,7 +1,6 @@ import numpy as np import pyModeS as pms from rtlsdr import RtlSdr -from threading import Thread import time modes_sample_rate = 2e6 @@ -16,17 +15,20 @@ th_amp = 0.2 # signal amplitude threshold for 0 and 1 bit th_amp_diff = 0.8 # signal amplitude threshold difference between 0 and 1 bit -class RtlReader(Thread): - def __init__(self, debug=False): +class RtlReader(object): + def __init__(self, **kwargs): super(RtlReader, self).__init__() self.signal_buffer = [] - self.debug = debug self.sdr = RtlSdr() self.sdr.sample_rate = modes_sample_rate self.sdr.center_freq = modes_frequency self.sdr.gain = "auto" # sdr.freq_correction = 75 + self.debug = kwargs.get("debug", False) + self.raw_event = None + self.raw_queue = None + def _process_buffer(self): messages = [] @@ -127,7 +129,7 @@ class RtlReader(Thread): if len(self.signal_buffer) >= buffer_size: messages = self._process_buffer() - self.handle_messages(messages) + self.handle_messages(messages, self.raw_event, self.raw_queue) def handle_messages(self, messages): """re-implement this method to handle the messages""" @@ -138,7 +140,9 @@ class RtlReader(Thread): def stop(self): self.sdr.cancel_read_async() - def run(self): + def run(self, raw_event=None, raw_queue=None): + self.raw_event = raw_event + self.raw_queue = raw_queue self.sdr.read_samples_async(self._read_callback, read_size) # count = 1 diff --git a/pyModeS/extra/tcpclient.py b/pyModeS/extra/tcpclient.py index 0b856da..c6148df 100644 --- a/pyModeS/extra/tcpclient.py +++ b/pyModeS/extra/tcpclient.py @@ -5,7 +5,6 @@ import os import sys import time import pyModeS as pms -from threading import Thread import traceback import zmq @@ -15,9 +14,9 @@ else: PY_VERSION = 2 -class BaseClient(Thread): +class TcpClient(object): def __init__(self, host, port, datatype): - Thread.__init__(self) + super(TcpClient, self).__init__() self.host = host self.port = port self.buffer = [] @@ -136,6 +135,9 @@ class BaseClient(Thread): # Other message tupe continue + if len(msg) not in [14, 28]: + continue + df = pms.df(msg) # skip incomplete message @@ -247,12 +249,12 @@ class BaseClient(Thread): self.buffer = self.buffer[1:] return messages - def handle_messages(self, messages): + def handle_messages(self, messages, raw_event=None, raw_queue=None): """re-implement this method to handle the messages""" for msg, t in messages: print("%15.9f %s" % (t, msg)) - def run(self): + def run(self, raw_event=None, raw_queue=None): self.connect() while True: @@ -280,11 +282,10 @@ class BaseClient(Thread): if not messages: continue else: - self.handle_messages(messages) + self.handle_messages(messages, raw_event, raw_queue) time.sleep(0.001) except Exception as e: - # Provides the user an option to supply the environment # variable PYMODES_DEBUG to halt the execution # for debugging purposes @@ -306,6 +307,5 @@ if __name__ == "__main__": host = sys.argv[1] port = int(sys.argv[2]) datatype = sys.argv[3] - client = BaseClient(host=host, port=port, datatype=datatype) - client.daemon = True + client = TcpClient(host=host, port=port, datatype=datatype) client.run() diff --git a/pyModeS/streamer/modeslive b/pyModeS/streamer/modeslive index 2fc206d..892cbc2 100755 --- a/pyModeS/streamer/modeslive +++ b/pyModeS/streamer/modeslive @@ -3,21 +3,18 @@ from __future__ import print_function, division import os import sys -import time import argparse import curses -from threading import Lock -import pyModeS as pms -from pyModeS.extra.tcpclient import BaseClient -from pyModeS.extra.rtlreader import RtlReader +import signal +import multiprocessing from pyModeS.streamer.stream import Stream from pyModeS.streamer.screen import Screen +from pyModeS.streamer.source import NetSource, RtlSdrSource + + +# redirect all stdout to null, avoiding messing up with the screen +sys.stdout = open(os.devnull, "w") -LOCK = Lock() -ADSB_MSG = [] -ADSB_TS = [] -COMMB_MSG = [] -COMMB_TS = [] support_rawtypes = ["raw", "beast", "skysense"] @@ -93,118 +90,48 @@ if DUMPTO is not None: sys.exit(1) -class ModesClient(BaseClient): - def __init__(self, host, port, rawtype): - super(ModesClient, self).__init__(host, port, rawtype) +raw_event = multiprocessing.Event() +ac_event = multiprocessing.Event() - def handle_messages(self, messages): - local_buffer_adsb_msg = [] - local_buffer_adsb_ts = [] - local_buffer_ehs_msg = [] - local_buffer_ehs_ts = [] - - for msg, t in messages: - if len(msg) < 28: # only process long messages - continue - - df = pms.df(msg) - - if df == 17 or df == 18: - local_buffer_adsb_msg.append(msg) - local_buffer_adsb_ts.append(t) - elif df == 20 or df == 21: - local_buffer_ehs_msg.append(msg) - local_buffer_ehs_ts.append(t) - else: - continue - - LOCK.acquire() - ADSB_MSG.extend(local_buffer_adsb_msg) - ADSB_TS.extend(local_buffer_adsb_ts) - COMMB_MSG.extend(local_buffer_ehs_msg) - COMMB_TS.extend(local_buffer_ehs_ts) - LOCK.release() - - -class ModesRtlReader(RtlReader): - """docstring for ModesRtlReader.""" - - def __init__(self): - super(ModesRtlReader, self).__init__() - - def handle_messages(self, messages): - local_buffer_adsb_msg = [] - local_buffer_adsb_ts = [] - local_buffer_ehs_msg = [] - local_buffer_ehs_ts = [] - - for msg, t in messages: - if len(msg) < 28: # only process long messages - continue - - df = pms.df(msg) - - if df == 17 or df == 18: - local_buffer_adsb_msg.append(msg) - local_buffer_adsb_ts.append(t) - elif df == 20 or df == 21: - local_buffer_ehs_msg.append(msg) - local_buffer_ehs_ts.append(t) - else: - continue - - LOCK.acquire() - ADSB_MSG.extend(local_buffer_adsb_msg) - ADSB_TS.extend(local_buffer_adsb_ts) - COMMB_MSG.extend(local_buffer_ehs_msg) - COMMB_TS.extend(local_buffer_ehs_ts) - # print(len(ADSB_MSG)) - # print(len(COMMB_MSG)) - LOCK.release() - - -# redirect all stdout to null, avoiding messing up with the screen -sys.stdout = open(os.devnull, "w") +raw_queue = multiprocessing.Queue() +aircraft_queue = multiprocessing.Queue() if SOURCE == "net": - client = ModesClient(host=SERVER, port=PORT, rawtype=DATATYPE) - client.daemon = True - client.start() + source = NetSource(host=SERVER, port=PORT, rawtype=DATATYPE) elif SOURCE == "rtlsdr": - rtl = ModesRtlReader() - # rtl.debug = True - rtl.daemon = True - rtl.start() + source = RtlSdrSource() + + +recv_process = multiprocessing.Process( + target=source.run, args=(raw_event, raw_queue) +) + stream = Stream(latlon=LATLON, dumpto=DUMPTO) +stream_process = multiprocessing.Process( + target=stream.run, args=(raw_event, ac_event, raw_queue, aircraft_queue) +) -try: - screen = Screen(uncertainty=UNCERTAINTY) - screen.daemon = True - screen.start() +screen = Screen(uncertainty=UNCERTAINTY) +screen_process = multiprocessing.Process( + target=screen.run, args=(ac_event, aircraft_queue) +) - while True: - if len(ADSB_MSG) > 1: - LOCK.acquire() - stream.process_raw(ADSB_TS, ADSB_MSG, COMMB_TS, COMMB_MSG) - ADSB_MSG = [] - ADSB_TS = [] - COMMB_MSG = [] - COMMB_TS = [] - LOCK.release() - acs = stream.get_aircraft() - try: - screen.update_data(acs) - screen.update() - time.sleep(1) - except KeyboardInterrupt: - raise - except: - continue - -except KeyboardInterrupt: - sys.exit(0) - -finally: +def closeall(signal, frame): + print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal)) curses.endwin() + recv_process.terminate() + stream_process.terminate() + screen_process.terminate() + recv_process.join() + stream_process.join() + screen_process.join() + exit(0) + + +signal.signal(signal.SIGINT, closeall) + +recv_process.start() +stream_process.start() +screen_process.start() diff --git a/pyModeS/streamer/screen.py b/pyModeS/streamer/screen.py index c061880..17678dc 100644 --- a/pyModeS/streamer/screen.py +++ b/pyModeS/streamer/screen.py @@ -1,9 +1,8 @@ from __future__ import print_function, division -import os import curses import numpy as np import time -from threading import Thread +import threading COLUMNS = [ ("call", 10), @@ -39,9 +38,9 @@ UNCERTAINTY_COLUMNS = [ ] -class Screen(Thread): +class Screen(object): def __init__(self, uncertainty=False): - Thread.__init__(self) + super(Screen, self).__init__() self.screen = curses.initscr() curses.noecho() curses.mousemask(1) @@ -59,7 +58,7 @@ class Screen(Thread): def reset_cursor_pos(self): self.screen.move(self.y, self.x) - def update_data(self, acs): + def update_ac(self, acs): self.acs = acs def draw_frame(self): @@ -154,7 +153,7 @@ class Screen(Thread): self.reset_cursor_pos() - def run(self): + def kye_handling(self): self.draw_frame() self.scr_h, self.scr_w = self.screen.getmaxyx() @@ -184,6 +183,27 @@ class Screen(Thread): self.y = y_intent elif c == curses.KEY_ENTER or c == 10 or c == 13: self.lock_icao = (self.screen.instr(self.y, 1, 6)).decode() + elif c == 27: # escape key + self.lock_icao = None elif c == curses.KEY_F5: self.screen.refresh() self.draw_frame() + + def run(self, ac_event, ac_queue): + + key_thread = threading.Thread(target=self.kye_handling) + key_thread.start() + + while True: + if ac_event.is_set(): + while not ac_queue.empty(): + acs = ac_queue.get() + self.update_ac(acs) + + ac_event.clear() + try: + self.update() + except: + pass + + time.sleep(0.001) diff --git a/pyModeS/streamer/source.py b/pyModeS/streamer/source.py new file mode 100644 index 0000000..cca6c42 --- /dev/null +++ b/pyModeS/streamer/source.py @@ -0,0 +1,77 @@ +import pyModeS as pms +from pyModeS.extra.tcpclient import TcpClient +from pyModeS.extra.rtlreader import RtlReader + + +class NetSource(TcpClient): + def __init__(self, host, port, rawtype): + super(NetSource, self).__init__(host, port, rawtype) + self.local_buffer_adsb_msg = [] + self.local_buffer_adsb_ts = [] + self.local_buffer_commb_msg = [] + self.local_buffer_commb_ts = [] + + def handle_messages(self, messages, raw_event, raw_queue): + + for msg, t in messages: + if len(msg) < 28: # only process long messages + continue + + df = pms.df(msg) + + if df == 17 or df == 18: + self.local_buffer_adsb_msg.append(msg) + self.local_buffer_adsb_ts.append(t) + elif df == 20 or df == 21: + self.local_buffer_commb_msg.append(msg) + self.local_buffer_commb_ts.append(t) + else: + continue + + if len(self.local_buffer_adsb_msg) > 1: + raw_queue.put( + { + "adsb_ts": self.local_buffer_adsb_ts, + "adsb_msg": self.local_buffer_adsb_msg, + "commb_ts": self.local_buffer_commb_ts, + "commb_msg": self.local_buffer_commb_msg, + } + ) + raw_event.set() + + +class RtlSdrSource(RtlReader): + def __init__(self): + super(RtlSdrSource, self).__init__() + self.local_buffer_adsb_msg = [] + self.local_buffer_adsb_ts = [] + self.local_buffer_commb_msg = [] + self.local_buffer_commb_ts = [] + + def handle_messages(self, messages, raw_event, raw_queue): + + for msg, t in messages: + if len(msg) < 28: # only process long messages + continue + + df = pms.df(msg) + + if df == 17 or df == 18: + self.local_buffer_adsb_msg.append(msg) + self.local_buffer_adsb_ts.append(t) + elif df == 20 or df == 21: + self.local_buffer_commb_msg.append(msg) + self.local_buffer_commb_ts.append(t) + else: + continue + + if len(self.local_buffer_adsb_msg) > 1: + raw_queue.put( + { + "adsb_ts": self.local_buffer_adsb_ts, + "adsb_msg": self.local_buffer_adsb_msg, + "commb_ts": self.local_buffer_commb_ts, + "commb_msg": self.local_buffer_commb_msg, + } + ) + raw_event.set() diff --git a/pyModeS/streamer/stream.py b/pyModeS/streamer/stream.py index 1642567..f127d18 100644 --- a/pyModeS/streamer/stream.py +++ b/pyModeS/streamer/stream.py @@ -26,7 +26,7 @@ class Stream: else: self.dumpto = None - def process_raw(self, adsb_ts, adsb_msgs, commb_ts, commb_msgs, tnow=None): + def process_raw(self, adsb_ts, adsb_msg, commb_ts, commb_msg, tnow=None): """process a chunk of adsb and commb messages recieved in the same time period. """ @@ -39,7 +39,7 @@ class Stream: output_buffer = [] # process adsb message - for t, msg in zip(adsb_ts, adsb_msgs): + for t, msg in zip(adsb_ts, adsb_msg): icao = pms.icao(msg) tc = pms.adsb.typecode(msg) @@ -192,7 +192,7 @@ class Stream: ac["nic_a"], ac["nic_bc"] = pms.adsb.nic_a_c(msg) # process commb message - for t, msg in zip(commb_ts, commb_msgs): + for t, msg in zip(commb_ts, commb_msg): icao = pms.icao(msg) if icao not in self.acs: @@ -266,3 +266,19 @@ class Stream: """all aircraft that are stored in memeory""" acs = self.acs return acs + + def run(self, raw_event, ac_event, raw_queue, aircraft_queue): + while True: + if raw_event.is_set(): + data = raw_queue.get() + self.process_raw( + data["adsb_ts"], + data["adsb_msg"], + data["commb_ts"], + data["commb_msg"], + ) + + aircraft_queue.put(self.get_aircraft()) + ac_event.set() + raw_event.clear() + time.sleep(0.001)