Compare commits: 0.11.0-cli...python-0.8 (14 commits)
Commits:

- 85d6c2a54e
- cad2051efe
- 96a93e3c56
- facda9e8be
- 64fc18b9e0
- 9381d5644b
- 9f55f2ee3b
- 1087c1266b
- d5a296a30c
- f8caf4314d
- d7910fbbf1
- d47049c813
- cc8f93c535
- 3f9441de7e
NEWS.md (10 changed lines)
@@ -1,3 +1,13 @@
+September 28, 2016
+==========
+* Released version 0.8.1 of Python package cartodb\_services
+* Improvements in QPS retry decorator for requests to external services
+
+September 8, 2016
+===========
+* Released version 0.11.1 of the client
+* Minor change in the name of the function parameter sent to server and Observatory backend for compatibility with the last observatory-extension framework updates
+
 September 1, 2016
 ===========
 * Released version 0.11.0 of the client
@@ -5,7 +5,7 @@ The CARTO Data Services SQL API
 Steps to deploy a new Data Services API version :

 - Deploy new version of dataservices API to all servers
-- Update the server user using: ALTER EXTENSION cdb_dataservices_server UPDATE TO '<CURRENT_VERSION>';
+- Update the server user using: ALTER EXTENSION cdb_dataservices_server UPDATE TO '\<CURRENT_VERSION\>';
 - Update the python dependencies if needed: **cartodb_geocoder** and **heremaps**
 - Add the needed config in the `cdb_conf` table:
   - `redis_metadata_config` and `redis_metrics_conf`
@@ -13,8 +13,8 @@ OLD_VERSIONS = $(wildcard old_versions/*.sql)
 # @see http://www.postgresql.org/docs/current/static/extend-pgxs.html
 DATA = $(NEW_EXTENSION_ARTIFACT) \
        $(OLD_VERSIONS) \
-       cdb_dataservices_client--0.10.2--0.11.0.sql \
-       cdb_dataservices_client--0.11.0--0.10.2.sql
+       cdb_dataservices_client--0.11.0--0.11.1.sql \
+       cdb_dataservices_client--0.11.1--0.11.0.sql


 REGRESS = $(notdir $(basename $(wildcard test/sql/*test.sql)))
client/cdb_dataservices_client--0.11.0--0.11.1.sql (new file, 140 lines)
@@ -0,0 +1,140 @@
--DO NOT MODIFY THIS FILE, IT IS GENERATED AUTOMATICALLY FROM SOURCES
-- Complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "ALTER EXTENSION cdb_dataservices_client UPDATE TO '0.11.1'" to load this file. \quit

CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
    username text,
    orgname text,
    user_db_role text,
    user_schema text,
    output_table_name text,
    params json
) RETURNS boolean AS $$
  function_name = 'OBS_GetMeasure'
  # Obtain return types for augmentation procedure
  ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
      "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
      .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params)
      )
  )
  if ds_return_metadata[0]["colnames"]:
      colnames_arr = ds_return_metadata[0]["colnames"]
      coltypes_arr = ds_return_metadata[0]["coltypes"]
  else:
      raise Exception('Error retrieving OBS_GetMeasure metadata')


  # Prepare column and type strings required in the SQL queries
  columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
  columns_with_types = ','.join(columns_with_types_arr)

  # Create a new table with the required columns
  plpy.execute('CREATE TABLE "{schema}".{table_name} ( '
      'cartodb_id int, the_geom geometry, {columns_with_types} '
      ');'
      .format(schema=user_schema, table_name=output_table_name, columns_with_types=columns_with_types)
  )

  plpy.execute('ALTER TABLE "{schema}".{table_name} OWNER TO "{user}";'
      .format(schema=user_schema, table_name=output_table_name, user=user_db_role)
  )

  return True
$$ LANGUAGE plpythonu;

CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
    username text,
    orgname text,
    user_db_role text,
    user_schema text,
    dbname text,
    table_name text,
    output_table_name text,
    params json
) RETURNS boolean AS $$
  function_name = 'OBS_GetMeasure'
  # Obtain return types for augmentation procedure
  ds_return_metadata = plpy.execute(
      "SELECT colnames, coltypes "
      "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params)))

  if ds_return_metadata[0]["colnames"]:
      colnames_arr = ds_return_metadata[0]["colnames"]
      coltypes_arr = ds_return_metadata[0]["coltypes"]
  else:
      raise Exception('Error retrieving OBS_GetMeasure metadata')

  # Prepare column and type strings required in the SQL queries
  columns_with_types_arr = [
      colnames_arr[i] +
      ' ' +
      coltypes_arr[i] for i in range(
          0,
          len(colnames_arr))]
  columns_with_types = ','.join(columns_with_types_arr)
  aliased_colname_list = ','.join(
      ['result.' + name for name in colnames_arr])

  # Instruct the OBS server side to establish a FDW
  # The metadata is obtained as well in order to:
  # - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
  # - (b) be able to tell OBS to free resources when done.
  ds_fdw_metadata = plpy.execute(
      "SELECT schemaname, tabname, servername "
      "FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, "
      "{schema}::text, {dbname}::text, {table_name}::text);" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          user_db_role=plpy.quote_literal(user_db_role),
          schema=plpy.quote_literal(user_schema),
          dbname=plpy.quote_literal(dbname),
          table_name=plpy.quote_literal(table_name)))

  if ds_fdw_metadata[0]["schemaname"]:
      server_schema = ds_fdw_metadata[0]["schemaname"]
      server_table_name = ds_fdw_metadata[0]["tabname"]
      server_name = ds_fdw_metadata[0]["servername"]
  else:
      raise Exception('Error connecting dataset via FDW')

  # Create a new table with the required columns
  plpy.execute(
      'INSERT INTO "{schema}".{analysis_table_name} '
      'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
      'FROM "{schema}".{table_name} ut '
      'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
      '{function_name}::text, {params}::json) '
      'AS result ({columns_with_types}, cartodb_id int) '
      'ON result.cartodb_id = ut.cartodb_id;' .format(
          schema=user_schema,
          analysis_table_name=output_table_name,
          colname_list=aliased_colname_list,
          table_name=table_name,
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          server_schema=plpy.quote_literal(server_schema),
          server_table_name=plpy.quote_literal(server_table_name),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params),
          columns_with_types=columns_with_types))

  # Wipe user FDW data from the server
  wiped = plpy.execute(
      "SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, "
      "{server_table_name}::text, {fdw_server}::text)" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          server_schema=plpy.quote_literal(server_schema),
          server_table_name=plpy.quote_literal(server_table_name),
          fdw_server=plpy.quote_literal(server_name)))

  return True
$$ LANGUAGE plpythonu;
client/cdb_dataservices_client--0.11.1--0.11.0.sql (new file, 140 lines)
@@ -0,0 +1,140 @@
--DO NOT MODIFY THIS FILE, IT IS GENERATED AUTOMATICALLY FROM SOURCES
-- Complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "ALTER EXTENSION cdb_dataservices_client UPDATE TO '0.11.0'" to load this file. \quit

CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
    username text,
    orgname text,
    user_db_role text,
    user_schema text,
    output_table_name text,
    params json
) RETURNS boolean AS $$
  function_name = 'GetMeasure'
  # Obtain return types for augmentation procedure
  ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
      "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
      .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params)
      )
  )
  if ds_return_metadata[0]["colnames"]:
      colnames_arr = ds_return_metadata[0]["colnames"]
      coltypes_arr = ds_return_metadata[0]["coltypes"]
  else:
      raise Exception('Error retrieving OBS_GetMeasure metadata')


  # Prepare column and type strings required in the SQL queries
  columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
  columns_with_types = ','.join(columns_with_types_arr)

  # Create a new table with the required columns
  plpy.execute('CREATE TABLE "{schema}".{table_name} ( '
      'cartodb_id int, the_geom geometry, {columns_with_types} '
      ');'
      .format(schema=user_schema, table_name=output_table_name, columns_with_types=columns_with_types)
  )

  plpy.execute('ALTER TABLE "{schema}".{table_name} OWNER TO "{user}";'
      .format(schema=user_schema, table_name=output_table_name, user=user_db_role)
  )

  return True
$$ LANGUAGE plpythonu;

CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
    username text,
    orgname text,
    user_db_role text,
    user_schema text,
    dbname text,
    table_name text,
    output_table_name text,
    params json
) RETURNS boolean AS $$
  function_name = 'GetMeasure'
  # Obtain return types for augmentation procedure
  ds_return_metadata = plpy.execute(
      "SELECT colnames, coltypes "
      "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params)))

  if ds_return_metadata[0]["colnames"]:
      colnames_arr = ds_return_metadata[0]["colnames"]
      coltypes_arr = ds_return_metadata[0]["coltypes"]
  else:
      raise Exception('Error retrieving OBS_GetMeasure metadata')

  # Prepare column and type strings required in the SQL queries
  columns_with_types_arr = [
      colnames_arr[i] +
      ' ' +
      coltypes_arr[i] for i in range(
          0,
          len(colnames_arr))]
  columns_with_types = ','.join(columns_with_types_arr)
  aliased_colname_list = ','.join(
      ['result.' + name for name in colnames_arr])

  # Instruct the OBS server side to establish a FDW
  # The metadata is obtained as well in order to:
  # - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
  # - (b) be able to tell OBS to free resources when done.
  ds_fdw_metadata = plpy.execute(
      "SELECT schemaname, tabname, servername "
      "FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, "
      "{schema}::text, {dbname}::text, {table_name}::text);" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          user_db_role=plpy.quote_literal(user_db_role),
          schema=plpy.quote_literal(user_schema),
          dbname=plpy.quote_literal(dbname),
          table_name=plpy.quote_literal(table_name)))

  if ds_fdw_metadata[0]["schemaname"]:
      server_schema = ds_fdw_metadata[0]["schemaname"]
      server_table_name = ds_fdw_metadata[0]["tabname"]
      server_name = ds_fdw_metadata[0]["servername"]
  else:
      raise Exception('Error connecting dataset via FDW')

  # Create a new table with the required columns
  plpy.execute(
      'INSERT INTO "{schema}".{analysis_table_name} '
      'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
      'FROM "{schema}".{table_name} ut '
      'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
      '{function_name}::text, {params}::json) '
      'AS result ({columns_with_types}, cartodb_id int) '
      'ON result.cartodb_id = ut.cartodb_id;' .format(
          schema=user_schema,
          analysis_table_name=output_table_name,
          colname_list=aliased_colname_list,
          table_name=table_name,
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          server_schema=plpy.quote_literal(server_schema),
          server_table_name=plpy.quote_literal(server_table_name),
          function_name=plpy.quote_literal(function_name),
          params=plpy.quote_literal(params),
          columns_with_types=columns_with_types))

  # Wipe user FDW data from the server
  wiped = plpy.execute(
      "SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, "
      "{server_table_name}::text, {fdw_server}::text)" .format(
          username=plpy.quote_nullable(username),
          orgname=plpy.quote_nullable(orgname),
          server_schema=plpy.quote_literal(server_schema),
          server_table_name=plpy.quote_literal(server_table_name),
          fdw_server=plpy.quote_literal(server_name)))

  return True
$$ LANGUAGE plpythonu;
client/cdb_dataservices_client--0.11.1.sql (new file, 1,792 lines; diff suppressed because it is too large)
@@ -1,5 +1,5 @@
 comment = 'CartoDB dataservices client API extension'
-default_version = '0.11.0'
+default_version = '0.11.1'
 requires = 'plproxy, cartodb'
 superuser = true
 schema = cdb_dataservices_client
@@ -100,7 +100,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeas
     output_table_name text,
     params json
 ) RETURNS boolean AS $$
-  function_name = 'GetMeasure'
+  function_name = 'OBS_GetMeasure'
   # Obtain return types for augmentation procedure
   ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
       "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"

@@ -146,7 +146,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMea
     output_table_name text,
     params json
 ) RETURNS boolean AS $$
-  function_name = 'GetMeasure'
+  function_name = 'OBS_GetMeasure'
   # Obtain return types for augmentation procedure
   ds_return_metadata = plpy.execute(
       "SELECT colnames, coltypes "
@@ -1,4 +1,4 @@
-# CartoDB dataservices API python module
+# CARTO dataservices API python module

 This directory contains the python library used by the server side of CARTO LDS (Location Data Services).

@@ -19,3 +19,15 @@ class MalformedResult(Exception):
 class TimeoutException(Exception):
     def __str__(self):
         return repr('Timeout requesting to mapzen server')
+
+
+class ServiceException(Exception):
+    def __init__(self, message, response):
+        self.message = message
+        self.response = response
+
+    def response(self):
+        return self.response
+
+    def __str__(self):
+        return self.message
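The new ServiceException carries the HTTP response alongside the message; the QPS retry logic further down inspects that attribute to decide whether a 429 should be retried. A minimal sketch of that interplay (FakeResponse is a stand-in for a requests.Response object, not part of the library, and the import assumes the cartodb_services package is installed):

```python
from cartodb_services.mapzen.exceptions import ServiceException


class FakeResponse(object):
    # Stand-in for a requests.Response; only the attribute the retry
    # logic looks at is modelled here.
    status_code = 429


try:
    raise ServiceException('Error trying to geocode foo using mapzen',
                           FakeResponse())
except ServiceException as e:
    # QPSService.call() performs essentially this check before retrying:
    response = getattr(e, 'response', None)
    if response is not None and response.status_code == 429:
        print('rate limited, will retry')
    else:
        raise
```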
@@ -2,7 +2,7 @@ import requests
 import json
 import re

-from exceptions import WrongParams, MalformedResult
+from exceptions import WrongParams, MalformedResult, ServiceException
 from qps import qps_retry
 from cartodb_services.tools import Coordinate, PolyLine

@@ -17,8 +17,9 @@ class MapzenGeocoder:
         self._url = base_url
         self._logger = logger

-    @qps_retry
-    def geocode(self, searchtext, city=None, state_province=None, country=None, search_type=None):
+    @qps_retry(qps=20)
+    def geocode(self, searchtext, city=None, state_province=None,
+                country=None, search_type=None):
         request_params = self._build_requests_parameters(searchtext, city,
                                                          state_province,
                                                          country, search_type)
@@ -31,21 +32,21 @@ class MapzenGeocoder:
             else:
                 self._logger.error('Error trying to geocode using mapzen',
                                    data={"response_status": response.status_code,
-                                         "response_reason": response.reason,
-                                         "response_content": response.text,
-                                         "reponse_url": response.url,
-                                         "response_headers": response.headers,
-                                         "searchtext": searchtext,
-                                         "city": city, "country": country,
-                                         "state_province": state_province })
-                raise Exception('Error trying to geocode {0} using mapzen'.format(searchtext))
+                                         "response_reason": response.reason,
+                                         "response_content": response.text,
+                                         "reponse_url": response.url,
+                                         "response_headers": response.headers,
+                                         "searchtext": searchtext,
+                                         "city": city, "country": country,
+                                         "state_province": state_province})
+                raise ServiceException('Error trying to geocode {0} using mapzen'.format(searchtext),
+                                       response)
         except requests.ConnectionError as e:
             # Don't raise the exception to continue with the geocoding job
             self._logger.error('Error connecting to Mapzen geocoding server',
                                exception=e)
             return []

     def _build_requests_parameters(self, searchtext, city=None,
                                    state_province=None, country=None,
                                    search_type=None):
@@ -1,6 +1,7 @@
 import requests
 import json
 from qps import qps_retry
+from exceptions import ServiceException


 class MatrixClient:

@@ -51,6 +52,6 @@ class MatrixClient:
                                      "response_headers": response.headers,
                                      "locations": locations,
                                      "costing": costing})
-            raise Exception('Error trying to get matrix distance from mapzen')
+            raise ServiceException("Error trying to get matrix distance from mapzen", response)

         return response.json()
@@ -4,18 +4,38 @@ from datetime import datetime
 from exceptions import TimeoutException

 DEFAULT_RETRY_TIMEOUT = 60
+DEFAULT_QUERIES_PER_SECOND = 10


-def qps_retry(f):
-    def wrapped_f(*args, **kw):
-        return QPSService().call(f, *args, **kw)
-    return wrapped_f
+def qps_retry(original_function=None,**options):
+    """ Query Per Second retry decorator
+    The intention of this decorator is to retry requests against third
+    party services that has QPS restriction.
+    Parameters:
+        - timeout: Maximum number of seconds to retry
+        - qps: Allowed queries per second. This parameter is used to
+          calculate the next time to retry the request
+    """
+    if original_function is not None:
+        def wrapped_function(*args, **kwargs):
+            if 'timeout' in options:
+                timeout = options['timeout']
+            else:
+                timeout = DEFAULT_RETRY_TIMEOUT
+            if 'qps' in options:
+                qps = options['qps']
+            else:
+                qps = DEFAULT_QUERIES_PER_SECOND
+            return QPSService(retry_timeout=timeout, queries_per_second=qps).call(original_function, *args, **kwargs)
+        return wrapped_function
+    else:
+        def partial_wrapper(func):
+            return qps_retry(func, **options)
+        return partial_wrapper


 class QPSService:

-    def __init__(self, queries_per_second=10,
-                 retry_timeout=DEFAULT_RETRY_TIMEOUT):
+    def __init__(self, queries_per_second, retry_timeout):
         self._queries_per_second = queries_per_second
         self._retry_timeout = retry_timeout
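A short usage sketch of the rewritten decorator (the decorated functions below are toy examples, not library code). Both forms work: the bare form passes the function straight in as original_function, while the keyword form first returns partial_wrapper, which then wraps the function; the latter is the form used by MapzenGeocoder.geocode (@qps_retry(qps=20)) and by the new test_qps.py further down.

```python
from cartodb_services.mapzen.qps import qps_retry


# Bare form: the defaults apply (timeout=60 seconds, qps=10).
@qps_retry
def fetch_with_defaults():
    pass


# Keyword form: qps_retry(**options) returns partial_wrapper, which is
# then applied to the function with the given timeout/qps limits.
@qps_retry(timeout=30, qps=20)
def fetch_with_tuned_limits():
    pass
```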
@@ -27,7 +47,7 @@ class QPSService:
                 return fn(*args, **kwargs)
             except Exception as e:
                 response = getattr(e, 'response', None)
-                if response and (response.status_code == 429):
+                if response is not None and (response.status_code == 429):
                     self.retry(start_time, attempt_number)
                 else:
                     raise e
@@ -35,7 +55,7 @@ class QPSService:

     def retry(self, first_request_time, retry_count):
         elapsed = datetime.now() - first_request_time
-        if elapsed.seconds > self._retry_timeout:
+        if elapsed.microseconds > (self._retry_timeout * 1000.0):
             raise TimeoutException()

         # inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per
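The truncated comment above describes the backoff schedule as the inverse of the allowed QPS, scaled by 1.5 per attempt. The sleep call itself is not shown in this hunk, so the following is only an illustration of what that formula yields for the default of 10 queries per second:

```python
# Illustration of the "inverse qps * (1.5 ^ i)" backoff from the comment
# above; not the library's code.
qps = 10
for attempt in range(5):
    delay = (1.0 / qps) * (1.5 ** attempt)
    print('attempt %d: sleep %.3f s' % (attempt, delay))
# attempt 0: 0.100 s, attempt 1: 0.150 s, ... attempt 4: 0.506 s
```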
@@ -2,7 +2,7 @@ import requests
 import json
 import re

-from exceptions import WrongParams, MalformedResult
+from exceptions import WrongParams, MalformedResult, ServiceException
 from qps import qps_retry
 from cartodb_services.tools import Coordinate, PolyLine

@@ -57,7 +57,7 @@ class MapzenRouting:
                                      "response_headers": response.headers,
                                      "waypoints": waypoints, "mode": mode,
                                      "options": options})
-            raise Exception('Error trying to calculate route using Mapzen')
+            raise ServiceException('Error trying to calculate route using Mapzen', response)

     def __parse_options(self, options):
         return dict(option.split('=') for option in options)
@@ -1,4 +1,3 @@
-import plpy
 import rollbar
 import logging
 import json

@@ -6,7 +5,14 @@ import traceback
 import sys
 # Monkey patch because plpython sys module doesn't have argv and rollbar
 # package use it
-sys.__dict__['argv'] = []
+if 'argv' not in sys.__dict__:
+    sys.__dict__['argv'] = []

+# Only can be imported when is called from PLPython
+try:
+    import plpy
+except ImportError:
+    pass
+

 class Logger:
@@ -30,30 +36,28 @@ class Logger:
             return
         self._send_to_rollbar('debug', text, exception, data)
         self._send_to_log_file('debug', text, exception, data)
-        plpy.debug(text)
+        self._send_to_plpy('debug', text)

     def info(self, text, exception=None, data={}):
         if not self._check_min_level('info'):
             return
         self._send_to_rollbar('info', text, exception, data)
         self._send_to_log_file('info', text, exception, data)
-        plpy.info(text)
+        self._send_to_plpy('info', text)

     def warning(self, text, exception=None, data={}):
         if not self._check_min_level('warning'):
             return
         self._send_to_rollbar('warning', text, exception, data)
         self._send_to_log_file('warning', text, exception, data)
-        plpy.warning(text)
+        self._send_to_plpy('warning', text)

     def error(self, text, exception=None, data={}):
         if not self._check_min_level('error'):
             return
         self._send_to_rollbar('error', text, exception, data)
         self._send_to_log_file('error', text, exception, data)
-        # Plpy.error and fatal raises exceptions and we only want to log an
-        # error, exceptions should be raise explicitly
-        plpy.warning(text)
+        self._send_to_plpy('error', text)

     def _check_min_level(self, level):
         return True if self.LEVELS[level] >= self._min_level else False
@@ -82,6 +86,19 @@ class Logger:
         elif level == 'error':
             self._file_logger.error(text, extra=extra_data)

+    def _send_to_plpy(self, level, text):
+        if self._check_plpy():
+            if level == 'debug':
+                plpy.debug(text)
+            elif level == 'info':
+                plpy.info(text)
+            elif level == 'warning':
+                plpy.warning(text)
+            elif level == 'error':
+                # Plpy.error and fatal raises exceptions and we only want to
+                # log an error, exceptions should be raise explicitly
+                plpy.warning(text)
+
     def _parse_log_extra_data(self, exception, data):
         extra_data = {}
         if exception:
@@ -118,6 +135,13 @@ class Logger:
     def _log_file_activated(self):
         return True if self._config.log_file_path else False

+    def _check_plpy(self):
+        try:
+            module = sys.modules['plpy']
+            return True
+        except KeyError:
+            return False
+

 class ConfigException(Exception):
     pass
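_check_plpy() works because plpy only appears in sys.modules when the code runs inside the PL/Python interpreter; outside the database the try/except around the import leaves it absent, so the plpy.* calls in _send_to_plpy are skipped. A tiny sketch of that check in a plain Python process:

```python
import sys

# In a regular Python process (for example the test suite) this prints
# False, so Logger falls back to Rollbar/file logging only.
print('plpy' in sys.modules)
```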
@@ -10,7 +10,7 @@ from setuptools import setup, find_packages
 setup(
     name='cartodb_services',

-    version='0.8',
+    version='0.8.1',

     description='CartoDB Services API Python Library',

@@ -6,7 +6,7 @@ import requests_mock
 from mock import Mock

 from cartodb_services.mapzen import MapzenGeocoder
-from cartodb_services.mapzen.exceptions import MalformedResult
+from cartodb_services.mapzen.exceptions import MalformedResult, TimeoutException

 requests_mock.Mocker.TEST_PREFIX = 'test_'

server/lib/python/cartodb_services/test/test_qps.py (new file, 33 lines)
@@ -0,0 +1,33 @@
import test_helper
import requests
from unittest import TestCase
from nose.tools import assert_raises
from datetime import datetime, date
from cartodb_services.mapzen.qps import qps_retry
from cartodb_services.mapzen.exceptions import ServiceException, TimeoutException
import requests_mock
import mock

requests_mock.Mocker.TEST_PREFIX = 'test_'

@requests_mock.Mocker()
class TestQPS(TestCase):
    QPS_ERROR_MESSAGE = "Queries per second exceeded: Queries exceeded (10 allowed)"

    def test_qps_timeout(self, req_mock):
        class TestClass:
            @qps_retry(timeout=0.001, qps=100)
            def test(self):
                response = requests.get('http://localhost/test_qps')
                if response.status_code == 429:
                    raise ServiceException('Error 429', response)

        def _text_cb(request, context):
            context.status_code = 429
            return self.QPS_ERROR_MESSAGE

        req_mock.register_uri('GET', 'http://localhost/test_qps',
                              text=_text_cb)
        with self.assertRaises(TimeoutException):
            c = TestClass()
            c.test()