From 18e2349713bf8e71260a44d2fa9d282484974411 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Fri, 29 Jun 2018 14:59:07 +0200 Subject: [PATCH 1/7] Bulk geocoding refactor --- server/__init__.py | 0 .../cdb_dataservices_server--0.32.0.sql | 56 +++++--------- .../extension/sql/21_bulk_geocode_street.sql | 58 +++++--------- server/lib/__init__.py | 0 server/lib/python/__init__.py | 0 .../lib/python/cartodb_services/__init__.py | 0 .../cartodb_services/__init__.py | 2 + .../cartodb_services/geocoder.py | 76 +++++++++++++++++++ .../cartodb_services/google/geocoder.py | 21 ++--- .../cartodb_services/here/geocoder.py | 10 ++- 10 files changed, 134 insertions(+), 89 deletions(-) create mode 100644 server/__init__.py create mode 100644 server/lib/__init__.py create mode 100644 server/lib/python/__init__.py create mode 100644 server/lib/python/cartodb_services/__init__.py create mode 100644 server/lib/python/cartodb_services/cartodb_services/geocoder.py diff --git a/server/__init__.py b/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 44f3d98..1f77d9a 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2367,53 +2367,37 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): if user_geocoder_config.google_geocoder: - google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) - result = plpy.execute(google_plan, [username, orgname, searches]) - return result + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + elif user_geocoder_config.heremaps_geocoder: + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) else: raise Exception('Requested geocoder is not available') + result = plpy.execute(plan, [username, orgname, searches]) + return result + $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) RETURNS SETOF cdb_dataservices_server.geocoding AS $$ - from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import LegacyServiceManager from cartodb_services.google import GoogleMapsGeocoder - plpy.execute("SELECT cdb_dataservices_server._get_logger_config()") - logger_config = GD["logger_config"] - - logger = Logger(logger_config) service_manager = LegacyServiceManager('geocoder', username, orgname, GD) + geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; - try: - service_manager.assert_within_limits(quota=False) - geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) - geocode_results = geocoder.bulk_geocode(searches=searches) - if geocode_results: - results = [] - for result in geocode_results: - if result[1]: - plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"]) - point = plpy.execute(plan, result[1], 1)[0] - results.append([result[0], point['the_geom'], None]) - else: - results.append([result[0], None, None]) - service_manager.quota_service.increment_success_service_use(len(results)) - return results - else: - service_manager.quota_service.increment_empty_service_use(len(searches)) - return [] - except QuotaExceededException as qe: - service_manager.quota_service.increment_failed_service_use(len(searches)) - return [] - except BaseException as e: - import sys - service_manager.quota_service.increment_failed_service_use() - service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname}) - raise Exception('Error trying to bulk geocode street point using google maps') - finally: - service_manager.quota_service.increment_total_service_use() +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import LegacyServiceManager + from cartodb_services.here import HereMapsGeocoder + + service_manager = LegacyServiceManager('geocoder', username, orgname, GD) + geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text) diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index 8a9696e..0913117 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -24,52 +24,36 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$ with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): if user_geocoder_config.google_geocoder: - google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) - result = plpy.execute(google_plan, [username, orgname, searches]) - return result + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) + elif user_geocoder_config.heremaps_geocoder: + plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) else: raise Exception('Requested geocoder is not available') + result = plpy.execute(plan, [username, orgname, searches]) + return result + $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) RETURNS SETOF cdb_dataservices_server.geocoding AS $$ - from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import LegacyServiceManager from cartodb_services.google import GoogleMapsGeocoder - plpy.execute("SELECT cdb_dataservices_server._get_logger_config()") - logger_config = GD["logger_config"] - - logger = Logger(logger_config) service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - - try: - service_manager.assert_within_limits(quota=False) - geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) - geocode_results = geocoder.bulk_geocode(searches=searches) - if geocode_results: - results = [] - for result in geocode_results: - if result[1]: - plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"]) - point = plpy.execute(plan, result[1], 1)[0] - results.append([result[0], point['the_geom'], None]) - else: - results.append([result[0], None, None]) - service_manager.quota_service.increment_success_service_use(len(results)) - return results - else: - service_manager.quota_service.increment_empty_service_use(len(searches)) - return [] - except QuotaExceededException as qe: - service_manager.quota_service.increment_failed_service_use(len(searches)) - return [] - except BaseException as e: - import sys - service_manager.quota_service.increment_failed_service_use() - service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname}) - raise Exception('Error trying to bulk geocode street point using google maps') - finally: - service_manager.quota_service.increment_total_service_use() + geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) +$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; + +CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb) +RETURNS SETOF cdb_dataservices_server.geocoding AS $$ + from cartodb_services import run_street_point_geocoder + from cartodb_services.tools import LegacyServiceManager + from cartodb_services.here import HereMapsGeocoder + + service_manager = LegacyServiceManager('geocoder', username, orgname, GD) + geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/lib/__init__.py b/server/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/lib/python/__init__.py b/server/lib/python/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/lib/python/cartodb_services/__init__.py b/server/lib/python/cartodb_services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/lib/python/cartodb_services/cartodb_services/__init__.py b/server/lib/python/cartodb_services/cartodb_services/__init__.py index 61670c2..ef5caa4 100644 --- a/server/lib/python/cartodb_services/cartodb_services/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/__init__.py @@ -33,3 +33,5 @@ def _reset(): plpy = None GD = None + +from geocoder import run_street_point_geocoder, StreetPointBulkGeocoder diff --git a/server/lib/python/cartodb_services/cartodb_services/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/geocoder.py new file mode 100644 index 0000000..96679fd --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/geocoder.py @@ -0,0 +1,76 @@ +#!/usr/local/bin/python +# -*- coding: utf-8 -*- + +from tools import QuotaExceededException, Logger +from collections import namedtuple +import json + + +def run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches): + plpy.execute("SELECT cdb_dataservices_server._get_logger_config()") + logger_config = GD["logger_config"] + + logger = Logger(logger_config) + + try: + service_manager.assert_within_limits(quota=False) + geocode_results = geocoder.bulk_geocode(searches=searches) + if geocode_results: + results = [] + for result in geocode_results: + if result[1]: + plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"]) + point = plpy.execute(plan, result[1], 1)[0] + results.append([result[0], point['the_geom'], None]) + else: + results.append([result[0], None, None]) + service_manager.quota_service.increment_success_service_use(len(results)) + return results + else: + service_manager.quota_service.increment_empty_service_use(len(searches)) + return [] + except QuotaExceededException as qe: + service_manager.quota_service.increment_failed_service_use(len(searches)) + return [] + except BaseException as e: + import sys + service_manager.quota_service.increment_failed_service_use() + service_manager.logger.error('Error trying to bulk geocode street point', sys.exc_info(), data={"username": username, "orgname": orgname}) + raise Exception('Error trying to bulk geocode street') + finally: + service_manager.quota_service.increment_total_service_use() + + +StreetGeocoderSearch = namedtuple('StreetGeocoderSearch', 'id address city state country') + + +class StreetPointBulkGeocoder: + """ + Classes extending StreetPointBulkGeocoder should implement: + * _bulk_geocode(decoded_searches) + """ + + SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country'] + + def bulk_geocode(self, searches): + """ + :param searches: array of StreetGeocoderSearch + :return: array of tuples with three elements: + * id + * latitude and longitude (array of two elements) + * empty array (future use: metadata) + """ + try: + decoded_searches = json.loads(searches) + except Exception as e: + self._logger.error('General error', exception=e) + raise e + + street_geocoder_searches = [] + for search in decoded_searches: + search_id, address, city, state, country = \ + [search.get(k, None) for k in self.SEARCH_KEYS] + street_geocoder_searches.append( + (search_id, address, city, state, country)) + + return self._bulk_geocode(street_geocoder_searches) diff --git a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py index a51c990..1cc61c9 100644 --- a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py @@ -5,13 +5,12 @@ import googlemaps from urlparse import parse_qs from exceptions import MalformedResult +from cartodb_services import StreetPointBulkGeocoder from cartodb_services.google.exceptions import InvalidGoogleCredentials from client_factory import GoogleMapsClientFactory from multiprocessing import Pool, TimeoutError -import json - import time, random def async_geocoder(geocoder, address, components): @@ -22,10 +21,9 @@ def async_geocoder(geocoder, address, components): results = geocoder.geocode(address=address, components=components) return results if results else [] -class GoogleMapsGeocoder: +class GoogleMapsGeocoder(StreetPointBulkGeocoder): """A Google Maps Geocoder wrapper for python""" PARALLEL_PROCESSES = 13 - SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country'] def __init__(self, client_id, client_secret, logger): if client_id is None: @@ -48,18 +46,11 @@ class GoogleMapsGeocoder: except KeyError: raise MalformedResult() - def bulk_geocode(self, searches): - try: - decoded_searches = json.loads(searches) - except Exception as e: - self._logger.error('General error', exception=e) - raise e - + def _bulk_geocode(self, searches): bulk_results = {} pool = Pool(processes=self.PARALLEL_PROCESSES) - for search in decoded_searches: - search_id, address, city, state, country = \ - [search.get(k, None) for k in self.SEARCH_KEYS] + for search in searches: + (search_id, address, city, state, country) = search opt_params = self._build_optional_parameters(city, state, country) # Geocoding works better if components are also inside the address address = ', '.join(filter(None, [address, city, state, country])) @@ -83,7 +74,7 @@ class GoogleMapsGeocoder: result = [] lng_lat = self._extract_lng_lat_from_result(result[0]) if result else [] - results.append([search_id, lng_lat, []]) + results.append((search_id, lng_lat, [])) return results except KeyError as e: self._logger.error('KeyError error', exception=e) diff --git a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py index 6c3b81c..b72f77c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py @@ -6,10 +6,11 @@ import requests from requests.adapters import HTTPAdapter from exceptions import * +from cartodb_services import StreetPointBulkGeocoder from cartodb_services.metrics import Traceable -class HereMapsGeocoder(Traceable): +class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): 'A Here Maps Geocoder wrapper for python' PRODUCTION_GEOCODE_JSON_URL = 'https://geocoder.api.here.com/6.2/geocode.json' @@ -64,6 +65,13 @@ class HereMapsGeocoder(Traceable): self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) + def _bulk_geocode(self, searches): + results = [] + for search in searches: + result = () + return results + coordinates = geocoder.geocode(searchtext=searchtext, city=city, state=state_province, country=country) + def geocode(self, **kwargs): params = {} for key, value in kwargs.iteritems(): From fc610313bf807911cdf741b8b0aeafce4c8f122c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Fri, 29 Jun 2018 19:18:53 +0200 Subject: [PATCH 2/7] Test refactor and Here serial batch --- .../cartodb_services/here/geocoder.py | 5 +- test/integration/test_street_functions.py | 126 ++++++++++++------ 2 files changed, 85 insertions(+), 46 deletions(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py index b72f77c..efef8b1 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py @@ -68,9 +68,10 @@ class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): def _bulk_geocode(self, searches): results = [] for search in searches: - result = () + (search_id, address, city, state, country) = search + coordinates = self.geocode(searchtext=address, city=city, state=state, country=country) + results.append((search_id, coordinates, [])) return results - coordinates = geocoder.geocode(searchtext=searchtext, city=city, state=state_province, country=country) def geocode(self, **kwargs): params = {} diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index 6634627..db80f75 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -40,6 +40,52 @@ class TestStreetFunctions(TestStreetFunctionsSetUp): class TestBulkStreetFunctions(TestStreetFunctionsSetUp): + provider = None + fixture_points = None + + GOOGLE_POINTS = { + 'Plaza Mayor, Valladolid': [-4.728252, 41.6517025], + 'Paseo Zorrilla, Valladolid': [-4.7404453, 41.6314339], + '1900 amphitheatre parkway': [-122.0875324, 37.4227968], + '1901 amphitheatre parkway': [-122.0885504, 37.4238657], + '1902 amphitheatre parkway': [-122.0876674, 37.4235729], + 'Valladolid': [-4.7245321, 41.652251], + 'Valladolid, Spain': [-4.7245321, 41.652251], + 'Valladolid, Mexico': [-88.2022488, 20.68964], + 'Madrid': [-3.7037902, 40.4167754], + 'Logroño, Spain': [-2.4449852, 42.4627195], + 'Logroño, Argentina': [-61.6961807, -29.5031057] + } + + HERE_POINTS = { + 'Plaza Mayor, Valladolid': [-4.72979, 41.65258], + 'Paseo Zorrilla, Valladolid': [-4.73869, 41.63817], + '1900 amphitheatre parkway': [-122.0879468, 37.4234763], + '1901 amphitheatre parkway': [-122.0879253, 37.4238725], + '1902 amphitheatre parkway': [-122.0879531, 37.4234775], + 'Valladolid': [-4.73214, 41.6542], + 'Valladolid, Spain': [-4.73214, 41.6542], + 'Valladolid, Mexico': [-88.20117, 20.69021], + 'Madrid': [-3.70578, 40.42028], + 'Logroño, Spain': [-2.45194, 42.46592], + 'Logroño, Argentina': [-61.69604, -29.50425] + } + + FIXTURE_POINTS = { + 'google': GOOGLE_POINTS, + 'heremaps': HERE_POINTS + } + + def setUp(self): + TestStreetFunctionsSetUp.setUp(self) + + if not self.fixture_points: + query = "select provider from " \ + "cdb_dataservices_client.cdb_service_quota_info() " \ + "where service = 'hires_geocoder'" + response = self._run_authenticated(query) + provider = response['rows'][0]['provider'] + self.fixture_points = self.FIXTURE_POINTS[provider] def test_full_spec(self): query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \ @@ -54,11 +100,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): ", 'street', 'city', 'state', 'country')" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 2) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -4.728252, 41.6517025); - self._assert_x_y(row_by_cartodb_id[2], -4.7404453, 41.6314339) + points_by_cartodb_id = { + 1: self.fixture_points['Plaza Mayor, Valladolid'], + 2: self.fixture_points['Paseo Zorrilla, Valladolid'] + } + assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_empty_columns(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -69,10 +115,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'address', '''''', '''''', '''''')" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 1) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657) + assert_equal(self._x_y_by_cartodb_id(response)[1], + self.fixture_points['1901 amphitheatre parkway']) def test_null_columns(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -83,10 +127,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'address')" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 1) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657) + assert_equal(self._x_y_by_cartodb_id(response)[1], + self.fixture_points['1901 amphitheatre parkway']) def test_batching(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -99,12 +141,12 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'address', null, null, null, 2)" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 3) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -122.0875324, 37.4227968) - self._assert_x_y(row_by_cartodb_id[2], -122.0885504, 37.4238657) - self._assert_x_y(row_by_cartodb_id[3], -122.0876674, 37.4235729) + points_by_cartodb_id = { + 1: self.fixture_points['1900 amphitheatre parkway'], + 2: self.fixture_points['1901 amphitheatre parkway'], + 3: self.fixture_points['1902 amphitheatre parkway'], + } + assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_city_column_geocoding(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -118,9 +160,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): assert_equal(response['total_rows'], 2) - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -4.7245321, 41.652251) - self._assert_x_y(row_by_cartodb_id[2], -3.7037902, 40.4167754) + points_by_cartodb_id = { + 1: self.fixture_points['Valladolid'], + 2: self.fixture_points['Madrid'] + } + assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_free_text_geocoding(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -132,10 +176,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'''Logroño, La Rioja, Spain''')" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 1) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -2.4449852, 42.4627195) + assert_equal(self._x_y_by_cartodb_id(response)[1], + self.fixture_points['Logroño, Spain']) def test_templating_geocoding(self): query = "SELECT cartodb_id, st_x(the_geom), st_y(the_geom) from " \ @@ -149,11 +191,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'city || '', '' || ''Argentina''')" response = self._run_authenticated(query) - assert_equal(response['total_rows'], 2) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -2.4449852, 42.4627195) - self._assert_x_y(row_by_cartodb_id[2], -61.6961807, -29.5031057) + points_by_cartodb_id = { + 1: self.fixture_points['Logroño, Spain'], + 2: self.fixture_points['Logroño, Argentina'] + } + assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_template_with_two_columns_geocoding(self): query = "SELECT cartodb_id, st_x(the_geom), st_y(the_geom) from " \ @@ -165,14 +207,12 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): " ') _x'," \ "'city || '', '' || country')" response = self._run_authenticated(query) - # from nose.tools import set_trace; set_trace() - - assert_equal(response['total_rows'], 2) - - row_by_cartodb_id = self._row_by_cartodb_id(response) - self._assert_x_y(row_by_cartodb_id[1], -88.2022488, 20.68964) - self._assert_x_y(row_by_cartodb_id[2], -4.7245321, 41.652251) + points_by_cartodb_id = { + 1: self.fixture_points['Valladolid, Mexico'], + 2: self.fixture_points['Valladolid, Spain'] + } + assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def _run_authenticated(self, query): authenticated_query = "{}&api_key={}".format(query, @@ -181,10 +221,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): return IntegrationTestHelper.execute_query_raw(self.sql_api_url, authenticated_query) - def _row_by_cartodb_id(self, response): - return {r['cartodb_id']: r for r in response['rows']} - - def _assert_x_y(self, row, expected_x, expected_y): - assert_equal(row['st_x'], expected_x) - assert_equal(row['st_y'], expected_y) + @staticmethod + def _x_y_by_cartodb_id(response): + return {r['cartodb_id']: [r['st_x'], r['st_y']] + for r in response['rows']} From e416a8a641071183a1313d90dca2dff65f7d2ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Mon, 2 Jul 2018 18:35:36 +0200 Subject: [PATCH 3/7] HERE batch support --- client/cdb_dataservices_client--0.25.0.sql | 2 +- client/sql/21_bulk_geocoding_functions.sql | 2 +- .../cdb_dataservices_server--0.32.0.sql | 4 +- .../extension/sql/21_bulk_geocode_street.sql | 4 +- .../cartodb_services/geocoder.py | 2 +- .../cartodb_services/here/__init__.py | 1 + .../cartodb_services/here/bulk_geocoder.py | 153 ++++++++++++++++++ .../cartodb_services/here/geocoder.py | 11 +- test/integration/test_street_functions.py | 19 +++ 9 files changed, 181 insertions(+), 17 deletions(-) create mode 100644 server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py diff --git a/client/cdb_dataservices_client--0.25.0.sql b/client/cdb_dataservices_client--0.25.0.sql index 2e906d7..0742f43 100644 --- a/client/cdb_dataservices_client--0.25.0.sql +++ b/client/cdb_dataservices_client--0.25.0.sql @@ -1997,7 +1997,7 @@ DECLARE cartodb_id_batch integer; batches_n integer; DEFAULT_BATCH_SIZE CONSTANT numeric := 100; - MAX_BATCH_SIZE CONSTANT numeric := 1000; + MAX_BATCH_SIZE CONSTANT numeric := 10000; current_row_count integer ; temp_table_name text; diff --git a/client/sql/21_bulk_geocoding_functions.sql b/client/sql/21_bulk_geocoding_functions.sql index 4314e7c..d0bcf5e 100644 --- a/client/sql/21_bulk_geocoding_functions.sql +++ b/client/sql/21_bulk_geocoding_functions.sql @@ -9,7 +9,7 @@ DECLARE cartodb_id_batch integer; batches_n integer; DEFAULT_BATCH_SIZE CONSTANT numeric := 100; - MAX_BATCH_SIZE CONSTANT numeric := 1000; + MAX_BATCH_SIZE CONSTANT numeric := 10000; current_row_count integer ; temp_table_name text; diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 1f77d9a..849c688 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2393,10 +2393,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.here import HereMapsGeocoder + from cartodb_services.here import HereMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index 0913117..6732340 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -50,10 +50,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.here import HereMapsGeocoder + from cartodb_services.here import HereMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/lib/python/cartodb_services/cartodb_services/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/geocoder.py index 96679fd..d82768a 100644 --- a/server/lib/python/cartodb_services/cartodb_services/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/geocoder.py @@ -71,6 +71,6 @@ class StreetPointBulkGeocoder: search_id, address, city, state, country = \ [search.get(k, None) for k in self.SEARCH_KEYS] street_geocoder_searches.append( - (search_id, address, city, state, country)) + StreetGeocoderSearch(search_id, address, city, state, country)) return self._bulk_geocode(street_geocoder_searches) diff --git a/server/lib/python/cartodb_services/cartodb_services/here/__init__.py b/server/lib/python/cartodb_services/cartodb_services/here/__init__.py index 7c20cc5..85cc596 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/__init__.py @@ -1,2 +1,3 @@ from geocoder import HereMapsGeocoder +from bulk_geocoder import HereMapsBulkGeocoder from routing import HereMapsRoutingIsoline diff --git a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py new file mode 100644 index 0000000..3a28968 --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py @@ -0,0 +1,153 @@ +#!/usr/local/bin/python +# -*- coding: utf-8 -*- + + +import requests, time, zipfile, io, csv, cStringIO +import xml.etree.ElementTree as ET +from collections import namedtuple +from requests.adapters import HTTPAdapter +from cartodb_services import StreetPointBulkGeocoder +from cartodb_services.here import HereMapsGeocoder +from cartodb_services.metrics import Traceable +from cartodb_services.tools.exceptions import ServiceException + + +HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status') + +class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): + BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs' + MAX_BATCH_SIZE = 1000000 # From the docs + # https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html + META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet'] + MAX_STALLED_RETRIES = 100 + BATCH_RETRY_SLEEP_S = 5 + MIN_BATCHED_SEARCH = 100 # Under this, serial will be used + JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed'] + + def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS): + HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults) + self.session = requests.Session() + self.session.mount(self.BATCH_URL, + HTTPAdapter(max_retries=self.max_retries)) + self.credentials_params = { + 'app_id': self.app_id, + 'app_code': self.app_code, + } + + def _bulk_geocode(self, searches): + if len(searches) > self.MAX_STALLED_RETRIES: + raise "Batch size can't be larger than {}".format(self.MAX_STALLED_RETRIES) + if self._should_use_batch(searches): + self._logger.debug('--> Batch geocode') + return self._batch_geocode(searches) + else: + self._logger.debug('--> Serial geocode') + return self._serial_geocode(searches) + + def _should_use_batch(self, searches): + return len(searches) >= self.MIN_BATCHED_SEARCH + + def _serial_geocode(self, searches): + results = [] + for search in searches: + (search_id, address, city, state, country) = search + coordinates = self.geocode(searchtext=address, city=city, state=state, country=country) + results.append((search_id, coordinates, [])) + return results + + def _batch_geocode(self, searches): + request_id = self._send_batch(self._searches_to_csv(searches)) + self._logger.debug('--> Sent batch {}'.format(request_id)) + + last_processed = 0 + stalled_retries = 0 + # https://developer.here.com/documentation/batch-geocoder/topics/job-status.html + while True: + job_info = self._job_status(request_id) + if job_info.processed_count == last_processed: + stalled_retries += 1 + if stalled_retries > self.MAX_STALLED_RETRIES: + raise Exception('Too many retries for job {}'.format(request_id)) + else: + stalled_retries = 0 + last_processed = job_info.processed_count + + self._logger.debug('--> Job poll check: {}'.format(job_info)) + if job_info.status in self.JOB_FINAL_STATES: + break + else: + time.sleep(self.BATCH_RETRY_SLEEP_S) + + self._logger.debug('--> Job complete: {}'.format(job_info)) + + results = self._download_results(request_id) + self._logger.debug('--> Results: {} rows; {}'.format(len(results), results)) + + return results + + def _searches_to_csv(self, searches): + queue = cStringIO.StringIO() + writer = csv.writer(queue, delimiter='|') + writer.writerow(['recId', 'searchText', 'country']) + + for search in searches: + fields = [search.address, search.city, search.state] + search_text = ', '.join(filter(None, fields)) + row = [s.encode("utf-8") + for s in [str(search.id), search_text, search.country]] + writer.writerow(row) + + return queue.getvalue() + + def _send_batch(self, data): + cols = 'displayLatitude,displayLongitude,' + ','.join(self.META_COLS) + request_params = self.credentials_params.copy() + request_params.update({ + 'gen': 8, + 'action': 'run', + #'mailto': 'juanignaciosl@carto.com', + 'header': 'true', + 'inDelim': '|', + 'outDelim': '|', + 'outCols': cols, + 'outputcombined': 'true' + }) + + response = self.session.post(self.BATCH_URL, data=data, + params=request_params, + timeout=(self.connect_timeout, self.read_timeout)) + + if response.status_code == 200: + root = ET.fromstring(response.text) + return root.find('./Response/MetaInfo/RequestId').text + else: + raise ServiceException("Error sending HERE batch", response) + + def _job_status(self, request_id): + polling_params = self.credentials_params.copy() + polling_params.update({'action': 'status'}) + polling_r = self.session.get("{}/{}".format(self.BATCH_URL, request_id), + params=polling_params, + timeout=(self.connect_timeout, self.read_timeout)) + polling_root = ET.fromstring(polling_r.text) + return HereJobStatus( + total_count=polling_root.find('./Response/TotalCount').text, + processed_count=polling_root.find('./Response/ProcessedCount').text, + status=polling_root.find('./Response/Status').text) + + def _download_results(self, job_id): + result_r = self.session.get("{}/{}/result".format(self.BATCH_URL, job_id), + params=self.credentials_params, + timeout=(self.connect_timeout, self.read_timeout)) + root_zip = zipfile.ZipFile(io.BytesIO(result_r.content)) + + results = [] + for name in root_zip.namelist(): + if name.endswith('_out.txt'): + reader = csv.DictReader(root_zip.open(name), delimiter='|') + for row in reader: + if row['SeqNumber'] == '1': # First per requested data + results.append((row['recId'], [row['displayLongitude'], row['displayLatitude']])) + + return results + diff --git a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py index efef8b1..6c3b81c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py @@ -6,11 +6,10 @@ import requests from requests.adapters import HTTPAdapter from exceptions import * -from cartodb_services import StreetPointBulkGeocoder from cartodb_services.metrics import Traceable -class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): +class HereMapsGeocoder(Traceable): 'A Here Maps Geocoder wrapper for python' PRODUCTION_GEOCODE_JSON_URL = 'https://geocoder.api.here.com/6.2/geocode.json' @@ -65,14 +64,6 @@ class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) - def _bulk_geocode(self, searches): - results = [] - for search in searches: - (search_id, address, city, state, country) = search - coordinates = self.geocode(searchtext=address, city=city, state=state, country=country) - results.append((search_id, coordinates, [])) - return results - def geocode(self, **kwargs): params = {} for key, value in kwargs.iteritems(): diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index db80f75..45bda56 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -214,6 +214,25 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): } assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + def test_large_batches(self): + """ + Useful just to test a good batch size + """ + n = 10 + streets = [] + for i in range(0, n): + streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \ + 'Toronto, Canada"}}'.format(i, i)) + + query = "select *, st_x(the_geom), st_y(the_geom) " \ + "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \ + "'select * from jsonb_to_recordset(''[" \ + "{}" \ + "]''::jsonb) as (cartodb_id integer, address text)', " \ + "'address', null, null, null, {})".format(','.join(streets), n) + response = self._run_authenticated(query) + assert_equal(n - 1, len(response['rows'])) + def _run_authenticated(self, query): authenticated_query = "{}&api_key={}".format(query, self.env_variables[ From 9856adb7ce6ed9d254473b629ec8603f276e74e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Wed, 4 Jul 2018 12:32:52 +0200 Subject: [PATCH 4/7] Explicit NotImplementedError --- .../python/cartodb_services/cartodb_services/geocoder.py | 8 +++++++- .../cartodb_services/here/bulk_geocoder.py | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/geocoder.py index d82768a..d3c1c60 100644 --- a/server/lib/python/cartodb_services/cartodb_services/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/geocoder.py @@ -47,7 +47,7 @@ StreetGeocoderSearch = namedtuple('StreetGeocoderSearch', 'id address city state class StreetPointBulkGeocoder: """ Classes extending StreetPointBulkGeocoder should implement: - * _bulk_geocode(decoded_searches) + * _bulk_geocode(street_geocoder_searches) """ SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country'] @@ -74,3 +74,9 @@ class StreetPointBulkGeocoder: StreetGeocoderSearch(search_id, address, city, state, country)) return self._bulk_geocode(street_geocoder_searches) + + def _bulk_geocode(self, street_geocoder_searches): + """ + Subclasses must implement _bulk_geocode + """ + raise NotImplementedError('Subclasses must implement _bulk_geocode') diff --git a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py index 3a28968..dfc017b 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py @@ -16,12 +16,12 @@ HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status' class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs' - MAX_BATCH_SIZE = 1000000 # From the docs + MAX_BATCH_SIZE = 1000000 # From the docs # https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet'] MAX_STALLED_RETRIES = 100 BATCH_RETRY_SLEEP_S = 5 - MIN_BATCHED_SEARCH = 100 # Under this, serial will be used + MIN_BATCHED_SEARCH = 100 # Under this, serial will be used JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed'] def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS): From 754c364d22e773b00a763125609e637c828850d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Wed, 4 Jul 2018 13:24:11 +0200 Subject: [PATCH 5/7] Reduce precision on fixture points --- test/helpers/integration_test_helper.py | 25 +++++++++++++++++++++++ test/integration/test_street_functions.py | 22 ++++++++++++-------- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/test/helpers/integration_test_helper.py b/test/helpers/integration_test_helper.py index 7fdcd38..df050de 100644 --- a/test/helpers/integration_test_helper.py +++ b/test/helpers/integration_test_helper.py @@ -2,6 +2,29 @@ import os import requests import json +from nose.tools import assert_true + + +# From https://www.python.org/dev/peps/pep-0485/#proposed-implementation +def isclose(a, b, rel_tol=1e-09, abs_tol=0.0): + return abs(a-b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol) + + +def assert_close_enough(xy_a, xy_b, rel_tol=0.0001, abs_tol=0.0005): + """ + Asserts that the given points are "close enough", in a square. + :param xy_a: Array of 2 elements, X and Y. + :param xy_b: Array of 2 elements, X and Y. + :param rel_tol: Relative tolerance. Default: 0.001 (0.1%). + :param abs_tol: Absolute tolerance. Default: 0.0005. + """ + + for i in [0, 1]: + assert_true(isclose(xy_a[i], xy_b[i], rel_tol, abs_tol), + "Coord {} error: {} and {} are not closer than {}, {}".format( + i, xy_a[i], xy_b[i], rel_tol, abs_tol + )) + class IntegrationTestHelper: @@ -34,3 +57,5 @@ class IntegrationTestHelper: @classmethod def execute_query(cls, sql_api_url, query): return cls.execute_query_raw(sql_api_url, query)['rows'][0] + + diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index 45bda56..420e6a2 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -5,7 +5,7 @@ from unittest import TestCase from nose.tools import assert_raises from nose.tools import assert_not_equal, assert_equal from ..helpers.integration_test_helper import IntegrationTestHelper - +from ..helpers.integration_test_helper import assert_close_enough class TestStreetFunctionsSetUp(TestCase): @@ -104,7 +104,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 1: self.fixture_points['Plaza Mayor, Valladolid'], 2: self.fixture_points['Paseo Zorrilla, Valladolid'] } - assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_empty_columns(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -115,7 +115,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'address', '''''', '''''', '''''')" response = self._run_authenticated(query) - assert_equal(self._x_y_by_cartodb_id(response)[1], + assert_close_enough(self._x_y_by_cartodb_id(response)[1], self.fixture_points['1901 amphitheatre parkway']) def test_null_columns(self): @@ -127,7 +127,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'address')" response = self._run_authenticated(query) - assert_equal(self._x_y_by_cartodb_id(response)[1], + assert_close_enough(self._x_y_by_cartodb_id(response)[1], self.fixture_points['1901 amphitheatre parkway']) def test_batching(self): @@ -146,7 +146,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 2: self.fixture_points['1901 amphitheatre parkway'], 3: self.fixture_points['1902 amphitheatre parkway'], } - assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_city_column_geocoding(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -164,7 +164,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 1: self.fixture_points['Valladolid'], 2: self.fixture_points['Madrid'] } - assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_free_text_geocoding(self): query = "select *, st_x(the_geom), st_y(the_geom) " \ @@ -176,7 +176,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): "'''Logroño, La Rioja, Spain''')" response = self._run_authenticated(query) - assert_equal(self._x_y_by_cartodb_id(response)[1], + assert_close_enough(self._x_y_by_cartodb_id(response)[1], self.fixture_points['Logroño, Spain']) def test_templating_geocoding(self): @@ -195,7 +195,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 1: self.fixture_points['Logroño, Spain'], 2: self.fixture_points['Logroño, Argentina'] } - assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_template_with_two_columns_geocoding(self): query = "SELECT cartodb_id, st_x(the_geom), st_y(the_geom) from " \ @@ -212,7 +212,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): 1: self.fixture_points['Valladolid, Mexico'], 2: self.fixture_points['Valladolid, Spain'] } - assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id) def test_large_batches(self): """ @@ -245,3 +245,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): return {r['cartodb_id']: [r['st_x'], r['st_y']] for r in response['rows']} + @staticmethod + def assert_close_points(points_a_by_cartodb_id, points_b_by_cartodb_id): + for cartodb_id, point in points_a_by_cartodb_id.iteritems(): + assert_close_enough(point, points_b_by_cartodb_id[cartodb_id]) From 44744de73d70c25c9d8c10802ac3e7c6152d6706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Wed, 4 Jul 2018 13:29:37 +0200 Subject: [PATCH 6/7] Explicit check for result length --- server/lib/python/cartodb_services/cartodb_services/geocoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/geocoder.py index d3c1c60..6a14594 100644 --- a/server/lib/python/cartodb_services/cartodb_services/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/geocoder.py @@ -18,7 +18,7 @@ def run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, org if geocode_results: results = [] for result in geocode_results: - if result[1]: + if result[1] and len(result[1]) == 2: plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"]) point = plpy.execute(plan, result[1], 1)[0] results.append([result[0], point['the_geom'], None]) From 8968f0e6ec5bf5b03bef393c5f09e699dfd7c363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Wed, 4 Jul 2018 13:36:18 +0200 Subject: [PATCH 7/7] Fix message --- .../cartodb_services/cartodb_services/here/bulk_geocoder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py index dfc017b..0942394 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py @@ -35,8 +35,8 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): } def _bulk_geocode(self, searches): - if len(searches) > self.MAX_STALLED_RETRIES: - raise "Batch size can't be larger than {}".format(self.MAX_STALLED_RETRIES) + if len(searches) > self.MAX_BATCH_SIZE: + raise "Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE) if self._should_use_batch(searches): self._logger.debug('--> Batch geocode') return self._batch_geocode(searches)