diff --git a/pyModeS/extra/rtlreader.py b/pyModeS/extra/rtlreader.py index 908a103..6bc9285 100644 --- a/pyModeS/extra/rtlreader.py +++ b/pyModeS/extra/rtlreader.py @@ -26,8 +26,8 @@ class RtlReader(object): # sdr.freq_correction = 75 self.debug = kwargs.get("debug", False) - self.raw_event = None - self.raw_queue = None + self.raw_pipe_in = None + self.stop_flag = False def _process_buffer(self): messages = [] @@ -129,7 +129,7 @@ class RtlReader(object): if len(self.signal_buffer) >= buffer_size: messages = self._process_buffer() - self.handle_messages(messages, self.raw_event, self.raw_queue) + self.handle_messages(messages) def handle_messages(self, messages): """re-implement this method to handle the messages""" @@ -137,12 +137,12 @@ class RtlReader(object): # print("%15.9f %s" % (t, msg)) pass - def stop(self): + def stop(self, *args, **kwargs): self.sdr.cancel_read_async() - def run(self, raw_event=None, raw_queue=None): - self.raw_event = raw_event - self.raw_queue = raw_queue + def run(self, raw_pipe_in=None, stop_flag=None): + self.raw_pipe_in = raw_pipe_in + self.stop_flag = stop_flag self.sdr.read_samples_async(self._read_callback, read_size) # count = 1 @@ -153,7 +153,10 @@ class RtlReader(object): if __name__ == "__main__": + import signal + rtl = RtlReader() + signal.signal(signal.SIGINT, rtl.stop) + rtl.debug = True - rtl.start() - rtl.join() + rtl.run() diff --git a/pyModeS/extra/tcpclient.py b/pyModeS/extra/tcpclient.py index c6148df..b70f917 100644 --- a/pyModeS/extra/tcpclient.py +++ b/pyModeS/extra/tcpclient.py @@ -26,12 +26,18 @@ class TcpClient(object): print("datatype must be either raw, beast or skysense") os._exit(1) + self.raw_pipe_in = None + self.stop_flag = False + def connect(self): self.socket = zmq.Context().socket(zmq.STREAM) self.socket.setsockopt(zmq.LINGER, 0) self.socket.setsockopt(zmq.RCVTIMEO, 10000) self.socket.connect("tcp://%s:%s" % (self.host, self.port)) + def stop(self): + self.socket.disconnect() + def read_raw_buffer(self): """ Read raw ADS-B data type. @@ -249,12 +255,14 @@ class TcpClient(object): self.buffer = self.buffer[1:] return messages - def handle_messages(self, messages, raw_event=None, raw_queue=None): + def handle_messages(self, messages): """re-implement this method to handle the messages""" for msg, t in messages: print("%15.9f %s" % (t, msg)) - def run(self, raw_event=None, raw_queue=None): + def run(self, raw_pipe_in=None, stop_flag=None): + self.raw_pipe_in = raw_pipe_in + self.stop_flag = stop_flag self.connect() while True: @@ -282,9 +290,8 @@ class TcpClient(object): if not messages: continue else: - self.handle_messages(messages, raw_event, raw_queue) + self.handle_messages(messages) - time.sleep(0.001) except Exception as e: # Provides the user an option to supply the environment # variable PYMODES_DEBUG to halt the execution @@ -298,6 +305,7 @@ class TcpClient(object): try: sock = self.connect() + time.sleep(1) except Exception as e: print("Unexpected Error:", e) diff --git a/pyModeS/streamer/stream.py b/pyModeS/streamer/decode.py similarity index 96% rename from pyModeS/streamer/stream.py rename to pyModeS/streamer/decode.py index f127d18..bdfa7a0 100644 --- a/pyModeS/streamer/stream.py +++ b/pyModeS/streamer/decode.py @@ -6,7 +6,7 @@ import csv import pyModeS as pms -class Stream: +class Decode: def __init__(self, latlon=None, dumpto=None): self.acs = dict() @@ -267,18 +267,22 @@ class Stream: acs = self.acs return acs - def run(self, raw_event, ac_event, raw_queue, aircraft_queue): + def run(self, raw_pipe_out, ac_pipe_in): + local_buffer = [] while True: - if raw_event.is_set(): - data = raw_queue.get() + while raw_pipe_out.poll(): + data = raw_pipe_out.recv() + local_buffer.append(data) + + for data in local_buffer: self.process_raw( data["adsb_ts"], data["adsb_msg"], data["commb_ts"], data["commb_msg"], ) + local_buffer = [] - aircraft_queue.put(self.get_aircraft()) - ac_event.set() - raw_event.clear() + acs = self.get_aircraft() + ac_pipe_in.send(acs) time.sleep(0.001) diff --git a/pyModeS/streamer/modeslive b/pyModeS/streamer/modeslive index 892cbc2..bdf15c5 100755 --- a/pyModeS/streamer/modeslive +++ b/pyModeS/streamer/modeslive @@ -7,7 +7,7 @@ import argparse import curses import signal import multiprocessing -from pyModeS.streamer.stream import Stream +from pyModeS.streamer.decode import Decode from pyModeS.streamer.screen import Screen from pyModeS.streamer.source import NetSource, RtlSdrSource @@ -90,11 +90,14 @@ if DUMPTO is not None: sys.exit(1) -raw_event = multiprocessing.Event() -ac_event = multiprocessing.Event() +# raw_event = multiprocessing.Event() +# ac_event = multiprocessing.Event() +# raw_queue = multiprocessing.Queue() +# ac_queue = multiprocessing.Queue() -raw_queue = multiprocessing.Queue() -aircraft_queue = multiprocessing.Queue() +raw_pipe_in, raw_pipe_out = multiprocessing.Pipe() +ac_pipe_in, ac_pipe_out = multiprocessing.Pipe() +stop_flag = multiprocessing.Value("b", False) if SOURCE == "net": source = NetSource(host=SERVER, port=PORT, rawtype=DATATYPE) @@ -103,29 +106,30 @@ elif SOURCE == "rtlsdr": recv_process = multiprocessing.Process( - target=source.run, args=(raw_event, raw_queue) + target=source.run, args=(raw_pipe_in, stop_flag) ) -stream = Stream(latlon=LATLON, dumpto=DUMPTO) -stream_process = multiprocessing.Process( - target=stream.run, args=(raw_event, ac_event, raw_queue, aircraft_queue) +decode = Decode(latlon=LATLON, dumpto=DUMPTO) +decode_process = multiprocessing.Process( + target=decode.run, args=(raw_pipe_out, ac_pipe_in) ) screen = Screen(uncertainty=UNCERTAINTY) screen_process = multiprocessing.Process( - target=screen.run, args=(ac_event, aircraft_queue) + target=screen.run, args=(ac_pipe_out,) ) def closeall(signal, frame): print("KeyboardInterrupt (ID: {}). Cleaning up...".format(signal)) + stop_flag.value = True curses.endwin() recv_process.terminate() - stream_process.terminate() + decode_process.terminate() screen_process.terminate() recv_process.join() - stream_process.join() + decode_process.join() screen_process.join() exit(0) @@ -133,5 +137,5 @@ def closeall(signal, frame): signal.signal(signal.SIGINT, closeall) recv_process.start() -stream_process.start() +decode_process.start() screen_process.start() diff --git a/pyModeS/streamer/screen.py b/pyModeS/streamer/screen.py index 17678dc..022d9cc 100644 --- a/pyModeS/streamer/screen.py +++ b/pyModeS/streamer/screen.py @@ -189,21 +189,24 @@ class Screen(object): self.screen.refresh() self.draw_frame() - def run(self, ac_event, ac_queue): - + def run(self, ac_pipe_out): + local_buffer = [] key_thread = threading.Thread(target=self.kye_handling) key_thread.start() while True: - if ac_event.is_set(): - while not ac_queue.empty(): - acs = ac_queue.get() - self.update_ac(acs) + while ac_pipe_out.poll(): + acs = ac_pipe_out.recv() + local_buffer.append(acs) - ac_event.clear() - try: - self.update() - except: - pass + for acs in local_buffer: + self.update_ac(acs) + + local_buffer = [] + + try: + self.update() + except: + pass time.sleep(0.001) diff --git a/pyModeS/streamer/source.py b/pyModeS/streamer/source.py index cca6c42..761f877 100644 --- a/pyModeS/streamer/source.py +++ b/pyModeS/streamer/source.py @@ -11,7 +11,11 @@ class NetSource(TcpClient): self.local_buffer_commb_msg = [] self.local_buffer_commb_ts = [] - def handle_messages(self, messages, raw_event, raw_queue): + def handle_messages(self, messages): + + if self.stop_flag.value is True: + self.stop() + return for msg, t in messages: if len(msg) < 28: # only process long messages @@ -29,7 +33,7 @@ class NetSource(TcpClient): continue if len(self.local_buffer_adsb_msg) > 1: - raw_queue.put( + self.raw_pipe_in.send( { "adsb_ts": self.local_buffer_adsb_ts, "adsb_msg": self.local_buffer_adsb_msg, @@ -37,7 +41,6 @@ class NetSource(TcpClient): "commb_msg": self.local_buffer_commb_msg, } ) - raw_event.set() class RtlSdrSource(RtlReader): @@ -48,7 +51,11 @@ class RtlSdrSource(RtlReader): self.local_buffer_commb_msg = [] self.local_buffer_commb_ts = [] - def handle_messages(self, messages, raw_event, raw_queue): + def handle_messages(self, messages): + + if self.stop_flag.value is True: + self.stop() + return for msg, t in messages: if len(msg) < 28: # only process long messages @@ -66,7 +73,7 @@ class RtlSdrSource(RtlReader): continue if len(self.local_buffer_adsb_msg) > 1: - raw_queue.put( + self.raw_pipe_in.send( { "adsb_ts": self.local_buffer_adsb_ts, "adsb_msg": self.local_buffer_adsb_msg, @@ -74,4 +81,3 @@ class RtlSdrSource(RtlReader): "commb_msg": self.local_buffer_commb_msg, } ) - raw_event.set()