From b8475bac304e7290cf6d939ca4f8ddfcc7fe659b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Fri, 6 Jul 2018 20:11:48 +0200 Subject: [PATCH] TomTom batch geocoding --- .../cartodb_services/tomtom/bulk_geocoder.py | 92 +++++++++++++++++-- .../cartodb_services/tomtom/geocoder.py | 65 +++++++------ 2 files changed, 122 insertions(+), 35 deletions(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py index 34d0974..ec3949f 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py @@ -1,12 +1,30 @@ +import json, requests, time +from requests.adapters import HTTPAdapter from cartodb_services import StreetPointBulkGeocoder from cartodb_services.tomtom import TomTomGeocoder +from cartodb_services.tools.exceptions import ServiceException class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder): - # TODO: ? + BASE_URL = 'https://api.tomtom.com' + BATCH_URL = BASE_URL + '/search/2/batch.json' MAX_BATCH_SIZE = 1000000 # From the docs - # TODO: ? - MIN_BATCHED_SEARCH = 100 # Under this, serial will be used + MAX_STALLED_RETRIES = 100 + BATCH_RETRY_SLEEP_S = 5 + MIN_BATCHED_SEARCH = 10 # Batch API is really fast + READ_TIMEOUT = 60 + CONNECT_TIMEOUT = 10 + MAX_RETRIES = 1 + + def __init__(self, apikey, logger, service_params=None): + TomTomGeocoder.__init__(self, apikey, logger, service_params) + self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT) + self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) + self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) + self.session = requests.Session() + self.session.headers.update({'Content-Type': 'application/json'}) + self.session.mount(self.BATCH_URL, + HTTPAdapter(max_retries=self.max_retries)) def _bulk_geocode(self, searches): if len(searches) > self.MAX_BATCH_SIZE: @@ -25,12 +43,72 @@ class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder): results = [] for search in searches: (search_id, address, city, state, country) = search + address = address.encode('utf-8') if address else None + city = city.encode('utf-8') if city else None + state = state.encode('utf-8') if state else None + country = country.encode('utf-8') if country else None self._logger.debug('--> Sending serial search: {}'.format(search)) - coordinates = self.geocode(searchtext=address.encode('utf-8'), - city=city.encode('utf-8'), - state_province=state.encode('utf-8'), - country=country.encode('utf-8')) + coordinates = self.geocode(searchtext=address, city=city, + state_province=state, country=country) self._logger.debug('--> result sent') results.append((search_id, coordinates, [])) return results + def _batch_geocode(self, searches): + location = self._send_batch(searches) + xy_results = self._download_results(location) + results = [] + for s, r in zip(searches, xy_results): + results.append((s[0], r, [])) + self._logger.debug('--> results: {}'.format(results)) + return results + + def _send_batch(self, searches): + body = {'batchItems': [{'query': self._query(s)} for s in searches]} + self._logger.debug('--> {}; Body: {}'.format(self.BATCH_URL, body)) + request_params = { + 'key': self._apikey + } + response = self.session.post(self.BATCH_URL, data=json.dumps(body), + allow_redirects=False, + params=request_params, + timeout=(self.connect_timeout, self.read_timeout)) + self._logger.debug('--> response: {}'.format(response.status_code)) + if response.status_code == 303: + self._logger.debug(response.headers) + return response.headers['Location'] + else: + msg = "Error sending batch: {}; Headers: {}".format( + response.text.encode('utf-8'), response.headers) + self._logger.error(msg) + raise ServiceException(msg, response) + + def _download_results(self, location): + stalled_retries = 0 + while True: + response = self.session.get(self.BASE_URL + location) + if response.status_code == 200: + self._logger.debug('--> Results ready {}'.format(location)) + return self._parse_results(response.json()) + elif response.status_code == 202: + stalled_retries += 1 + if stalled_retries > self.MAX_STALLED_RETRIES: + raise Exception('Too many retries for job {}'.format(location)) + location = response.headers['Location'] + self._logger.debug('--> Waiting for {}'.format(location)) + time.sleep(self.BATCH_RETRY_SLEEP_S) + else: + msg = "Error downloading batch: {}; Headers: {}".format( + response.text.encode('utf-8'), response.headers) + self._logger.error(msg) + raise ServiceException(msg, response) + + def _query(self, search): + (search_id, address, city, state, country) = search + searchtext = ', '.join(filter(None, [address, city, state])) + return self._request_uri(searchtext=searchtext, countries=country) + + def _parse_results(self, json_body): + return [self._parse_response(item['statusCode'], item['response']) + for item in json_body['batchItems']] + diff --git a/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py index 225f60d..1f63f58 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py @@ -9,10 +9,11 @@ from cartodb_services.tools.exceptions import ServiceException from cartodb_services.tools.qps import qps_retry from cartodb_services.tools.normalize import normalize -BASEURI = ('https://api.tomtom.com/search/2/geocode/' - '{searchtext}.JSON' - '?key={apiKey}' - '&limit=1') +HOST = 'https://api.tomtom.com' +API_BASEURI = '/search/2' +REQUEST_BASEURI = ('/geocode/' + '{searchtext}.json' + '?limit=1') ENTRY_RESULTS = 'results' ENTRY_POSITION = 'position' ENTRY_LON = 'lon' @@ -30,20 +31,15 @@ class TomTomGeocoder(Traceable): self._logger = logger def _uri(self, searchtext, countries=None): - baseuri = BASEURI + '&countrySet={}'.format(countries) \ - if countries else BASEURI - uri = URITemplate(baseuri).expand(apiKey=self._apikey, - searchtext=searchtext.encode('utf-8')) - return uri + return API_BASEURI + self._request_uri(searchtext, countries, self._apikey) - def _parse_geocoder_response(self, response): - json_response = json.loads(response) - - if json_response and json_response[ENTRY_RESULTS]: - result = json_response[ENTRY_RESULTS][0] - return self._extract_lng_lat_from_feature(result) - else: - return [] + def _request_uri(self, searchtext, countries=None, apiKey=None): + baseuri = REQUEST_BASEURI + if countries: + baseuri += '&countrySet={}'.format(countries) + baseuri = baseuri + '&key={apiKey}' if apiKey else baseuri + return URITemplate(baseuri).expand(apiKey=apiKey, + searchtext=searchtext.encode('utf-8')) def _extract_lng_lat_from_feature(self, result): position = result[ENTRY_POSITION] @@ -85,20 +81,11 @@ class TomTomGeocoder(Traceable): if state_province: address.append(normalize(state_province)) - uri = self._uri(searchtext=', '.join(address), countries=country) + uri = HOST + self._uri(searchtext=', '.join(address), countries=country) try: response = requests.get(uri) - - if response.status_code == requests.codes.ok: - return self._parse_geocoder_response(response.text) - elif response.status_code == requests.codes.bad_request: - return [] - elif response.status_code == requests.codes.unprocessable_entity: - return [] - else: - msg = 'Unknown response: {}'.format(str(response.status_code)) - raise ServiceException(msg, response) + return self._parse_response(response.status_code, response.text) except requests.Timeout as te: # In case of timeout we want to stop the job because the server # could be down @@ -111,3 +98,25 @@ class TomTomGeocoder(Traceable): self._logger.error('Error connecting to TomTom geocoding server', exception=ce) return [] + + def _parse_response(self, status_code, text): + if status_code == requests.codes.ok: + return self._parse_geocoder_response(text) + elif status_code == requests.codes.bad_request: + return [] + elif status_code == requests.codes.unprocessable_entity: + return [] + else: + msg = 'Unknown response {}: {}'.format(str(status_code), text) + raise ServiceException(msg, None) + + def _parse_geocoder_response(self, response): + json_response = json.loads(response) \ + if type(response) != dict else response + + if json_response and json_response[ENTRY_RESULTS]: + result = json_response[ENTRY_RESULTS][0] + return self._extract_lng_lat_from_feature(result) + else: + return [] +