diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 4f66a15..8543e7f 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2372,6 +2372,8 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ provider_function = "_cdb_bulk_heremaps_geocode_street_point"; elif user_geocoder_config.tomtom_geocoder: provider_function = "_cdb_bulk_tomtom_geocode_street_point"; + elif user_geocoder_config.mapbox_geocoder: + provider_function = "_cdb_bulk_mapbox_geocode_street_point"; else: raise Exception('Requested geocoder is not available') @@ -2420,6 +2422,23 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_mapbox_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import ServiceManager + from cartodb_services.refactor.service.mapbox_geocoder_config import MapboxGeocoderConfigBuilder + from cartodb_services.mapbox import MapboxBulkGeocoder + from cartodb_services.tools import Logger + import cartodb_services + cartodb_services.init(plpy, GD) + + logger_config = GD["logger_config"] + logger = Logger(logger_config) + service_manager = ServiceManager('geocoder', MapboxGeocoderConfigBuilder, username, orgname, GD) + geocoder = MapboxBulkGeocoder(service_manager.config.mapbox_api_key, service_manager.logger, service_manager.config.service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; + CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text) RETURNS Geometry AS $$ from cartodb_services.metrics import QuotaService diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index 50d5008..adee498 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -29,6 +29,8 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ provider_function = "_cdb_bulk_heremaps_geocode_street_point"; elif user_geocoder_config.tomtom_geocoder: provider_function = "_cdb_bulk_tomtom_geocode_street_point"; + elif user_geocoder_config.mapbox_geocoder: + provider_function = "_cdb_bulk_mapbox_geocode_street_point"; else: raise Exception('Requested geocoder is not available') @@ -77,3 +79,20 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_mapbox_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import ServiceManager + from cartodb_services.refactor.service.mapbox_geocoder_config import MapboxGeocoderConfigBuilder + from cartodb_services.mapbox import MapboxBulkGeocoder + from cartodb_services.tools import Logger + import cartodb_services + cartodb_services.init(plpy, GD) + + logger_config = GD["logger_config"] + logger = Logger(logger_config) + service_manager = ServiceManager('geocoder', MapboxGeocoderConfigBuilder, username, orgname, GD) + geocoder = MapboxBulkGeocoder(service_manager.config.mapbox_api_key, service_manager.logger, service_manager.config.service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; + diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/__init__.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/__init__.py index c613cdc..10ca46e 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapbox/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/__init__.py @@ -1,4 +1,5 @@ from routing import MapboxRouting, MapboxRoutingResponse from geocoder import MapboxGeocoder +from bulk_geocoder import MapboxBulkGeocoder from isolines import MapboxIsolines, MapboxIsochronesResponse from matrix_client import MapboxMatrixClient diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/bulk_geocoder.py new file mode 100644 index 0000000..5c502e3 --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/bulk_geocoder.py @@ -0,0 +1,86 @@ +import json, requests, time +from requests.adapters import HTTPAdapter +from cartodb_services import StreetPointBulkGeocoder +from cartodb_services.mapbox import MapboxGeocoder +from cartodb_services.tools.exceptions import ServiceException +from iso3166 import countries +from cartodb_services.tools.country import country_to_iso3 + + +class MapboxBulkGeocoder(MapboxGeocoder, StreetPointBulkGeocoder): + MAX_BATCH_SIZE = 50 # From the docs + MIN_BATCHED_SEARCH = 0 + READ_TIMEOUT = 60 + CONNECT_TIMEOUT = 10 + MAX_RETRIES = 1 + + def __init__(self, token, logger, service_params=None): + MapboxGeocoder.__init__(self, token, logger, service_params) + self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT) + self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) + self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) + self.session = requests.Session() + + def _bulk_geocode(self, searches): + if len(searches) > self.MAX_BATCH_SIZE: + raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE)) + if self._should_use_batch(searches): + self._logger.debug('--> Batch geocode') + return self._batch_geocode(searches) + else: + self._logger.debug('--> Serial geocode') + return self._serial_geocode(searches) + + def _should_use_batch(self, searches): + return len(searches) >= self.MIN_BATCHED_SEARCH + + def _serial_geocode(self, searches): + results = [] + for search in searches: + elements = self._encoded_elements(search) + self._logger.debug('--> Sending serial search: {}'.format(search)) + coordinates = self._geocode_search(*elements) + results.append((search[0], coordinates, [])) + return results + + def _geocode_search(self, address, city, state, country): + coordinates = self.geocode(searchtext=address, city=city, + state_province=state, country=country) + self._logger.debug('--> result sent') + return coordinates + + def _encoded_elements(self, search): + (search_id, address, city, state, country) = search + address = address.encode('utf-8') if address else None + city = city.encode('utf-8') if city else None + state = state.encode('utf-8') if state else None + country = self._country_code(country) if country else None + return address, city, state, country + + def _batch_geocode(self, searches): + if len(searches) == 1: + return self._serial_geocode(searches) + else: + frees = [] + for search in searches: + elements = self._encoded_elements(search) + free = ', '.join([elem for elem in elements if elem]) + frees.append(free) + + self._logger.debug('--> sending free search: {}'.format(frees)) + xy_results = self.geocode_free_text(frees) + results = [] + self._logger.debug('--> searches: {}; xy: {}'.format(searches, xy_results)) + for s, r in zip(searches, xy_results): + results.append((s[0], r, [])) + self._logger.debug('--> results: {}'.format(results)) + return results + + def _country_code(self, country): + country_iso3166 = None + country_iso3 = country_to_iso3(country) + if country_iso3: + country_iso3166 = countries.get(country_iso3).alpha2.lower() + + return country_iso3166 + diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py index a8a9de0..a3d6f93 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py @@ -39,17 +39,20 @@ class MapboxGeocoder(Traceable): def _parse_geocoder_response(self, response): json_response = json.loads(response) + self._logger.debug('--> json response: {}'.format(json_response)) - # If Mapbox returns more that one result, take the first one if json_response: - if type(json_response) == list: - json_response = json_response[0] + if type(json_response) != list: + json_response = [json_response] - if json_response[ENTRY_FEATURES]: - feature = json_response[ENTRY_FEATURES][0] - return self._extract_lng_lat_from_feature(feature) - else: - return [] + result = [] + for a_json_response in json_response: + if a_json_response[ENTRY_FEATURES]: + feature = a_json_response[ENTRY_FEATURES][0] + result.append(self._extract_lng_lat_from_feature(feature)) + else: + result.append([]) + return result else: return [] @@ -78,6 +81,13 @@ class MapboxGeocoder(Traceable): @qps_retry(qps=10) def geocode(self, searchtext, city=None, state_province=None, country=None): + """ + :param searchtext: + :param city: + :param state_province: + :param country: Country ISO 3166 code + :return: [x, y] on success, [] on error + """ if not self._validate_input(searchtext, city, state_province, country): return [] @@ -91,8 +101,21 @@ class MapboxGeocoder(Traceable): country = [country] if country else None + free_search = ', '.join(address) + + return self.geocode_free_text([free_search], country)[0] + + @qps_retry(qps=10) + def geocode_free_text(self, free_searches, country=None): + """ + :param free_searches: Free text searches + :param country: Country ISO 3166 code + :return: list of [x, y] on success, [] on error + """ try: - response = self._geocoder.forward(address=', '.join(address).decode('utf-8'), + free_search = ';'.join([self._escape(fs) for fs in free_searches]) + self._logger.debug('--> free search: {}'.format(free_search)) + response = self._geocoder.forward(address=free_search.decode('utf-8'), country=country, limit=1) @@ -110,9 +133,16 @@ class MapboxGeocoder(Traceable): self._logger.error('Timeout connecting to Mapbox geocoding server', te) raise ServiceException('Error geocoding {0} using Mapbox'.format( - searchtext), None) + free_search), None) except requests.ConnectionError as ce: # Don't raise the exception to continue with the geocoding job self._logger.error('Error connecting to Mapbox geocoding server', exception=ce) return [] + + def _escape(self, free_search): + # Semicolon is used to separate batch geocoding; there's no documented + # way to pass actual semicolons, and %3B or ; won't work (check + # TestBulkStreetFunctions.test_semicolon and the docs, + # https://www.mapbox.com/api-documentation/#batch-requests) + return free_search.replace(';', ',') diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index 2060385..d1fdbd2 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -81,10 +81,21 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 'Logroño, Spain': [-2.44998, 42.46592], }) + MAPBOX_POINTS = GOOGLE_POINTS.copy() + MAPBOX_POINTS.update({ + 'Logroño, Spain': [-2.44556, 42.47], + 'Logroño, Argentina': [-70.687195, -33.470901], # TODO: huge mismatch + 'Valladolid': [-4.72856, 41.652251], + 'Valladolid, Spain': [-4.72856, 41.652251], + '1902 amphitheatre parkway': [-118.03, 34.06], # TODO: huge mismatch + 'Madrid': [-3.69194, 40.4167754], + }) + FIXTURE_POINTS = { 'google': GOOGLE_POINTS, 'heremaps': HERE_POINTS, - 'tomtom': TOMTOM_POINTS + 'tomtom': TOMTOM_POINTS, + 'mapbox': MAPBOX_POINTS } def setUp(self): @@ -251,6 +262,19 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): response = self._run_authenticated(query) assert_equal(1, len(response['rows'])) + def test_semicolon(self): + query = "select *, st_x(the_geom), st_y(the_geom) " \ + "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \ + "'select * from jsonb_to_recordset(''[" \ + "{\"cartodb_id\": 1, \"address\": \"1900 amphitheatre parkway; mountain view; ca; us\"}," \ + "{\"cartodb_id\": 2, \"address\": \"1900 amphitheatre parkway, mountain view, ca, us\"}" \ + "]''::jsonb) as (cartodb_id integer, address text)', " \ + "'address', null, null, null)" + response = self._run_authenticated(query) + + x_y_by_cartodb_id = self._x_y_by_cartodb_id(response) + assert_equal(x_y_by_cartodb_id[1], x_y_by_cartodb_id[2]) + def _run_authenticated(self, query): authenticated_query = "{}&api_key={}".format(query, self.env_variables[