diff --git a/server/extension/Makefile b/server/extension/Makefile index 8f53576..968e51c 100644 --- a/server/extension/Makefile +++ b/server/extension/Makefile @@ -83,3 +83,6 @@ deploy: release_remove_parallel_deploy $(INSTALL_DATA) old_versions/*.sql *.sql '$(DESTDIR)$(datadir)/extension/' install: deploy + +reinstall: install + psql -U postgres -d dataservices_db -c "drop extension if exists cdb_dataservices_server; create extension cdb_dataservices_server;" diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 849c688..4f66a15 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2367,12 +2367,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): if user_geocoder_config.google_geocoder: - plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + provider_function = "_cdb_bulk_google_geocode_street_point"; elif user_geocoder_config.heremaps_geocoder: - plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + provider_function = "_cdb_bulk_heremaps_geocode_street_point"; + elif user_geocoder_config.tomtom_geocoder: + provider_function = "_cdb_bulk_tomtom_geocode_street_point"; else: raise Exception('Requested geocoder is not available') + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"]) result = plpy.execute(plan, [username, orgname, searches]) return result @@ -2400,6 +2403,23 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import ServiceManager + from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder + from cartodb_services.tomtom import TomTomBulkGeocoder + from cartodb_services.tools import Logger + import cartodb_services + cartodb_services.init(plpy, GD) + + logger_config = GD["logger_config"] + logger = Logger(logger_config) + service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD) + geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; + CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text) RETURNS Geometry AS $$ from cartodb_services.metrics import QuotaService diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index 6732340..50d5008 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -24,12 +24,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): if user_geocoder_config.google_geocoder: - plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + provider_function = "_cdb_bulk_google_geocode_street_point"; elif user_geocoder_config.heremaps_geocoder: - plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + provider_function = "_cdb_bulk_heremaps_geocode_street_point"; + elif user_geocoder_config.tomtom_geocoder: + provider_function = "_cdb_bulk_tomtom_geocode_street_point"; else: raise Exception('Requested geocoder is not available') + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"]) result = plpy.execute(plan, [username, orgname, searches]) return result @@ -57,3 +60,20 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import ServiceManager + from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder + from cartodb_services.tomtom import TomTomBulkGeocoder + from cartodb_services.tools import Logger + import cartodb_services + cartodb_services.init(plpy, GD) + + logger_config = GD["logger_config"] + logger = Logger(logger_config) + service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD) + geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; + diff --git a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py index 9a688f4..d652891 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py @@ -24,7 +24,7 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): MIN_BATCHED_SEARCH = 100 # Under this, serial will be used JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed'] - def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS): + def __init__(self, app_id, app_code, logger, service_params=None, maxresults=MAX_BATCH_SIZE): HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults) self.session = requests.Session() self.session.mount(self.BATCH_URL, diff --git a/server/lib/python/cartodb_services/cartodb_services/tomtom/__init__.py b/server/lib/python/cartodb_services/cartodb_services/tomtom/__init__.py index 8c5df3c..21d5ced 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tomtom/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/tomtom/__init__.py @@ -1,3 +1,4 @@ from geocoder import TomTomGeocoder +from bulk_geocoder import TomTomBulkGeocoder from routing import TomTomRouting, TomTomRoutingResponse from isolines import TomTomIsolines, TomTomIsochronesResponse diff --git a/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py new file mode 100644 index 0000000..8036667 --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/tomtom/bulk_geocoder.py @@ -0,0 +1,114 @@ +import json, requests, time +from requests.adapters import HTTPAdapter +from cartodb_services import StreetPointBulkGeocoder +from cartodb_services.tomtom import TomTomGeocoder +from cartodb_services.tools.exceptions import ServiceException + + +class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder): + BASE_URL = 'https://api.tomtom.com' + BATCH_URL = BASE_URL + '/search/2/batch.json' + MAX_BATCH_SIZE = 1000000 # From the docs + MAX_STALLED_RETRIES = 100 + BATCH_RETRY_SLEEP_S = 5 + MIN_BATCHED_SEARCH = 10 # Batch API is really fast + READ_TIMEOUT = 60 + CONNECT_TIMEOUT = 10 + MAX_RETRIES = 1 + + def __init__(self, apikey, logger, service_params=None): + TomTomGeocoder.__init__(self, apikey, logger, service_params) + self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT) + self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) + self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) + self.session = requests.Session() + self.session.headers.update({'Content-Type': 'application/json'}) + self.session.mount(self.BATCH_URL, + HTTPAdapter(max_retries=self.max_retries)) + + def _bulk_geocode(self, searches): + if len(searches) > self.MAX_BATCH_SIZE: + raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE)) + if self._should_use_batch(searches): + self._logger.debug('--> Batch geocode') + return self._batch_geocode(searches) + else: + self._logger.debug('--> Serial geocode') + return self._serial_geocode(searches) + + def _should_use_batch(self, searches): + return len(searches) >= self.MIN_BATCHED_SEARCH + + def _serial_geocode(self, searches): + results = [] + for search in searches: + (search_id, address, city, state, country) = search + address = address.encode('utf-8') if address else None + city = city.encode('utf-8') if city else None + state = state.encode('utf-8') if state else None + country = country.encode('utf-8') if country else None + self._logger.debug('--> Sending serial search: {}'.format(search)) + coordinates = self.geocode(searchtext=address, city=city, + state_province=state, country=country) + self._logger.debug('--> result sent') + results.append((search_id, coordinates, [])) + return results + + def _batch_geocode(self, searches): + location = self._send_batch(searches) + xy_results = self._download_results(location) + results = [] + for s, r in zip(searches, xy_results): + results.append((s[0], r, [])) + self._logger.debug('--> results: {}'.format(results)) + return results + + def _send_batch(self, searches): + body = {'batchItems': [{'query': self._query(s)} for s in searches]} + self._logger.debug('--> {}; Body: {}'.format(self.BATCH_URL, body)) + request_params = { + 'key': self._apikey + } + response = self.session.post(self.BATCH_URL, data=json.dumps(body), + allow_redirects=False, + params=request_params, + timeout=(self.connect_timeout, self.read_timeout)) + self._logger.debug('--> response: {}'.format(response.status_code)) + if response.status_code == 303: + self._logger.debug(response.headers) + return response.headers['Location'] + else: + msg = "Error sending batch: {}; Headers: {}".format( + response.text.encode('utf-8'), response.headers) + self._logger.error(msg) + raise ServiceException(msg, response) + + def _download_results(self, location): + stalled_retries = 0 + while True: + response = self.session.get(self.BASE_URL + location) + if response.status_code == 200: + self._logger.debug('--> Results ready {}'.format(location)) + return self._parse_results(response.json()) + elif response.status_code == 202: + stalled_retries += 1 + if stalled_retries > self.MAX_STALLED_RETRIES: + raise Exception('Too many retries for job {}'.format(location)) + location = response.headers['Location'] + self._logger.debug('--> Waiting for {}'.format(location)) + time.sleep(self.BATCH_RETRY_SLEEP_S) + else: + msg = "Error downloading batch: {}; Headers: {}".format( + response.text.encode('utf-8'), response.headers) + self._logger.error(msg) + raise ServiceException(msg, response) + + def _query(self, search): + (search_id, address, city, state, country) = search + searchtext = ', '.join(filter(None, [address, city, state])) + return self._request_uri(searchtext=searchtext, country=country) + + def _parse_results(self, json_body): + return [self._parse_response(item['statusCode'], item['response']) + for item in json_body['batchItems']] + diff --git a/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py index 432dd31..b4e82cb 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/tomtom/geocoder.py @@ -9,10 +9,11 @@ from cartodb_services.tools.exceptions import ServiceException from cartodb_services.tools.qps import qps_retry from cartodb_services.tools.normalize import normalize -BASEURI = ('https://api.tomtom.com/search/2/geocode/' - '{searchtext}.JSON' - '?key={apiKey}' - '&limit=1') +HOST = 'https://api.tomtom.com' +API_BASEURI = '/search/2' +REQUEST_BASEURI = ('/geocode/' + '{searchtext}.json' + '?limit=1') ENTRY_RESULTS = 'results' ENTRY_POSITION = 'position' ENTRY_LON = 'lon' @@ -29,21 +30,17 @@ class TomTomGeocoder(Traceable): self._apikey = apikey self._logger = logger - def _uri(self, searchtext, countries=None): - baseuri = BASEURI + '&countrySet={}'.format(countries) \ - if countries else BASEURI - uri = URITemplate(baseuri).expand(apiKey=self._apikey, - searchtext=searchtext.encode('utf-8')) - return uri + def _uri(self, searchtext, country=None): + return HOST + API_BASEURI + \ + self._request_uri(searchtext, country, self._apikey) - def _parse_geocoder_response(self, response): - json_response = json.loads(response) - - if json_response and json_response[ENTRY_RESULTS]: - result = json_response[ENTRY_RESULTS][0] - return self._extract_lng_lat_from_feature(result) - else: - return [] + def _request_uri(self, searchtext, country=None, apiKey=None): + baseuri = REQUEST_BASEURI + if country: + baseuri += '&countrySet={}'.format(country) + baseuri = baseuri + '&key={apiKey}' if apiKey else baseuri + return URITemplate(baseuri).expand(apiKey=apiKey, + searchtext=searchtext.encode('utf-8')) def _extract_lng_lat_from_feature(self, result): position = result[ENTRY_POSITION] @@ -85,19 +82,11 @@ class TomTomGeocoder(Traceable): if state_province: address.append(normalize(state_province)) - uri = self._uri(searchtext=', '.join(address), countries=country) + uri = self._uri(searchtext=', '.join(address), country=country) try: response = requests.get(uri) - - if response.status_code == requests.codes.ok: - return self._parse_geocoder_response(response.text) - elif response.status_code == requests.codes.bad_request: - return [] - elif response.status_code == requests.codes.unprocessable_entity: - return [] - else: - raise ServiceException(response.status_code, response) + return self._parse_response(response.status_code, response.text) except requests.Timeout as te: # In case of timeout we want to stop the job because the server # could be down @@ -110,3 +99,25 @@ class TomTomGeocoder(Traceable): self._logger.error('Error connecting to TomTom geocoding server', exception=ce) return [] + + def _parse_response(self, status_code, text): + if status_code == requests.codes.ok: + return self._parse_geocoder_response(text) + elif status_code == requests.codes.bad_request: + return [] + elif status_code == requests.codes.unprocessable_entity: + return [] + else: + msg = 'Unknown response {}: {}'.format(str(status_code), text) + raise ServiceException(msg, None) + + def _parse_geocoder_response(self, response): + json_response = json.loads(response) \ + if type(response) != dict else response + + if json_response and json_response[ENTRY_RESULTS]: + result = json_response[ENTRY_RESULTS][0] + return self._extract_lng_lat_from_feature(result) + else: + return [] + diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index f4a83b2..2060385 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -71,9 +71,20 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 'Logroño, Argentina': [-61.69604, -29.50425] } + TOMTOM_POINTS = HERE_POINTS.copy() + TOMTOM_POINTS.update({ + 'Plaza Mayor, Valladolid': [-4.72183, 41.5826], + 'Paseo Zorrilla, Valladolid': [-4.74031, 41.63181], + 'Valladolid': [-4.72838, 41.6542], + 'Valladolid, Spain': [-4.72838, 41.6542], + 'Madrid': [-3.70035, 40.42028], + 'Logroño, Spain': [-2.44998, 42.46592], + }) + FIXTURE_POINTS = { - 'google': GOOGLE_POINTS, - 'heremaps': HERE_POINTS + 'google': GOOGLE_POINTS, + 'heremaps': HERE_POINTS, + 'tomtom': TOMTOM_POINTS } def setUp(self):