diff --git a/client/cdb_dataservices_client--0.25.0.sql b/client/cdb_dataservices_client--0.25.0.sql index 2e906d7..0742f43 100644 --- a/client/cdb_dataservices_client--0.25.0.sql +++ b/client/cdb_dataservices_client--0.25.0.sql @@ -1997,7 +1997,7 @@ DECLARE cartodb_id_batch integer; batches_n integer; DEFAULT_BATCH_SIZE CONSTANT numeric := 100; - MAX_BATCH_SIZE CONSTANT numeric := 1000; + MAX_BATCH_SIZE CONSTANT numeric := 10000; current_row_count integer ; temp_table_name text; diff --git a/client/sql/21_bulk_geocoding_functions.sql b/client/sql/21_bulk_geocoding_functions.sql index 4314e7c..d0bcf5e 100644 --- a/client/sql/21_bulk_geocoding_functions.sql +++ b/client/sql/21_bulk_geocoding_functions.sql @@ -9,7 +9,7 @@ DECLARE cartodb_id_batch integer; batches_n integer; DEFAULT_BATCH_SIZE CONSTANT numeric := 100; - MAX_BATCH_SIZE CONSTANT numeric := 1000; + MAX_BATCH_SIZE CONSTANT numeric := 10000; current_row_count integer ; temp_table_name text; diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 1f77d9a..849c688 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2393,10 +2393,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.here import HereMapsGeocoder + from cartodb_services.here import HereMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index 0913117..6732340 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -50,10 +50,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.here import HereMapsGeocoder + from cartodb_services.here import HereMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) + geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/lib/python/cartodb_services/cartodb_services/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/geocoder.py index 96679fd..d82768a 100644 --- a/server/lib/python/cartodb_services/cartodb_services/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/geocoder.py @@ -71,6 +71,6 @@ class StreetPointBulkGeocoder: search_id, address, city, state, country = \ [search.get(k, None) for k in self.SEARCH_KEYS] street_geocoder_searches.append( - (search_id, address, city, state, country)) + StreetGeocoderSearch(search_id, address, city, state, country)) return self._bulk_geocode(street_geocoder_searches) diff --git a/server/lib/python/cartodb_services/cartodb_services/here/__init__.py b/server/lib/python/cartodb_services/cartodb_services/here/__init__.py index 7c20cc5..85cc596 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/__init__.py @@ -1,2 +1,3 @@ from geocoder import HereMapsGeocoder +from bulk_geocoder import HereMapsBulkGeocoder from routing import HereMapsRoutingIsoline diff --git a/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py new file mode 100644 index 0000000..3a28968 --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/here/bulk_geocoder.py @@ -0,0 +1,153 @@ +#!/usr/local/bin/python +# -*- coding: utf-8 -*- + + +import requests, time, zipfile, io, csv, cStringIO +import xml.etree.ElementTree as ET +from collections import namedtuple +from requests.adapters import HTTPAdapter +from cartodb_services import StreetPointBulkGeocoder +from cartodb_services.here import HereMapsGeocoder +from cartodb_services.metrics import Traceable +from cartodb_services.tools.exceptions import ServiceException + + +HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status') + +class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): + BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs' + MAX_BATCH_SIZE = 1000000 # From the docs + # https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html + META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet'] + MAX_STALLED_RETRIES = 100 + BATCH_RETRY_SLEEP_S = 5 + MIN_BATCHED_SEARCH = 100 # Under this, serial will be used + JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed'] + + def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS): + HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults) + self.session = requests.Session() + self.session.mount(self.BATCH_URL, + HTTPAdapter(max_retries=self.max_retries)) + self.credentials_params = { + 'app_id': self.app_id, + 'app_code': self.app_code, + } + + def _bulk_geocode(self, searches): + if len(searches) > self.MAX_STALLED_RETRIES: + raise "Batch size can't be larger than {}".format(self.MAX_STALLED_RETRIES) + if self._should_use_batch(searches): + self._logger.debug('--> Batch geocode') + return self._batch_geocode(searches) + else: + self._logger.debug('--> Serial geocode') + return self._serial_geocode(searches) + + def _should_use_batch(self, searches): + return len(searches) >= self.MIN_BATCHED_SEARCH + + def _serial_geocode(self, searches): + results = [] + for search in searches: + (search_id, address, city, state, country) = search + coordinates = self.geocode(searchtext=address, city=city, state=state, country=country) + results.append((search_id, coordinates, [])) + return results + + def _batch_geocode(self, searches): + request_id = self._send_batch(self._searches_to_csv(searches)) + self._logger.debug('--> Sent batch {}'.format(request_id)) + + last_processed = 0 + stalled_retries = 0 + # https://developer.here.com/documentation/batch-geocoder/topics/job-status.html + while True: + job_info = self._job_status(request_id) + if job_info.processed_count == last_processed: + stalled_retries += 1 + if stalled_retries > self.MAX_STALLED_RETRIES: + raise Exception('Too many retries for job {}'.format(request_id)) + else: + stalled_retries = 0 + last_processed = job_info.processed_count + + self._logger.debug('--> Job poll check: {}'.format(job_info)) + if job_info.status in self.JOB_FINAL_STATES: + break + else: + time.sleep(self.BATCH_RETRY_SLEEP_S) + + self._logger.debug('--> Job complete: {}'.format(job_info)) + + results = self._download_results(request_id) + self._logger.debug('--> Results: {} rows; {}'.format(len(results), results)) + + return results + + def _searches_to_csv(self, searches): + queue = cStringIO.StringIO() + writer = csv.writer(queue, delimiter='|') + writer.writerow(['recId', 'searchText', 'country']) + + for search in searches: + fields = [search.address, search.city, search.state] + search_text = ', '.join(filter(None, fields)) + row = [s.encode("utf-8") + for s in [str(search.id), search_text, search.country]] + writer.writerow(row) + + return queue.getvalue() + + def _send_batch(self, data): + cols = 'displayLatitude,displayLongitude,' + ','.join(self.META_COLS) + request_params = self.credentials_params.copy() + request_params.update({ + 'gen': 8, + 'action': 'run', + #'mailto': 'juanignaciosl@carto.com', + 'header': 'true', + 'inDelim': '|', + 'outDelim': '|', + 'outCols': cols, + 'outputcombined': 'true' + }) + + response = self.session.post(self.BATCH_URL, data=data, + params=request_params, + timeout=(self.connect_timeout, self.read_timeout)) + + if response.status_code == 200: + root = ET.fromstring(response.text) + return root.find('./Response/MetaInfo/RequestId').text + else: + raise ServiceException("Error sending HERE batch", response) + + def _job_status(self, request_id): + polling_params = self.credentials_params.copy() + polling_params.update({'action': 'status'}) + polling_r = self.session.get("{}/{}".format(self.BATCH_URL, request_id), + params=polling_params, + timeout=(self.connect_timeout, self.read_timeout)) + polling_root = ET.fromstring(polling_r.text) + return HereJobStatus( + total_count=polling_root.find('./Response/TotalCount').text, + processed_count=polling_root.find('./Response/ProcessedCount').text, + status=polling_root.find('./Response/Status').text) + + def _download_results(self, job_id): + result_r = self.session.get("{}/{}/result".format(self.BATCH_URL, job_id), + params=self.credentials_params, + timeout=(self.connect_timeout, self.read_timeout)) + root_zip = zipfile.ZipFile(io.BytesIO(result_r.content)) + + results = [] + for name in root_zip.namelist(): + if name.endswith('_out.txt'): + reader = csv.DictReader(root_zip.open(name), delimiter='|') + for row in reader: + if row['SeqNumber'] == '1': # First per requested data + results.append((row['recId'], [row['displayLongitude'], row['displayLatitude']])) + + return results + diff --git a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py index efef8b1..6c3b81c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/here/geocoder.py @@ -6,11 +6,10 @@ import requests from requests.adapters import HTTPAdapter from exceptions import * -from cartodb_services import StreetPointBulkGeocoder from cartodb_services.metrics import Traceable -class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): +class HereMapsGeocoder(Traceable): 'A Here Maps Geocoder wrapper for python' PRODUCTION_GEOCODE_JSON_URL = 'https://geocoder.api.here.com/6.2/geocode.json' @@ -65,14 +64,6 @@ class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder): self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT) self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) - def _bulk_geocode(self, searches): - results = [] - for search in searches: - (search_id, address, city, state, country) = search - coordinates = self.geocode(searchtext=address, city=city, state=state, country=country) - results.append((search_id, coordinates, [])) - return results - def geocode(self, **kwargs): params = {} for key, value in kwargs.iteritems(): diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index db80f75..45bda56 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -214,6 +214,25 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp): } assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id) + def test_large_batches(self): + """ + Useful just to test a good batch size + """ + n = 10 + streets = [] + for i in range(0, n): + streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \ + 'Toronto, Canada"}}'.format(i, i)) + + query = "select *, st_x(the_geom), st_y(the_geom) " \ + "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \ + "'select * from jsonb_to_recordset(''[" \ + "{}" \ + "]''::jsonb) as (cartodb_id integer, address text)', " \ + "'address', null, null, null, {})".format(','.join(streets), n) + response = self._run_authenticated(query) + assert_equal(n - 1, len(response['rows'])) + def _run_authenticated(self, query): authenticated_query = "{}&api_key={}".format(query, self.env_variables[