Merge pull request #504 from CartoDB/geocoder_boost_tomtom

Geocoder boost tomtom
This commit is contained in:
Juan Ignacio Sánchez Lara
2018-07-10 09:50:44 +02:00
committed by GitHub
8 changed files with 215 additions and 35 deletions

View File

@@ -83,3 +83,6 @@ deploy: release_remove_parallel_deploy
$(INSTALL_DATA) old_versions/*.sql *.sql '$(DESTDIR)$(datadir)/extension/'
install: deploy
reinstall: install
psql -U postgres -d dataservices_db -c "drop extension if exists cdb_dataservices_server; create extension cdb_dataservices_server;"

View File

@@ -2367,12 +2367,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
if user_geocoder_config.google_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
provider_function = "_cdb_bulk_google_geocode_street_point";
elif user_geocoder_config.heremaps_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
provider_function = "_cdb_bulk_heremaps_geocode_street_point";
elif user_geocoder_config.tomtom_geocoder:
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
else:
raise Exception('Requested geocoder is not available')
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"])
result = plpy.execute(plan, [username, orgname, searches])
return result
@@ -2400,6 +2403,23 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import ServiceManager
from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder
from cartodb_services.tomtom import TomTomBulkGeocoder
from cartodb_services.tools import Logger
import cartodb_services
cartodb_services.init(plpy, GD)
logger_config = GD["logger_config"]
logger = Logger(logger_config)
service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD)
geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text)
RETURNS Geometry AS $$
from cartodb_services.metrics import QuotaService

View File

@@ -24,12 +24,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
if user_geocoder_config.google_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
provider_function = "_cdb_bulk_google_geocode_street_point";
elif user_geocoder_config.heremaps_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
provider_function = "_cdb_bulk_heremaps_geocode_street_point";
elif user_geocoder_config.tomtom_geocoder:
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
else:
raise Exception('Requested geocoder is not available')
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"])
result = plpy.execute(plan, [username, orgname, searches])
return result
@@ -57,3 +60,20 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import ServiceManager
from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder
from cartodb_services.tomtom import TomTomBulkGeocoder
from cartodb_services.tools import Logger
import cartodb_services
cartodb_services.init(plpy, GD)
logger_config = GD["logger_config"]
logger = Logger(logger_config)
service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD)
geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@@ -24,7 +24,7 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed']
def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS):
def __init__(self, app_id, app_code, logger, service_params=None, maxresults=MAX_BATCH_SIZE):
HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults)
self.session = requests.Session()
self.session.mount(self.BATCH_URL,

View File

@@ -1,3 +1,4 @@
from geocoder import TomTomGeocoder
from bulk_geocoder import TomTomBulkGeocoder
from routing import TomTomRouting, TomTomRoutingResponse
from isolines import TomTomIsolines, TomTomIsochronesResponse

View File

@@ -0,0 +1,114 @@
import json, requests, time
from requests.adapters import HTTPAdapter
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.tomtom import TomTomGeocoder
from cartodb_services.tools.exceptions import ServiceException
class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
BASE_URL = 'https://api.tomtom.com'
BATCH_URL = BASE_URL + '/search/2/batch.json'
MAX_BATCH_SIZE = 1000000 # From the docs
MAX_STALLED_RETRIES = 100
BATCH_RETRY_SLEEP_S = 5
MIN_BATCHED_SEARCH = 10 # Batch API is really fast
READ_TIMEOUT = 60
CONNECT_TIMEOUT = 10
MAX_RETRIES = 1
def __init__(self, apikey, logger, service_params=None):
TomTomGeocoder.__init__(self, apikey, logger, service_params)
self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT)
self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT)
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
self.session = requests.Session()
self.session.headers.update({'Content-Type': 'application/json'})
self.session.mount(self.BATCH_URL,
HTTPAdapter(max_retries=self.max_retries))
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH
def _serial_geocode(self, searches):
results = []
for search in searches:
(search_id, address, city, state, country) = search
address = address.encode('utf-8') if address else None
city = city.encode('utf-8') if city else None
state = state.encode('utf-8') if state else None
country = country.encode('utf-8') if country else None
self._logger.debug('--> Sending serial search: {}'.format(search))
coordinates = self.geocode(searchtext=address, city=city,
state_province=state, country=country)
self._logger.debug('--> result sent')
results.append((search_id, coordinates, []))
return results
def _batch_geocode(self, searches):
location = self._send_batch(searches)
xy_results = self._download_results(location)
results = []
for s, r in zip(searches, xy_results):
results.append((s[0], r, []))
self._logger.debug('--> results: {}'.format(results))
return results
def _send_batch(self, searches):
body = {'batchItems': [{'query': self._query(s)} for s in searches]}
self._logger.debug('--> {}; Body: {}'.format(self.BATCH_URL, body))
request_params = {
'key': self._apikey
}
response = self.session.post(self.BATCH_URL, data=json.dumps(body),
allow_redirects=False,
params=request_params,
timeout=(self.connect_timeout, self.read_timeout))
self._logger.debug('--> response: {}'.format(response.status_code))
if response.status_code == 303:
self._logger.debug(response.headers)
return response.headers['Location']
else:
msg = "Error sending batch: {}; Headers: {}".format(
response.text.encode('utf-8'), response.headers)
self._logger.error(msg)
raise ServiceException(msg, response)
def _download_results(self, location):
stalled_retries = 0
while True:
response = self.session.get(self.BASE_URL + location)
if response.status_code == 200:
self._logger.debug('--> Results ready {}'.format(location))
return self._parse_results(response.json())
elif response.status_code == 202:
stalled_retries += 1
if stalled_retries > self.MAX_STALLED_RETRIES:
raise Exception('Too many retries for job {}'.format(location))
location = response.headers['Location']
self._logger.debug('--> Waiting for {}'.format(location))
time.sleep(self.BATCH_RETRY_SLEEP_S)
else:
msg = "Error downloading batch: {}; Headers: {}".format(
response.text.encode('utf-8'), response.headers)
self._logger.error(msg)
raise ServiceException(msg, response)
def _query(self, search):
(search_id, address, city, state, country) = search
searchtext = ', '.join(filter(None, [address, city, state]))
return self._request_uri(searchtext=searchtext, country=country)
def _parse_results(self, json_body):
return [self._parse_response(item['statusCode'], item['response'])
for item in json_body['batchItems']]

View File

@@ -9,10 +9,11 @@ from cartodb_services.tools.exceptions import ServiceException
from cartodb_services.tools.qps import qps_retry
from cartodb_services.tools.normalize import normalize
BASEURI = ('https://api.tomtom.com/search/2/geocode/'
'{searchtext}.JSON'
'?key={apiKey}'
'&limit=1')
HOST = 'https://api.tomtom.com'
API_BASEURI = '/search/2'
REQUEST_BASEURI = ('/geocode/'
'{searchtext}.json'
'?limit=1')
ENTRY_RESULTS = 'results'
ENTRY_POSITION = 'position'
ENTRY_LON = 'lon'
@@ -29,21 +30,17 @@ class TomTomGeocoder(Traceable):
self._apikey = apikey
self._logger = logger
def _uri(self, searchtext, countries=None):
baseuri = BASEURI + '&countrySet={}'.format(countries) \
if countries else BASEURI
uri = URITemplate(baseuri).expand(apiKey=self._apikey,
searchtext=searchtext.encode('utf-8'))
return uri
def _uri(self, searchtext, country=None):
return HOST + API_BASEURI + \
self._request_uri(searchtext, country, self._apikey)
def _parse_geocoder_response(self, response):
json_response = json.loads(response)
if json_response and json_response[ENTRY_RESULTS]:
result = json_response[ENTRY_RESULTS][0]
return self._extract_lng_lat_from_feature(result)
else:
return []
def _request_uri(self, searchtext, country=None, apiKey=None):
baseuri = REQUEST_BASEURI
if country:
baseuri += '&countrySet={}'.format(country)
baseuri = baseuri + '&key={apiKey}' if apiKey else baseuri
return URITemplate(baseuri).expand(apiKey=apiKey,
searchtext=searchtext.encode('utf-8'))
def _extract_lng_lat_from_feature(self, result):
position = result[ENTRY_POSITION]
@@ -85,19 +82,11 @@ class TomTomGeocoder(Traceable):
if state_province:
address.append(normalize(state_province))
uri = self._uri(searchtext=', '.join(address), countries=country)
uri = self._uri(searchtext=', '.join(address), country=country)
try:
response = requests.get(uri)
if response.status_code == requests.codes.ok:
return self._parse_geocoder_response(response.text)
elif response.status_code == requests.codes.bad_request:
return []
elif response.status_code == requests.codes.unprocessable_entity:
return []
else:
raise ServiceException(response.status_code, response)
return self._parse_response(response.status_code, response.text)
except requests.Timeout as te:
# In case of timeout we want to stop the job because the server
# could be down
@@ -110,3 +99,25 @@ class TomTomGeocoder(Traceable):
self._logger.error('Error connecting to TomTom geocoding server',
exception=ce)
return []
def _parse_response(self, status_code, text):
if status_code == requests.codes.ok:
return self._parse_geocoder_response(text)
elif status_code == requests.codes.bad_request:
return []
elif status_code == requests.codes.unprocessable_entity:
return []
else:
msg = 'Unknown response {}: {}'.format(str(status_code), text)
raise ServiceException(msg, None)
def _parse_geocoder_response(self, response):
json_response = json.loads(response) \
if type(response) != dict else response
if json_response and json_response[ENTRY_RESULTS]:
result = json_response[ENTRY_RESULTS][0]
return self._extract_lng_lat_from_feature(result)
else:
return []

View File

@@ -71,9 +71,20 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
'Logroño, Argentina': [-61.69604, -29.50425]
}
TOMTOM_POINTS = HERE_POINTS.copy()
TOMTOM_POINTS.update({
'Plaza Mayor, Valladolid': [-4.72183, 41.5826],
'Paseo Zorrilla, Valladolid': [-4.74031, 41.63181],
'Valladolid': [-4.72838, 41.6542],
'Valladolid, Spain': [-4.72838, 41.6542],
'Madrid': [-3.70035, 40.42028],
'Logroño, Spain': [-2.44998, 42.46592],
})
FIXTURE_POINTS = {
'google': GOOGLE_POINTS,
'heremaps': HERE_POINTS
'google': GOOGLE_POINTS,
'heremaps': HERE_POINTS,
'tomtom': TOMTOM_POINTS
}
def setUp(self):