Merge pull request #503 from CartoDB/geocoder_boost_here
Geocoder boost HERE
This commit is contained in:
@@ -1997,7 +1997,7 @@ DECLARE
|
||||
cartodb_id_batch integer;
|
||||
batches_n integer;
|
||||
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||
MAX_BATCH_SIZE CONSTANT numeric := 1000;
|
||||
MAX_BATCH_SIZE CONSTANT numeric := 10000;
|
||||
current_row_count integer ;
|
||||
|
||||
temp_table_name text;
|
||||
|
||||
@@ -9,7 +9,7 @@ DECLARE
|
||||
cartodb_id_batch integer;
|
||||
batches_n integer;
|
||||
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||
MAX_BATCH_SIZE CONSTANT numeric := 1000;
|
||||
MAX_BATCH_SIZE CONSTANT numeric := 10000;
|
||||
current_row_count integer ;
|
||||
|
||||
temp_table_name text;
|
||||
|
||||
0
server/__init__.py
Normal file
0
server/__init__.py
Normal file
@@ -2367,53 +2367,37 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
|
||||
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
|
||||
if user_geocoder_config.google_geocoder:
|
||||
google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
result = plpy.execute(google_plan, [username, orgname, searches])
|
||||
return result
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
elif user_geocoder_config.heremaps_geocoder:
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
result = plpy.execute(plan, [username, orgname, searches])
|
||||
return result
|
||||
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.google import GoogleMapsGeocoder
|
||||
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1]:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street point using google maps')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.here import HereMapsBulkGeocoder
|
||||
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text)
|
||||
|
||||
@@ -24,52 +24,36 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
|
||||
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
|
||||
if user_geocoder_config.google_geocoder:
|
||||
google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
result = plpy.execute(google_plan, [username, orgname, searches])
|
||||
return result
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
elif user_geocoder_config.heremaps_geocoder:
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
result = plpy.execute(plan, [username, orgname, searches])
|
||||
return result
|
||||
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.google import GoogleMapsGeocoder
|
||||
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1]:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street point using google maps')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.here import HereMapsBulkGeocoder
|
||||
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
|
||||
0
server/lib/__init__.py
Normal file
0
server/lib/__init__.py
Normal file
0
server/lib/python/__init__.py
Normal file
0
server/lib/python/__init__.py
Normal file
0
server/lib/python/cartodb_services/__init__.py
Normal file
0
server/lib/python/cartodb_services/__init__.py
Normal file
@@ -33,3 +33,5 @@ def _reset():
|
||||
|
||||
plpy = None
|
||||
GD = None
|
||||
|
||||
from geocoder import run_street_point_geocoder, StreetPointBulkGeocoder
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
#!/usr/local/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from tools import QuotaExceededException, Logger
|
||||
from collections import namedtuple
|
||||
import json
|
||||
|
||||
|
||||
def run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches):
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1] and len(result[1]) == 2:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
|
||||
|
||||
StreetGeocoderSearch = namedtuple('StreetGeocoderSearch', 'id address city state country')
|
||||
|
||||
|
||||
class StreetPointBulkGeocoder:
|
||||
"""
|
||||
Classes extending StreetPointBulkGeocoder should implement:
|
||||
* _bulk_geocode(street_geocoder_searches)
|
||||
"""
|
||||
|
||||
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
|
||||
|
||||
def bulk_geocode(self, searches):
|
||||
"""
|
||||
:param searches: array of StreetGeocoderSearch
|
||||
:return: array of tuples with three elements:
|
||||
* id
|
||||
* latitude and longitude (array of two elements)
|
||||
* empty array (future use: metadata)
|
||||
"""
|
||||
try:
|
||||
decoded_searches = json.loads(searches)
|
||||
except Exception as e:
|
||||
self._logger.error('General error', exception=e)
|
||||
raise e
|
||||
|
||||
street_geocoder_searches = []
|
||||
for search in decoded_searches:
|
||||
search_id, address, city, state, country = \
|
||||
[search.get(k, None) for k in self.SEARCH_KEYS]
|
||||
street_geocoder_searches.append(
|
||||
StreetGeocoderSearch(search_id, address, city, state, country))
|
||||
|
||||
return self._bulk_geocode(street_geocoder_searches)
|
||||
|
||||
def _bulk_geocode(self, street_geocoder_searches):
|
||||
"""
|
||||
Subclasses must implement _bulk_geocode
|
||||
"""
|
||||
raise NotImplementedError('Subclasses must implement _bulk_geocode')
|
||||
@@ -5,13 +5,12 @@ import googlemaps
|
||||
from urlparse import parse_qs
|
||||
|
||||
from exceptions import MalformedResult
|
||||
from cartodb_services import StreetPointBulkGeocoder
|
||||
from cartodb_services.google.exceptions import InvalidGoogleCredentials
|
||||
from client_factory import GoogleMapsClientFactory
|
||||
|
||||
from multiprocessing import Pool, TimeoutError
|
||||
|
||||
import json
|
||||
|
||||
import time, random
|
||||
|
||||
def async_geocoder(geocoder, address, components):
|
||||
@@ -22,10 +21,9 @@ def async_geocoder(geocoder, address, components):
|
||||
results = geocoder.geocode(address=address, components=components)
|
||||
return results if results else []
|
||||
|
||||
class GoogleMapsGeocoder:
|
||||
class GoogleMapsGeocoder(StreetPointBulkGeocoder):
|
||||
"""A Google Maps Geocoder wrapper for python"""
|
||||
PARALLEL_PROCESSES = 13
|
||||
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
|
||||
|
||||
def __init__(self, client_id, client_secret, logger):
|
||||
if client_id is None:
|
||||
@@ -48,18 +46,11 @@ class GoogleMapsGeocoder:
|
||||
except KeyError:
|
||||
raise MalformedResult()
|
||||
|
||||
def bulk_geocode(self, searches):
|
||||
try:
|
||||
decoded_searches = json.loads(searches)
|
||||
except Exception as e:
|
||||
self._logger.error('General error', exception=e)
|
||||
raise e
|
||||
|
||||
def _bulk_geocode(self, searches):
|
||||
bulk_results = {}
|
||||
pool = Pool(processes=self.PARALLEL_PROCESSES)
|
||||
for search in decoded_searches:
|
||||
search_id, address, city, state, country = \
|
||||
[search.get(k, None) for k in self.SEARCH_KEYS]
|
||||
for search in searches:
|
||||
(search_id, address, city, state, country) = search
|
||||
opt_params = self._build_optional_parameters(city, state, country)
|
||||
# Geocoding works better if components are also inside the address
|
||||
address = ', '.join(filter(None, [address, city, state, country]))
|
||||
@@ -83,7 +74,7 @@ class GoogleMapsGeocoder:
|
||||
result = []
|
||||
|
||||
lng_lat = self._extract_lng_lat_from_result(result[0]) if result else []
|
||||
results.append([search_id, lng_lat, []])
|
||||
results.append((search_id, lng_lat, []))
|
||||
return results
|
||||
except KeyError as e:
|
||||
self._logger.error('KeyError error', exception=e)
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
from geocoder import HereMapsGeocoder
|
||||
from bulk_geocoder import HereMapsBulkGeocoder
|
||||
from routing import HereMapsRoutingIsoline
|
||||
|
||||
@@ -0,0 +1,153 @@
|
||||
#!/usr/local/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
import requests, time, zipfile, io, csv, cStringIO
|
||||
import xml.etree.ElementTree as ET
|
||||
from collections import namedtuple
|
||||
from requests.adapters import HTTPAdapter
|
||||
from cartodb_services import StreetPointBulkGeocoder
|
||||
from cartodb_services.here import HereMapsGeocoder
|
||||
from cartodb_services.metrics import Traceable
|
||||
from cartodb_services.tools.exceptions import ServiceException
|
||||
|
||||
|
||||
HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status')
|
||||
|
||||
class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
||||
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
|
||||
MAX_BATCH_SIZE = 1000000 # From the docs
|
||||
# https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html
|
||||
META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet']
|
||||
MAX_STALLED_RETRIES = 100
|
||||
BATCH_RETRY_SLEEP_S = 5
|
||||
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
|
||||
JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed']
|
||||
|
||||
def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS):
|
||||
HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults)
|
||||
self.session = requests.Session()
|
||||
self.session.mount(self.BATCH_URL,
|
||||
HTTPAdapter(max_retries=self.max_retries))
|
||||
self.credentials_params = {
|
||||
'app_id': self.app_id,
|
||||
'app_code': self.app_code,
|
||||
}
|
||||
|
||||
def _bulk_geocode(self, searches):
|
||||
if len(searches) > self.MAX_BATCH_SIZE:
|
||||
raise "Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE)
|
||||
if self._should_use_batch(searches):
|
||||
self._logger.debug('--> Batch geocode')
|
||||
return self._batch_geocode(searches)
|
||||
else:
|
||||
self._logger.debug('--> Serial geocode')
|
||||
return self._serial_geocode(searches)
|
||||
|
||||
def _should_use_batch(self, searches):
|
||||
return len(searches) >= self.MIN_BATCHED_SEARCH
|
||||
|
||||
def _serial_geocode(self, searches):
|
||||
results = []
|
||||
for search in searches:
|
||||
(search_id, address, city, state, country) = search
|
||||
coordinates = self.geocode(searchtext=address, city=city, state=state, country=country)
|
||||
results.append((search_id, coordinates, []))
|
||||
return results
|
||||
|
||||
def _batch_geocode(self, searches):
|
||||
request_id = self._send_batch(self._searches_to_csv(searches))
|
||||
self._logger.debug('--> Sent batch {}'.format(request_id))
|
||||
|
||||
last_processed = 0
|
||||
stalled_retries = 0
|
||||
# https://developer.here.com/documentation/batch-geocoder/topics/job-status.html
|
||||
while True:
|
||||
job_info = self._job_status(request_id)
|
||||
if job_info.processed_count == last_processed:
|
||||
stalled_retries += 1
|
||||
if stalled_retries > self.MAX_STALLED_RETRIES:
|
||||
raise Exception('Too many retries for job {}'.format(request_id))
|
||||
else:
|
||||
stalled_retries = 0
|
||||
last_processed = job_info.processed_count
|
||||
|
||||
self._logger.debug('--> Job poll check: {}'.format(job_info))
|
||||
if job_info.status in self.JOB_FINAL_STATES:
|
||||
break
|
||||
else:
|
||||
time.sleep(self.BATCH_RETRY_SLEEP_S)
|
||||
|
||||
self._logger.debug('--> Job complete: {}'.format(job_info))
|
||||
|
||||
results = self._download_results(request_id)
|
||||
self._logger.debug('--> Results: {} rows; {}'.format(len(results), results))
|
||||
|
||||
return results
|
||||
|
||||
def _searches_to_csv(self, searches):
|
||||
queue = cStringIO.StringIO()
|
||||
writer = csv.writer(queue, delimiter='|')
|
||||
writer.writerow(['recId', 'searchText', 'country'])
|
||||
|
||||
for search in searches:
|
||||
fields = [search.address, search.city, search.state]
|
||||
search_text = ', '.join(filter(None, fields))
|
||||
row = [s.encode("utf-8")
|
||||
for s in [str(search.id), search_text, search.country]]
|
||||
writer.writerow(row)
|
||||
|
||||
return queue.getvalue()
|
||||
|
||||
def _send_batch(self, data):
|
||||
cols = 'displayLatitude,displayLongitude,' + ','.join(self.META_COLS)
|
||||
request_params = self.credentials_params.copy()
|
||||
request_params.update({
|
||||
'gen': 8,
|
||||
'action': 'run',
|
||||
#'mailto': 'juanignaciosl@carto.com',
|
||||
'header': 'true',
|
||||
'inDelim': '|',
|
||||
'outDelim': '|',
|
||||
'outCols': cols,
|
||||
'outputcombined': 'true'
|
||||
})
|
||||
|
||||
response = self.session.post(self.BATCH_URL, data=data,
|
||||
params=request_params,
|
||||
timeout=(self.connect_timeout, self.read_timeout))
|
||||
|
||||
if response.status_code == 200:
|
||||
root = ET.fromstring(response.text)
|
||||
return root.find('./Response/MetaInfo/RequestId').text
|
||||
else:
|
||||
raise ServiceException("Error sending HERE batch", response)
|
||||
|
||||
def _job_status(self, request_id):
|
||||
polling_params = self.credentials_params.copy()
|
||||
polling_params.update({'action': 'status'})
|
||||
polling_r = self.session.get("{}/{}".format(self.BATCH_URL, request_id),
|
||||
params=polling_params,
|
||||
timeout=(self.connect_timeout, self.read_timeout))
|
||||
polling_root = ET.fromstring(polling_r.text)
|
||||
return HereJobStatus(
|
||||
total_count=polling_root.find('./Response/TotalCount').text,
|
||||
processed_count=polling_root.find('./Response/ProcessedCount').text,
|
||||
status=polling_root.find('./Response/Status').text)
|
||||
|
||||
def _download_results(self, job_id):
|
||||
result_r = self.session.get("{}/{}/result".format(self.BATCH_URL, job_id),
|
||||
params=self.credentials_params,
|
||||
timeout=(self.connect_timeout, self.read_timeout))
|
||||
root_zip = zipfile.ZipFile(io.BytesIO(result_r.content))
|
||||
|
||||
results = []
|
||||
for name in root_zip.namelist():
|
||||
if name.endswith('_out.txt'):
|
||||
reader = csv.DictReader(root_zip.open(name), delimiter='|')
|
||||
for row in reader:
|
||||
if row['SeqNumber'] == '1': # First per requested data
|
||||
results.append((row['recId'], [row['displayLongitude'], row['displayLatitude']]))
|
||||
|
||||
return results
|
||||
|
||||
@@ -2,6 +2,29 @@ import os
|
||||
import requests
|
||||
import json
|
||||
|
||||
from nose.tools import assert_true
|
||||
|
||||
|
||||
# From https://www.python.org/dev/peps/pep-0485/#proposed-implementation
|
||||
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
|
||||
return abs(a-b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
|
||||
|
||||
|
||||
def assert_close_enough(xy_a, xy_b, rel_tol=0.0001, abs_tol=0.0005):
|
||||
"""
|
||||
Asserts that the given points are "close enough", in a square.
|
||||
:param xy_a: Array of 2 elements, X and Y.
|
||||
:param xy_b: Array of 2 elements, X and Y.
|
||||
:param rel_tol: Relative tolerance. Default: 0.001 (0.1%).
|
||||
:param abs_tol: Absolute tolerance. Default: 0.0005.
|
||||
"""
|
||||
|
||||
for i in [0, 1]:
|
||||
assert_true(isclose(xy_a[i], xy_b[i], rel_tol, abs_tol),
|
||||
"Coord {} error: {} and {} are not closer than {}, {}".format(
|
||||
i, xy_a[i], xy_b[i], rel_tol, abs_tol
|
||||
))
|
||||
|
||||
|
||||
class IntegrationTestHelper:
|
||||
|
||||
@@ -34,3 +57,5 @@ class IntegrationTestHelper:
|
||||
@classmethod
|
||||
def execute_query(cls, sql_api_url, query):
|
||||
return cls.execute_query_raw(sql_api_url, query)['rows'][0]
|
||||
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ from unittest import TestCase
|
||||
from nose.tools import assert_raises
|
||||
from nose.tools import assert_not_equal, assert_equal
|
||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||
|
||||
from ..helpers.integration_test_helper import assert_close_enough
|
||||
|
||||
class TestStreetFunctionsSetUp(TestCase):
|
||||
|
||||
@@ -40,6 +40,52 @@ class TestStreetFunctions(TestStreetFunctionsSetUp):
|
||||
|
||||
|
||||
class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
provider = None
|
||||
fixture_points = None
|
||||
|
||||
GOOGLE_POINTS = {
|
||||
'Plaza Mayor, Valladolid': [-4.728252, 41.6517025],
|
||||
'Paseo Zorrilla, Valladolid': [-4.7404453, 41.6314339],
|
||||
'1900 amphitheatre parkway': [-122.0875324, 37.4227968],
|
||||
'1901 amphitheatre parkway': [-122.0885504, 37.4238657],
|
||||
'1902 amphitheatre parkway': [-122.0876674, 37.4235729],
|
||||
'Valladolid': [-4.7245321, 41.652251],
|
||||
'Valladolid, Spain': [-4.7245321, 41.652251],
|
||||
'Valladolid, Mexico': [-88.2022488, 20.68964],
|
||||
'Madrid': [-3.7037902, 40.4167754],
|
||||
'Logroño, Spain': [-2.4449852, 42.4627195],
|
||||
'Logroño, Argentina': [-61.6961807, -29.5031057]
|
||||
}
|
||||
|
||||
HERE_POINTS = {
|
||||
'Plaza Mayor, Valladolid': [-4.72979, 41.65258],
|
||||
'Paseo Zorrilla, Valladolid': [-4.73869, 41.63817],
|
||||
'1900 amphitheatre parkway': [-122.0879468, 37.4234763],
|
||||
'1901 amphitheatre parkway': [-122.0879253, 37.4238725],
|
||||
'1902 amphitheatre parkway': [-122.0879531, 37.4234775],
|
||||
'Valladolid': [-4.73214, 41.6542],
|
||||
'Valladolid, Spain': [-4.73214, 41.6542],
|
||||
'Valladolid, Mexico': [-88.20117, 20.69021],
|
||||
'Madrid': [-3.70578, 40.42028],
|
||||
'Logroño, Spain': [-2.45194, 42.46592],
|
||||
'Logroño, Argentina': [-61.69604, -29.50425]
|
||||
}
|
||||
|
||||
FIXTURE_POINTS = {
|
||||
'google': GOOGLE_POINTS,
|
||||
'heremaps': HERE_POINTS
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
TestStreetFunctionsSetUp.setUp(self)
|
||||
|
||||
if not self.fixture_points:
|
||||
query = "select provider from " \
|
||||
"cdb_dataservices_client.cdb_service_quota_info() " \
|
||||
"where service = 'hires_geocoder'"
|
||||
response = self._run_authenticated(query)
|
||||
provider = response['rows'][0]['provider']
|
||||
self.fixture_points = self.FIXTURE_POINTS[provider]
|
||||
|
||||
def test_full_spec(self):
|
||||
query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -54,11 +100,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
", 'street', 'city', 'state', 'country')"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 2)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -4.728252, 41.6517025);
|
||||
self._assert_x_y(row_by_cartodb_id[2], -4.7404453, 41.6314339)
|
||||
points_by_cartodb_id = {
|
||||
1: self.fixture_points['Plaza Mayor, Valladolid'],
|
||||
2: self.fixture_points['Paseo Zorrilla, Valladolid']
|
||||
}
|
||||
self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
|
||||
|
||||
def test_empty_columns(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -69,10 +115,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
"'address', '''''', '''''', '''''')"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 1)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
|
||||
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
|
||||
self.fixture_points['1901 amphitheatre parkway'])
|
||||
|
||||
def test_null_columns(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -83,10 +127,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
"'address')"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 1)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
|
||||
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
|
||||
self.fixture_points['1901 amphitheatre parkway'])
|
||||
|
||||
def test_batching(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -99,12 +141,12 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
"'address', null, null, null, 2)"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 3)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -122.0875324, 37.4227968)
|
||||
self._assert_x_y(row_by_cartodb_id[2], -122.0885504, 37.4238657)
|
||||
self._assert_x_y(row_by_cartodb_id[3], -122.0876674, 37.4235729)
|
||||
points_by_cartodb_id = {
|
||||
1: self.fixture_points['1900 amphitheatre parkway'],
|
||||
2: self.fixture_points['1901 amphitheatre parkway'],
|
||||
3: self.fixture_points['1902 amphitheatre parkway'],
|
||||
}
|
||||
self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
|
||||
|
||||
def test_city_column_geocoding(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -118,9 +160,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
|
||||
assert_equal(response['total_rows'], 2)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -4.7245321, 41.652251)
|
||||
self._assert_x_y(row_by_cartodb_id[2], -3.7037902, 40.4167754)
|
||||
points_by_cartodb_id = {
|
||||
1: self.fixture_points['Valladolid'],
|
||||
2: self.fixture_points['Madrid']
|
||||
}
|
||||
self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
|
||||
|
||||
def test_free_text_geocoding(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
@@ -132,10 +176,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
"'''Logroño, La Rioja, Spain''')"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 1)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -2.4449852, 42.4627195)
|
||||
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
|
||||
self.fixture_points['Logroño, Spain'])
|
||||
|
||||
def test_templating_geocoding(self):
|
||||
query = "SELECT cartodb_id, st_x(the_geom), st_y(the_geom) from " \
|
||||
@@ -149,11 +191,11 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
"'city || '', '' || ''Argentina''')"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
assert_equal(response['total_rows'], 2)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -2.4449852, 42.4627195)
|
||||
self._assert_x_y(row_by_cartodb_id[2], -61.6961807, -29.5031057)
|
||||
points_by_cartodb_id = {
|
||||
1: self.fixture_points['Logroño, Spain'],
|
||||
2: self.fixture_points['Logroño, Argentina']
|
||||
}
|
||||
self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
|
||||
|
||||
def test_template_with_two_columns_geocoding(self):
|
||||
query = "SELECT cartodb_id, st_x(the_geom), st_y(the_geom) from " \
|
||||
@@ -165,14 +207,31 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
" ') _x'," \
|
||||
"'city || '', '' || country')"
|
||||
response = self._run_authenticated(query)
|
||||
# from nose.tools import set_trace; set_trace()
|
||||
|
||||
assert_equal(response['total_rows'], 2)
|
||||
points_by_cartodb_id = {
|
||||
1: self.fixture_points['Valladolid, Mexico'],
|
||||
2: self.fixture_points['Valladolid, Spain']
|
||||
}
|
||||
self.assert_close_points(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
|
||||
|
||||
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||
self._assert_x_y(row_by_cartodb_id[1], -88.2022488, 20.68964)
|
||||
self._assert_x_y(row_by_cartodb_id[2], -4.7245321, 41.652251)
|
||||
def test_large_batches(self):
|
||||
"""
|
||||
Useful just to test a good batch size
|
||||
"""
|
||||
n = 10
|
||||
streets = []
|
||||
for i in range(0, n):
|
||||
streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \
|
||||
'Toronto, Canada"}}'.format(i, i))
|
||||
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
|
||||
"'select * from jsonb_to_recordset(''[" \
|
||||
"{}" \
|
||||
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||
"'address', null, null, null, {})".format(','.join(streets), n)
|
||||
response = self._run_authenticated(query)
|
||||
assert_equal(n - 1, len(response['rows']))
|
||||
|
||||
def _run_authenticated(self, query):
|
||||
authenticated_query = "{}&api_key={}".format(query,
|
||||
@@ -181,10 +240,12 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
return IntegrationTestHelper.execute_query_raw(self.sql_api_url,
|
||||
authenticated_query)
|
||||
|
||||
def _row_by_cartodb_id(self, response):
|
||||
return {r['cartodb_id']: r for r in response['rows']}
|
||||
|
||||
def _assert_x_y(self, row, expected_x, expected_y):
|
||||
assert_equal(row['st_x'], expected_x)
|
||||
assert_equal(row['st_y'], expected_y)
|
||||
@staticmethod
|
||||
def _x_y_by_cartodb_id(response):
|
||||
return {r['cartodb_id']: [r['st_x'], r['st_y']]
|
||||
for r in response['rows']}
|
||||
|
||||
@staticmethod
|
||||
def assert_close_points(points_a_by_cartodb_id, points_b_by_cartodb_id):
|
||||
for cartodb_id, point in points_a_by_cartodb_id.iteritems():
|
||||
assert_close_enough(point, points_b_by_cartodb_id[cartodb_id])
|
||||
|
||||
Reference in New Issue
Block a user