TomTom batch geocoding

This commit is contained in:
Juan Ignacio Sánchez Lara
2018-07-06 20:11:48 +02:00
parent bf8b76b5fe
commit b8475bac30
2 changed files with 122 additions and 35 deletions

View File

@@ -1,12 +1,30 @@
import json, requests, time
from requests.adapters import HTTPAdapter
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.tomtom import TomTomGeocoder
from cartodb_services.tools.exceptions import ServiceException
class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
# TODO: ?
BASE_URL = 'https://api.tomtom.com'
BATCH_URL = BASE_URL + '/search/2/batch.json'
MAX_BATCH_SIZE = 1000000 # From the docs
# TODO: ?
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
MAX_STALLED_RETRIES = 100
BATCH_RETRY_SLEEP_S = 5
MIN_BATCHED_SEARCH = 10 # Batch API is really fast
READ_TIMEOUT = 60
CONNECT_TIMEOUT = 10
MAX_RETRIES = 1
def __init__(self, apikey, logger, service_params=None):
TomTomGeocoder.__init__(self, apikey, logger, service_params)
self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT)
self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT)
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
self.session = requests.Session()
self.session.headers.update({'Content-Type': 'application/json'})
self.session.mount(self.BATCH_URL,
HTTPAdapter(max_retries=self.max_retries))
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
@@ -25,12 +43,72 @@ class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
results = []
for search in searches:
(search_id, address, city, state, country) = search
address = address.encode('utf-8') if address else None
city = city.encode('utf-8') if city else None
state = state.encode('utf-8') if state else None
country = country.encode('utf-8') if country else None
self._logger.debug('--> Sending serial search: {}'.format(search))
coordinates = self.geocode(searchtext=address.encode('utf-8'),
city=city.encode('utf-8'),
state_province=state.encode('utf-8'),
country=country.encode('utf-8'))
coordinates = self.geocode(searchtext=address, city=city,
state_province=state, country=country)
self._logger.debug('--> result sent')
results.append((search_id, coordinates, []))
return results
def _batch_geocode(self, searches):
location = self._send_batch(searches)
xy_results = self._download_results(location)
results = []
for s, r in zip(searches, xy_results):
results.append((s[0], r, []))
self._logger.debug('--> results: {}'.format(results))
return results
def _send_batch(self, searches):
body = {'batchItems': [{'query': self._query(s)} for s in searches]}
self._logger.debug('--> {}; Body: {}'.format(self.BATCH_URL, body))
request_params = {
'key': self._apikey
}
response = self.session.post(self.BATCH_URL, data=json.dumps(body),
allow_redirects=False,
params=request_params,
timeout=(self.connect_timeout, self.read_timeout))
self._logger.debug('--> response: {}'.format(response.status_code))
if response.status_code == 303:
self._logger.debug(response.headers)
return response.headers['Location']
else:
msg = "Error sending batch: {}; Headers: {}".format(
response.text.encode('utf-8'), response.headers)
self._logger.error(msg)
raise ServiceException(msg, response)
def _download_results(self, location):
stalled_retries = 0
while True:
response = self.session.get(self.BASE_URL + location)
if response.status_code == 200:
self._logger.debug('--> Results ready {}'.format(location))
return self._parse_results(response.json())
elif response.status_code == 202:
stalled_retries += 1
if stalled_retries > self.MAX_STALLED_RETRIES:
raise Exception('Too many retries for job {}'.format(location))
location = response.headers['Location']
self._logger.debug('--> Waiting for {}'.format(location))
time.sleep(self.BATCH_RETRY_SLEEP_S)
else:
msg = "Error downloading batch: {}; Headers: {}".format(
response.text.encode('utf-8'), response.headers)
self._logger.error(msg)
raise ServiceException(msg, response)
def _query(self, search):
(search_id, address, city, state, country) = search
searchtext = ', '.join(filter(None, [address, city, state]))
return self._request_uri(searchtext=searchtext, countries=country)
def _parse_results(self, json_body):
return [self._parse_response(item['statusCode'], item['response'])
for item in json_body['batchItems']]

View File

@@ -9,10 +9,11 @@ from cartodb_services.tools.exceptions import ServiceException
from cartodb_services.tools.qps import qps_retry
from cartodb_services.tools.normalize import normalize
BASEURI = ('https://api.tomtom.com/search/2/geocode/'
'{searchtext}.JSON'
'?key={apiKey}'
'&limit=1')
HOST = 'https://api.tomtom.com'
API_BASEURI = '/search/2'
REQUEST_BASEURI = ('/geocode/'
'{searchtext}.json'
'?limit=1')
ENTRY_RESULTS = 'results'
ENTRY_POSITION = 'position'
ENTRY_LON = 'lon'
@@ -30,20 +31,15 @@ class TomTomGeocoder(Traceable):
self._logger = logger
def _uri(self, searchtext, countries=None):
baseuri = BASEURI + '&countrySet={}'.format(countries) \
if countries else BASEURI
uri = URITemplate(baseuri).expand(apiKey=self._apikey,
searchtext=searchtext.encode('utf-8'))
return uri
return API_BASEURI + self._request_uri(searchtext, countries, self._apikey)
def _parse_geocoder_response(self, response):
json_response = json.loads(response)
if json_response and json_response[ENTRY_RESULTS]:
result = json_response[ENTRY_RESULTS][0]
return self._extract_lng_lat_from_feature(result)
else:
return []
def _request_uri(self, searchtext, countries=None, apiKey=None):
baseuri = REQUEST_BASEURI
if countries:
baseuri += '&countrySet={}'.format(countries)
baseuri = baseuri + '&key={apiKey}' if apiKey else baseuri
return URITemplate(baseuri).expand(apiKey=apiKey,
searchtext=searchtext.encode('utf-8'))
def _extract_lng_lat_from_feature(self, result):
position = result[ENTRY_POSITION]
@@ -85,20 +81,11 @@ class TomTomGeocoder(Traceable):
if state_province:
address.append(normalize(state_province))
uri = self._uri(searchtext=', '.join(address), countries=country)
uri = HOST + self._uri(searchtext=', '.join(address), countries=country)
try:
response = requests.get(uri)
if response.status_code == requests.codes.ok:
return self._parse_geocoder_response(response.text)
elif response.status_code == requests.codes.bad_request:
return []
elif response.status_code == requests.codes.unprocessable_entity:
return []
else:
msg = 'Unknown response: {}'.format(str(response.status_code))
raise ServiceException(msg, response)
return self._parse_response(response.status_code, response.text)
except requests.Timeout as te:
# In case of timeout we want to stop the job because the server
# could be down
@@ -111,3 +98,25 @@ class TomTomGeocoder(Traceable):
self._logger.error('Error connecting to TomTom geocoding server',
exception=ce)
return []
def _parse_response(self, status_code, text):
if status_code == requests.codes.ok:
return self._parse_geocoder_response(text)
elif status_code == requests.codes.bad_request:
return []
elif status_code == requests.codes.unprocessable_entity:
return []
else:
msg = 'Unknown response {}: {}'.format(str(status_code), text)
raise ServiceException(msg, None)
def _parse_geocoder_response(self, response):
json_response = json.loads(response) \
if type(response) != dict else response
if json_response and json_response[ENTRY_RESULTS]:
result = json_response[ENTRY_RESULTS][0]
return self._extract_lng_lat_from_feature(result)
else:
return []