Merge pull request #505 from CartoDB/geocoder_boost_mapbox
Mapbox bulk geocoding
This commit is contained in:
@@ -2372,6 +2372,8 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
provider_function = "_cdb_bulk_heremaps_geocode_street_point";
|
||||
elif user_geocoder_config.tomtom_geocoder:
|
||||
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
|
||||
elif user_geocoder_config.mapbox_geocoder:
|
||||
provider_function = "_cdb_bulk_mapbox_geocode_street_point";
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
@@ -2420,6 +2422,23 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_mapbox_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import ServiceManager
|
||||
from cartodb_services.refactor.service.mapbox_geocoder_config import MapboxGeocoderConfigBuilder
|
||||
from cartodb_services.mapbox import MapboxBulkGeocoder
|
||||
from cartodb_services.tools import Logger
|
||||
import cartodb_services
|
||||
cartodb_services.init(plpy, GD)
|
||||
|
||||
logger_config = GD["logger_config"]
|
||||
logger = Logger(logger_config)
|
||||
service_manager = ServiceManager('geocoder', MapboxGeocoderConfigBuilder, username, orgname, GD)
|
||||
geocoder = MapboxBulkGeocoder(service_manager.config.mapbox_api_key, service_manager.logger, service_manager.config.service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text)
|
||||
RETURNS Geometry AS $$
|
||||
from cartodb_services.metrics import QuotaService
|
||||
|
||||
@@ -29,6 +29,8 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
provider_function = "_cdb_bulk_heremaps_geocode_street_point";
|
||||
elif user_geocoder_config.tomtom_geocoder:
|
||||
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
|
||||
elif user_geocoder_config.mapbox_geocoder:
|
||||
provider_function = "_cdb_bulk_mapbox_geocode_street_point";
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
@@ -77,3 +79,20 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_mapbox_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import ServiceManager
|
||||
from cartodb_services.refactor.service.mapbox_geocoder_config import MapboxGeocoderConfigBuilder
|
||||
from cartodb_services.mapbox import MapboxBulkGeocoder
|
||||
from cartodb_services.tools import Logger
|
||||
import cartodb_services
|
||||
cartodb_services.init(plpy, GD)
|
||||
|
||||
logger_config = GD["logger_config"]
|
||||
logger = Logger(logger_config)
|
||||
service_manager = ServiceManager('geocoder', MapboxGeocoderConfigBuilder, username, orgname, GD)
|
||||
geocoder = MapboxBulkGeocoder(service_manager.config.mapbox_api_key, service_manager.logger, service_manager.config.service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from routing import MapboxRouting, MapboxRoutingResponse
|
||||
from geocoder import MapboxGeocoder
|
||||
from bulk_geocoder import MapboxBulkGeocoder
|
||||
from isolines import MapboxIsolines, MapboxIsochronesResponse
|
||||
from matrix_client import MapboxMatrixClient
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
import json, requests, time
|
||||
from requests.adapters import HTTPAdapter
|
||||
from cartodb_services import StreetPointBulkGeocoder
|
||||
from cartodb_services.mapbox import MapboxGeocoder
|
||||
from cartodb_services.tools.exceptions import ServiceException
|
||||
from iso3166 import countries
|
||||
from cartodb_services.tools.country import country_to_iso3
|
||||
|
||||
|
||||
class MapboxBulkGeocoder(MapboxGeocoder, StreetPointBulkGeocoder):
|
||||
MAX_BATCH_SIZE = 50 # From the docs
|
||||
MIN_BATCHED_SEARCH = 0
|
||||
READ_TIMEOUT = 60
|
||||
CONNECT_TIMEOUT = 10
|
||||
MAX_RETRIES = 1
|
||||
|
||||
def __init__(self, token, logger, service_params=None):
|
||||
MapboxGeocoder.__init__(self, token, logger, service_params)
|
||||
self.connect_timeout = service_params.get('connect_timeout', self.CONNECT_TIMEOUT)
|
||||
self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT)
|
||||
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
|
||||
self.session = requests.Session()
|
||||
|
||||
def _bulk_geocode(self, searches):
|
||||
if len(searches) > self.MAX_BATCH_SIZE:
|
||||
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
|
||||
if self._should_use_batch(searches):
|
||||
self._logger.debug('--> Batch geocode')
|
||||
return self._batch_geocode(searches)
|
||||
else:
|
||||
self._logger.debug('--> Serial geocode')
|
||||
return self._serial_geocode(searches)
|
||||
|
||||
def _should_use_batch(self, searches):
|
||||
return len(searches) >= self.MIN_BATCHED_SEARCH
|
||||
|
||||
def _serial_geocode(self, searches):
|
||||
results = []
|
||||
for search in searches:
|
||||
elements = self._encoded_elements(search)
|
||||
self._logger.debug('--> Sending serial search: {}'.format(search))
|
||||
coordinates = self._geocode_search(*elements)
|
||||
results.append((search[0], coordinates, []))
|
||||
return results
|
||||
|
||||
def _geocode_search(self, address, city, state, country):
|
||||
coordinates = self.geocode(searchtext=address, city=city,
|
||||
state_province=state, country=country)
|
||||
self._logger.debug('--> result sent')
|
||||
return coordinates
|
||||
|
||||
def _encoded_elements(self, search):
|
||||
(search_id, address, city, state, country) = search
|
||||
address = address.encode('utf-8') if address else None
|
||||
city = city.encode('utf-8') if city else None
|
||||
state = state.encode('utf-8') if state else None
|
||||
country = self._country_code(country) if country else None
|
||||
return address, city, state, country
|
||||
|
||||
def _batch_geocode(self, searches):
|
||||
if len(searches) == 1:
|
||||
return self._serial_geocode(searches)
|
||||
else:
|
||||
frees = []
|
||||
for search in searches:
|
||||
elements = self._encoded_elements(search)
|
||||
free = ', '.join([elem for elem in elements if elem])
|
||||
frees.append(free)
|
||||
|
||||
self._logger.debug('--> sending free search: {}'.format(frees))
|
||||
xy_results = self.geocode_free_text(frees)
|
||||
results = []
|
||||
self._logger.debug('--> searches: {}; xy: {}'.format(searches, xy_results))
|
||||
for s, r in zip(searches, xy_results):
|
||||
results.append((s[0], r, []))
|
||||
self._logger.debug('--> results: {}'.format(results))
|
||||
return results
|
||||
|
||||
def _country_code(self, country):
|
||||
country_iso3166 = None
|
||||
country_iso3 = country_to_iso3(country)
|
||||
if country_iso3:
|
||||
country_iso3166 = countries.get(country_iso3).alpha2.lower()
|
||||
|
||||
return country_iso3166
|
||||
|
||||
@@ -39,17 +39,20 @@ class MapboxGeocoder(Traceable):
|
||||
|
||||
def _parse_geocoder_response(self, response):
|
||||
json_response = json.loads(response)
|
||||
self._logger.debug('--> json response: {}'.format(json_response))
|
||||
|
||||
# If Mapbox returns more that one result, take the first one
|
||||
if json_response:
|
||||
if type(json_response) == list:
|
||||
json_response = json_response[0]
|
||||
if type(json_response) != list:
|
||||
json_response = [json_response]
|
||||
|
||||
if json_response[ENTRY_FEATURES]:
|
||||
feature = json_response[ENTRY_FEATURES][0]
|
||||
return self._extract_lng_lat_from_feature(feature)
|
||||
else:
|
||||
return []
|
||||
result = []
|
||||
for a_json_response in json_response:
|
||||
if a_json_response[ENTRY_FEATURES]:
|
||||
feature = a_json_response[ENTRY_FEATURES][0]
|
||||
result.append(self._extract_lng_lat_from_feature(feature))
|
||||
else:
|
||||
result.append([])
|
||||
return result
|
||||
else:
|
||||
return []
|
||||
|
||||
@@ -78,6 +81,13 @@ class MapboxGeocoder(Traceable):
|
||||
@qps_retry(qps=10)
|
||||
def geocode(self, searchtext, city=None, state_province=None,
|
||||
country=None):
|
||||
"""
|
||||
:param searchtext:
|
||||
:param city:
|
||||
:param state_province:
|
||||
:param country: Country ISO 3166 code
|
||||
:return: [x, y] on success, [] on error
|
||||
"""
|
||||
if not self._validate_input(searchtext, city, state_province, country):
|
||||
return []
|
||||
|
||||
@@ -91,8 +101,21 @@ class MapboxGeocoder(Traceable):
|
||||
|
||||
country = [country] if country else None
|
||||
|
||||
free_search = ', '.join(address)
|
||||
|
||||
return self.geocode_free_text([free_search], country)[0]
|
||||
|
||||
@qps_retry(qps=10)
|
||||
def geocode_free_text(self, free_searches, country=None):
|
||||
"""
|
||||
:param free_searches: Free text searches
|
||||
:param country: Country ISO 3166 code
|
||||
:return: list of [x, y] on success, [] on error
|
||||
"""
|
||||
try:
|
||||
response = self._geocoder.forward(address=', '.join(address).decode('utf-8'),
|
||||
free_search = ';'.join([self._escape(fs) for fs in free_searches])
|
||||
self._logger.debug('--> free search: {}'.format(free_search))
|
||||
response = self._geocoder.forward(address=free_search.decode('utf-8'),
|
||||
country=country,
|
||||
limit=1)
|
||||
|
||||
@@ -110,9 +133,16 @@ class MapboxGeocoder(Traceable):
|
||||
self._logger.error('Timeout connecting to Mapbox geocoding server',
|
||||
te)
|
||||
raise ServiceException('Error geocoding {0} using Mapbox'.format(
|
||||
searchtext), None)
|
||||
free_search), None)
|
||||
except requests.ConnectionError as ce:
|
||||
# Don't raise the exception to continue with the geocoding job
|
||||
self._logger.error('Error connecting to Mapbox geocoding server',
|
||||
exception=ce)
|
||||
return []
|
||||
|
||||
def _escape(self, free_search):
|
||||
# Semicolon is used to separate batch geocoding; there's no documented
|
||||
# way to pass actual semicolons, and %3B or ; won't work (check
|
||||
# TestBulkStreetFunctions.test_semicolon and the docs,
|
||||
# https://www.mapbox.com/api-documentation/#batch-requests)
|
||||
return free_search.replace(';', ',')
|
||||
|
||||
@@ -81,10 +81,21 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
'Logroño, Spain': [-2.44998, 42.46592],
|
||||
})
|
||||
|
||||
MAPBOX_POINTS = GOOGLE_POINTS.copy()
|
||||
MAPBOX_POINTS.update({
|
||||
'Logroño, Spain': [-2.44556, 42.47],
|
||||
'Logroño, Argentina': [-70.687195, -33.470901], # TODO: huge mismatch
|
||||
'Valladolid': [-4.72856, 41.652251],
|
||||
'Valladolid, Spain': [-4.72856, 41.652251],
|
||||
'1902 amphitheatre parkway': [-118.03, 34.06], # TODO: huge mismatch
|
||||
'Madrid': [-3.69194, 40.4167754],
|
||||
})
|
||||
|
||||
FIXTURE_POINTS = {
|
||||
'google': GOOGLE_POINTS,
|
||||
'heremaps': HERE_POINTS,
|
||||
'tomtom': TOMTOM_POINTS
|
||||
'tomtom': TOMTOM_POINTS,
|
||||
'mapbox': MAPBOX_POINTS
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
@@ -251,6 +262,19 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||
response = self._run_authenticated(query)
|
||||
assert_equal(1, len(response['rows']))
|
||||
|
||||
def test_semicolon(self):
|
||||
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
|
||||
"'select * from jsonb_to_recordset(''[" \
|
||||
"{\"cartodb_id\": 1, \"address\": \"1900 amphitheatre parkway; mountain view; ca; us\"}," \
|
||||
"{\"cartodb_id\": 2, \"address\": \"1900 amphitheatre parkway, mountain view, ca, us\"}" \
|
||||
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||
"'address', null, null, null)"
|
||||
response = self._run_authenticated(query)
|
||||
|
||||
x_y_by_cartodb_id = self._x_y_by_cartodb_id(response)
|
||||
assert_equal(x_y_by_cartodb_id[1], x_y_by_cartodb_id[2])
|
||||
|
||||
def _run_authenticated(self, query):
|
||||
authenticated_query = "{}&api_key={}".format(query,
|
||||
self.env_variables[
|
||||
|
||||
Reference in New Issue
Block a user