diff --git a/apps/modes_rx b/apps/modes_rx index 8fe8334..8c39aee 100755 --- a/apps/modes_rx +++ b/apps/modes_rx @@ -27,33 +27,6 @@ import air_modes from air_modes.exceptions import * import zmq -class screen_printer(threading.Thread): - def __init__(self, position, context, addr=None, port=None): - threading.Thread.__init__(self) - self._subscriber = context.socket(zmq.SUB) - if addr is not None: - self._subscriber.connect("tcp://%s:%i" % (addr, port)) - else: - self._subscriber.connect("inproc://modes-radio-pub") - self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") - - self._printer = air_modes.output_print(position) - self.done = threading.Event() - self.finished = threading.Event() - self.setDaemon(True) - self.start() - - def run(self): - while not self.done.is_set(): - [address, msg] = self._subscriber.recv_multipart() #blocking - try: - self._printer.output(msg) - except ADSBError: - pass - - self._subscriber.close() - self.finished.set() - #todo: maybe move plugins to separate programs (flightgear, SBS1, etc.) def main(): my_position = None @@ -99,7 +72,6 @@ def main(): kmlgen = air_modes.output_kml(options.kml, dbname, my_position, lock) #create a KML generating thread to read from the db relay.subscribe("dl_data", sqldb.insert) - printer = None if options.no_print is not True: relay.subscribe("dl_data", air_modes.output_print(my_position).output) @@ -115,8 +87,7 @@ def main(): tb.run() tb.cleanup() - relay.shutdown.set() - relay.finished.wait(0.2) + relay.close() if options.kml is not None: kmlgen.close() diff --git a/python/radio.py b/python/radio.py index da1f41a..09951cb 100644 --- a/python/radio.py +++ b/python/radio.py @@ -19,7 +19,7 @@ # # Radio interface for Mode S RX. -# Handles all Gnuradio-related functionality. +# Handles all hardware- and source-related functionality # You pass it options, it gives you data. # It uses the pubsub interface to allow clients to subscribe to its data feeds. @@ -31,39 +31,7 @@ import air_modes import zmq import threading import time - -DOWNLINK_DATA_TYPE = "dl_data" - -#ZMQ message publisher. -#TODO: limit high water mark -#TODO: limit number of subscribers -#NOTE: this is obsoleted by zmq_pubsub_iface -class radio_publisher(threading.Thread): - def __init__(self, port, context, queue): - threading.Thread.__init__(self) - self._queue = queue - self._publisher = context.socket(zmq.PUB) - if port is None: - self._publisher.bind("inproc://modes-radio-pub") - else: - self._publisher.bind("tcp://*:%i" % port) - - self.setDaemon(True) - self.shutdown = threading.Event() - self.finished = threading.Event() - self.start() - - def run(self): - done_yet = False - while not self.shutdown.is_set() and not done_yet: - if self.shutdown.is_set(): #gives it another round after done is set - done_yet = True #so we can clean up the last of the queue - while not self._queue.empty_p(): - self._publisher.send_multipart([DOWNLINK_DATA_TYPE, self._queue.delete_head().to_string()]) - time.sleep(0.1) #can use time.sleep(0) to yield, but it'll suck a whole CPU - self._publisher.close() - self.finished.set() - +import re class modes_radio (gr.top_block, pubsub): def __init__(self, options, context): @@ -216,7 +184,6 @@ class modes_radio (gr.top_block, pubsub): else: #semantically detect whether it's ip.ip.ip.ip:port or filename - import re if ':' in options.source: try: ip, port = re.search("(.*)\:(\d{1,5})", options.source).groups() diff --git a/python/rx_path.py b/python/rx_path.py index edf4993..1178099 100644 --- a/python/rx_path.py +++ b/python/rx_path.py @@ -1,5 +1,5 @@ # -# Copyright 2012 Corgan Labs +# Copyright 2012, 2013 Corgan Labs, Nick Foster # # This file is part of gr-air-modes # diff --git a/python/zmq_socket.py b/python/zmq_socket.py index 9a5a168..10f9282 100644 --- a/python/zmq_socket.py +++ b/python/zmq_socket.py @@ -119,7 +119,7 @@ def pr(x): if __name__ == "__main__": #create socket pair context = zmq.Context(1) - sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr=["inproc://sock1-pub"]) + sock1 = zmq_pubsub_iface(context, subaddr="inproc://sock2-pub", pubaddr="inproc://sock1-pub") sock2 = zmq_pubsub_iface(context, subaddr="inproc://sock1-pub", pubaddr=["inproc://sock2-pub", "tcp://*:5433"]) sock3 = zmq_pubsub_iface(context, subaddr="tcp://localhost:5433", pubaddr=None) @@ -128,10 +128,10 @@ if __name__ == "__main__": sock3.subscribe("data3", pr) for i in range(10): - #sock1["data2"] = "HOWDY" - #sock2["data3"] = "DRAW" + sock1["data2"] = "HOWDY" + sock2["data3"] = "DRAW" sock2["data1"] = "PARDNER" - #time.sleep(0.1) + time.sleep(0.1) sock1.close() sock2.close()