diff --git a/apps/modes_rx b/apps/modes_rx index 6a1fcc1..ebc4cf0 100755 --- a/apps/modes_rx +++ b/apps/modes_rx @@ -29,19 +29,24 @@ from air_modes.exceptions import * import zmq class screen_printer(threading.Thread): - def __init__(self, position, context): + def __init__(self, position, context, addr=None): threading.Thread.__init__(self) self._subscriber = context.socket(zmq.SUB) - self._subscriber.connect("inproc://modes-radio-pub") + if addr is not None: + self._subscriber.connect("tcp://%s" % addr) + else: + self._subscriber.connect("inproc://modes-radio-pub") self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") self._printer = air_modes.output_print(position) - self.done = False + self.done = threading.Event() + self.finished = threading.Event() self.setDaemon(True) self.start() def run(self): - while not self.done: + queue_empty = False + while not self.done.is_set(): [address, msg] = self._subscriber.recv_multipart() #blocking try: self._printer.output(msg) @@ -49,28 +54,14 @@ class screen_printer(threading.Thread): pass self._subscriber.close() + self.finished.set() def main(): my_position = None - usage = "%prog: [options] output filename" + usage = "%prog: [options]" parser = OptionParser(option_class=eng_option, usage=usage) - parser.add_option("-R", "--subdev", type="string", - help="select USRP Rx side A or B", metavar="SUBDEV") - parser.add_option("-A", "--antenna", type="string", - help="select which antenna to use on daughterboard") - parser.add_option("-D", "--args", type="string", - help="arguments to pass to radio constructor", default="") - parser.add_option("-f", "--freq", type="eng_float", default=1090e6, - help="set receive frequency in Hz [default=%default]", metavar="FREQ") - parser.add_option("-g", "--gain", type="int", default=None, - help="set RF gain", metavar="dB") - parser.add_option("-r", "--rate", type="eng_float", default=4000000, - help="set ADC sample rate [default=%default]") - parser.add_option("-T", "--threshold", type="eng_float", default=5.0, - help="set pulse detection threshold above noise in dB [default=%default]") - parser.add_option("-F","--filename", type="string", default=None, - help="read data from file instead of USRP") + air_modes.modes_radio.add_radio_options(parser) parser.add_option("-K","--kml", type="string", default=None, help="filename for Google Earth KML output") parser.add_option("-P","--sbs1", action="store_true", default=False, @@ -85,37 +76,21 @@ def main(): help="Use UDP source on specified port") parser.add_option("-m","--multiplayer", type="string", default=None, help="FlightGear server to send aircraft data, in format host:port") - parser.add_option("-o","--osmocom", action="store_true", default=False, - help="Use gr-osmocom source (RTLSDR or HackRF) instead of UHD source") - parser.add_option("-p","--pmf", action="store_true", default=False, - help="Use pulse matched filtering") (options, args) = parser.parse_args() #construct the radio context = zmq.Context(1) tb = air_modes.modes_radio(options, context) - tb.start() - #tb will publish via ZMQ on the "radio-pub" feed - #clients connect to tb socket and do things. - #the radio-pub feed is channelized into "dl_data" and (not yet) - #"ul_data" which correspond to downlink and uplink data if options.location is not None: reader = csv.reader([options.location], quoting=csv.QUOTE_NONNUMERIC) my_position = reader.next() if options.kml is not None: - #we spawn a thread to run every 30 seconds (or whatever) to generate KML dbname = 'adsb.db' sqldb = air_modes.output_sql(my_position, dbname, context) #input into the db #kmlgen = air_modes.output_kml(options.kml, dbname, my_position, lock) #create a KML generating thread to read from the db - #tb.subscribe('dl_data', sqldb.output) - -# if options.sbs1 is True: -# sbs1port = air_modes.output_sbs1(my_position, 30003) -# tb.subscribe('dl_data', sbs1port.output) - #updates.append(sbs1port.add_pending_conns) printer = None if options.no_print is not True: @@ -126,23 +101,22 @@ def main(): # fgout = air_modes.output_flightgear(my_position, fghost, int(fgport)) # tb.subscribe('dl_data', fgout.output) - #start an updater thread to handle adding/removing TCP connections - #TODO this can be removed when we start using 0MQ -# updater = threading.Thread(target=timed_callback, args=(updates,)) -# updater.setDaemon(True) -# updater.start() - - tb.wait() +# if options.sbs1 is True: +# sbs1port = air_modes.output_sbs1(my_position, 30003) +# tb.subscribe('dl_data', sbs1port.output) + #updates.append(sbs1port.add_pending_conns) + + tb.run() + tb._pubsub.done.set() + tb._pubsub.finished.wait(0.2) if printer is not None: - printer.done = True - #printer.join() + printer.done.set() + printer.finished.wait(0.2) if options.kml is not None: sqldb.done = True #sqldb.join() - time.sleep(0.1) - if __name__ == '__main__': main() diff --git a/python/radio.py b/python/radio.py index 7aeea32..e366a89 100644 --- a/python/radio.py +++ b/python/radio.py @@ -25,6 +25,7 @@ from gnuradio import gr, gru, optfir, eng_notation, blks2 from gnuradio.eng_option import eng_option +from optparse import OptionParser import air_modes import zmq import threading @@ -46,13 +47,20 @@ class radio_publisher(threading.Thread): self._publisher.bind("tcp://*:%i" % port) self.setDaemon(True) - self.done = False + self.done = threading.Event() + self.finished = threading.Event() + self.start() def run(self): - while self.done is False: + done_yet = False + while not self.done.is_set() and not done_yet: + if self.done.is_set(): #gives it another round after done is set + done_yet = True #so we can clean up the last of the queue while not self._queue.empty_p(): self._publisher.send_multipart([DOWNLINK_DATA_TYPE, self._queue.delete_head().to_string()]) time.sleep(0.1) #can use time.sleep(0) to yield, but it'll suck a whole CPU + self._publisher.close() + self.finished.set() class modes_radio (gr.top_block): @@ -80,7 +88,30 @@ class modes_radio (gr.top_block): #Publish messages when they come back off the queue self._pubsub = radio_publisher(None, context, self._queue) - self._pubsub.start() + + @staticmethod + def add_radio_options(parser): + parser.add_option("-R", "--subdev", type="string", + help="select USRP Rx side A or B", metavar="SUBDEV") + parser.add_option("-A", "--antenna", type="string", + help="select which antenna to use on daughterboard") + parser.add_option("-D", "--args", type="string", + help="arguments to pass to radio constructor", default="") + parser.add_option("-f", "--freq", type="eng_float", default=1090e6, + help="set receive frequency in Hz [default=%default]", metavar="FREQ") + parser.add_option("-g", "--gain", type="int", default=None, + help="set RF gain", metavar="dB") + parser.add_option("-r", "--rate", type="eng_float", default=4000000, + help="set ADC sample rate [default=%default]") + parser.add_option("-T", "--threshold", type="eng_float", default=5.0, + help="set pulse detection threshold above noise in dB [default=%default]") + parser.add_option("-F","--filename", type="string", default=None, + help="read data from file instead of radio") + parser.add_option("-o","--osmocom", action="store_true", default=False, + help="Use gr-osmocom source (RTLSDR or HackRF) instead of UHD source") + parser.add_option("-p","--pmf", action="store_true", default=False, + help="Use pulse matched filtering") + #these are wrapped with try/except because file sources and udp sources #don't have set_center_freq/set_gain functions. this should check to see @@ -90,7 +121,7 @@ class modes_radio (gr.top_block): result = self._u.set_center_freq(freq, 0) return result except: - pass + return 0 def set_gain(self, gain): try: @@ -107,6 +138,9 @@ class modes_radio (gr.top_block): if(options.subdev): self._u.set_subdev_spec(options.subdev, 0) + if not self._u.set_center_freq(options.freq): + print "Failed to set initial frequency" + #check for GPSDO #if you have a GPSDO, UHD will automatically set the timestamp to UTC time #as well as automatically set the clock to lock to GPSDO. diff --git a/python/sql.py b/python/sql.py index d41f01f..9088378 100644 --- a/python/sql.py +++ b/python/sql.py @@ -27,52 +27,54 @@ from air_modes.exceptions import * import zmq class output_sql(air_modes.parse, threading.Thread): - def __init__(self, mypos, filename, context): + def __init__(self, mypos, filename, context, addr=None): threading.Thread.__init__(self) air_modes.parse.__init__(self, mypos) #init socket self._subscriber = context.socket(zmq.SUB) - self._subscriber.connect("inproc://modes-radio-pub") #TODO allow spec addr + if addr is not None: + self._subscriber.connect("tcp://%s" % addr) + else: + self._subscriber.connect("inproc://modes-radio-pub") self._subscriber.setsockopt(zmq.SUBSCRIBE, "dl_data") self._lock = threading.Lock() - with self._lock: - #create the database - self.filename = filename - self._db = sqlite3.connect(filename) - #now execute a schema to create the tables you need - c = self._db.cursor() - query = """CREATE TABLE IF NOT EXISTS "positions" ( - "icao" INTEGER KEY NOT NULL, - "seen" TEXT NOT NULL, - "alt" INTEGER, - "lat" REAL, - "lon" REAL - );""" - c.execute(query) - query = """CREATE TABLE IF NOT EXISTS "vectors" ( - "icao" INTEGER KEY NOT NULL, - "seen" TEXT NOT NULL, - "speed" REAL, - "heading" REAL, - "vertical" REAL - );""" - c.execute(query) - query = """CREATE TABLE IF NOT EXISTS "ident" ( - "icao" INTEGER PRIMARY KEY NOT NULL, - "ident" TEXT NOT NULL - );""" - c.execute(query) - c.close() - self._db.commit() - #we close the db conn now to reopen it in the output() thread context. - self._db.close() - self._db = None + #create the database + self.filename = filename + self._db = sqlite3.connect(filename) + #now execute a schema to create the tables you need + c = self._db.cursor() + query = """CREATE TABLE IF NOT EXISTS "positions" ( + "icao" INTEGER KEY NOT NULL, + "seen" TEXT NOT NULL, + "alt" INTEGER, + "lat" REAL, + "lon" REAL + );""" + c.execute(query) + query = """CREATE TABLE IF NOT EXISTS "vectors" ( + "icao" INTEGER KEY NOT NULL, + "seen" TEXT NOT NULL, + "speed" REAL, + "heading" REAL, + "vertical" REAL + );""" + c.execute(query) + query = """CREATE TABLE IF NOT EXISTS "ident" ( + "icao" INTEGER PRIMARY KEY NOT NULL, + "ident" TEXT NOT NULL + );""" + c.execute(query) + c.close() + self._db.commit() + #we close the db conn now to reopen it in the output() thread context. + self._db.close() + self._db = None - self.setDaemon(True) - self.done = False - self.start() + self.setDaemon(True) + self.done = False + self.start() def run(self): while not self.done: