Added QPS decorator
This commit is contained in:
@@ -7,6 +7,7 @@ import requests
|
||||
from mapbox import Geocoder
|
||||
from cartodb_services.metrics import Traceable
|
||||
from cartodb_services.mapbox.exceptions import ServiceException
|
||||
from cartodb_services.tools.qps import qps_retry
|
||||
|
||||
ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg'
|
||||
|
||||
@@ -48,6 +49,7 @@ class MapboxGeocoder(Traceable):
|
||||
latitude = location[1]
|
||||
return [longitude, latitude]
|
||||
|
||||
@qps_retry(qps=10)
|
||||
def geocode(self, address, country=None):
|
||||
response = self._geocoder.forward(address=address,
|
||||
country=country,
|
||||
|
||||
@@ -7,6 +7,7 @@ from cartodb_services.metrics import Traceable
|
||||
from cartodb_services.tools.coordinates import (validate_coordinates,
|
||||
marshall_coordinates)
|
||||
from exceptions import ServiceException
|
||||
from cartodb_services.tools.qps import qps_retry
|
||||
|
||||
ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg'
|
||||
|
||||
@@ -57,6 +58,7 @@ class MapboxMatrixClient(Traceable):
|
||||
def _parse_matrix_response(self, response):
|
||||
return response
|
||||
|
||||
@qps_retry(qps=1)
|
||||
def matrix(self, coordinates, profile=DEFAULT_PROFILE):
|
||||
validate_profile(profile)
|
||||
validate_coordinates(coordinates,
|
||||
|
||||
@@ -9,6 +9,7 @@ from cartodb_services.tools import PolyLine
|
||||
from cartodb_services.tools.coordinates import (validate_coordinates,
|
||||
marshall_coordinates)
|
||||
from cartodb_services.mapbox.exceptions import ServiceException
|
||||
from cartodb_services.tools.qps import qps_retry
|
||||
|
||||
ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg'
|
||||
|
||||
@@ -70,6 +71,7 @@ class MapboxRouting(Traceable):
|
||||
|
||||
return MapboxRoutingResponse(geometry, distance, duration)
|
||||
|
||||
@qps_retry(qps=1)
|
||||
def directions(self, waypoints, profile=DEFAULT_PROFILE):
|
||||
self._validate_profile(profile)
|
||||
validate_coordinates(waypoints, NUM_WAYPOINTS_MIN, NUM_WAYPOINTS_MAX)
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
class TimeoutException(Exception):
|
||||
def __str__(self):
|
||||
return repr('Timeout requesting to mapzen server')
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
import time
|
||||
import random
|
||||
from datetime import datetime
|
||||
from exceptions import TimeoutException
|
||||
|
||||
DEFAULT_RETRY_TIMEOUT = 60
|
||||
DEFAULT_QUERIES_PER_SECOND = 10
|
||||
|
||||
def qps_retry(original_function=None,**options):
|
||||
""" Query Per Second retry decorator
|
||||
The intention of this decorator is to retry requests against third
|
||||
party services that has QPS restriction.
|
||||
Parameters:
|
||||
- timeout: Maximum number of seconds to retry
|
||||
- qps: Allowed queries per second. This parameter is used to
|
||||
calculate the next time to retry the request
|
||||
"""
|
||||
if original_function is not None:
|
||||
def wrapped_function(*args, **kwargs):
|
||||
if 'timeout' in options:
|
||||
timeout = options['timeout']
|
||||
else:
|
||||
timeout = DEFAULT_RETRY_TIMEOUT
|
||||
if 'qps' in options:
|
||||
qps = options['qps']
|
||||
else:
|
||||
qps = DEFAULT_QUERIES_PER_SECOND
|
||||
return QPSService(retry_timeout=timeout, queries_per_second=qps).call(original_function, *args, **kwargs)
|
||||
return wrapped_function
|
||||
else:
|
||||
def partial_wrapper(func):
|
||||
return qps_retry(func, **options)
|
||||
return partial_wrapper
|
||||
|
||||
|
||||
class QPSService:
|
||||
|
||||
def __init__(self, queries_per_second, retry_timeout):
|
||||
self._queries_per_second = queries_per_second
|
||||
self._retry_timeout = retry_timeout
|
||||
|
||||
def call(self, fn, *args, **kwargs):
|
||||
start_time = datetime.now()
|
||||
attempt_number = 1
|
||||
while True:
|
||||
try:
|
||||
return fn(*args, **kwargs)
|
||||
except Exception as e:
|
||||
response = getattr(e, 'response', None)
|
||||
if response is not None and (response.status_code == 429):
|
||||
self.retry(start_time, attempt_number)
|
||||
else:
|
||||
raise e
|
||||
attempt_number += 1
|
||||
|
||||
def retry(self, first_request_time, retry_count):
|
||||
elapsed = datetime.now() - first_request_time
|
||||
if elapsed.total_seconds() > self._retry_timeout:
|
||||
raise TimeoutException()
|
||||
|
||||
# inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per
|
||||
# iteration.
|
||||
delay = (1.0/self._queries_per_second) * 1.5 ** retry_count
|
||||
|
||||
# https://www.awsarchitectureblog.com/2015/03/backoff.html
|
||||
# https://github.com/googlemaps/google-maps-services-python/blob/master/googlemaps/client.py#L193
|
||||
sleep_time = delay * (random.random() + 0.5)
|
||||
|
||||
time.sleep(sleep_time)
|
||||
Reference in New Issue
Block a user