From dabd59627186a3cd79d2e9514bd8596c6e208080 Mon Sep 17 00:00:00 2001 From: Antonio Date: Wed, 27 Dec 2017 14:55:50 +0100 Subject: [PATCH] Added QPS decorator --- .../cartodb_services/mapbox/geocoder.py | 2 + .../cartodb_services/mapbox/matrix_client.py | 2 + .../cartodb_services/mapbox/routing.py | 2 + .../cartodb_services/tools/exceptions.py | 3 + .../cartodb_services/tools/qps.py | 69 +++++++++++++++++++ 5 files changed, 78 insertions(+) create mode 100644 server/lib/python/cartodb_services/cartodb_services/tools/qps.py diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py index 6187f55..f804be3 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/geocoder.py @@ -7,6 +7,7 @@ import requests from mapbox import Geocoder from cartodb_services.metrics import Traceable from cartodb_services.mapbox.exceptions import ServiceException +from cartodb_services.tools.qps import qps_retry ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg' @@ -48,6 +49,7 @@ class MapboxGeocoder(Traceable): latitude = location[1] return [longitude, latitude] + @qps_retry(qps=10) def geocode(self, address, country=None): response = self._geocoder.forward(address=address, country=country, diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/matrix_client.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/matrix_client.py index 404b906..42cb2f7 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapbox/matrix_client.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/matrix_client.py @@ -7,6 +7,7 @@ from cartodb_services.metrics import Traceable from cartodb_services.tools.coordinates import (validate_coordinates, marshall_coordinates) from exceptions import ServiceException +from cartodb_services.tools.qps import qps_retry ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg' @@ -57,6 +58,7 @@ class MapboxMatrixClient(Traceable): def _parse_matrix_response(self, response): return response + @qps_retry(qps=1) def matrix(self, coordinates, profile=DEFAULT_PROFILE): validate_profile(profile) validate_coordinates(coordinates, diff --git a/server/lib/python/cartodb_services/cartodb_services/mapbox/routing.py b/server/lib/python/cartodb_services/cartodb_services/mapbox/routing.py index c3a6e0e..053353c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapbox/routing.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapbox/routing.py @@ -9,6 +9,7 @@ from cartodb_services.tools import PolyLine from cartodb_services.tools.coordinates import (validate_coordinates, marshall_coordinates) from cartodb_services.mapbox.exceptions import ServiceException +from cartodb_services.tools.qps import qps_retry ACCESS_TOKEN = 'pk.eyJ1IjoiYWNhcmxvbiIsImEiOiJjamJuZjQ1Zjc0Ymt4Mnh0YmFrMmhtYnY4In0.gt9cw0VeKc3rM2mV5pcEmg' @@ -70,6 +71,7 @@ class MapboxRouting(Traceable): return MapboxRoutingResponse(geometry, distance, duration) + @qps_retry(qps=1) def directions(self, waypoints, profile=DEFAULT_PROFILE): self._validate_profile(profile) validate_coordinates(waypoints, NUM_WAYPOINTS_MIN, NUM_WAYPOINTS_MAX) diff --git a/server/lib/python/cartodb_services/cartodb_services/tools/exceptions.py b/server/lib/python/cartodb_services/cartodb_services/tools/exceptions.py index e69de29..3b7e5ee 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tools/exceptions.py +++ b/server/lib/python/cartodb_services/cartodb_services/tools/exceptions.py @@ -0,0 +1,3 @@ +class TimeoutException(Exception): + def __str__(self): + return repr('Timeout requesting to mapzen server') diff --git a/server/lib/python/cartodb_services/cartodb_services/tools/qps.py b/server/lib/python/cartodb_services/cartodb_services/tools/qps.py new file mode 100644 index 0000000..59cec1e --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/tools/qps.py @@ -0,0 +1,69 @@ +import time +import random +from datetime import datetime +from exceptions import TimeoutException + +DEFAULT_RETRY_TIMEOUT = 60 +DEFAULT_QUERIES_PER_SECOND = 10 + +def qps_retry(original_function=None,**options): + """ Query Per Second retry decorator + The intention of this decorator is to retry requests against third + party services that has QPS restriction. + Parameters: + - timeout: Maximum number of seconds to retry + - qps: Allowed queries per second. This parameter is used to + calculate the next time to retry the request + """ + if original_function is not None: + def wrapped_function(*args, **kwargs): + if 'timeout' in options: + timeout = options['timeout'] + else: + timeout = DEFAULT_RETRY_TIMEOUT + if 'qps' in options: + qps = options['qps'] + else: + qps = DEFAULT_QUERIES_PER_SECOND + return QPSService(retry_timeout=timeout, queries_per_second=qps).call(original_function, *args, **kwargs) + return wrapped_function + else: + def partial_wrapper(func): + return qps_retry(func, **options) + return partial_wrapper + + +class QPSService: + + def __init__(self, queries_per_second, retry_timeout): + self._queries_per_second = queries_per_second + self._retry_timeout = retry_timeout + + def call(self, fn, *args, **kwargs): + start_time = datetime.now() + attempt_number = 1 + while True: + try: + return fn(*args, **kwargs) + except Exception as e: + response = getattr(e, 'response', None) + if response is not None and (response.status_code == 429): + self.retry(start_time, attempt_number) + else: + raise e + attempt_number += 1 + + def retry(self, first_request_time, retry_count): + elapsed = datetime.now() - first_request_time + if elapsed.total_seconds() > self._retry_timeout: + raise TimeoutException() + + # inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per + # iteration. + delay = (1.0/self._queries_per_second) * 1.5 ** retry_count + + # https://www.awsarchitectureblog.com/2015/03/backoff.html + # https://github.com/googlemaps/google-maps-services-python/blob/master/googlemaps/client.py#L193 + sleep_time = delay * (random.random() + 0.5) + + time.sleep(sleep_time)