import asyncio
from functools import wraps
from typing import (
Any, Callable, Coroutine, Generic, Optional, ParamSpec, Tuple, Type,
TypeVar, Union,
)
from .counters import Statistic
# Numeric type accepted for the time-valued settings (timeouts and pause).
Number = Union[int, float]
# Result type produced by the wrapped coroutine function.
T = TypeVar("T")
# Parameter specification (positional/keyword arguments) of the wrapped
# coroutine function.
P = ParamSpec("P")
class BackoffStatistic(Statistic):
    """Counters updated by :class:`BackoffExecution` while it retries."""

    # Incremented once per attempt when the attempt is over (any outcome).
    done: int
    # Incremented once per attempt, right before the attempt is started.
    attempts: int
    # Incremented when an attempt is interrupted by cancellation.
    cancels: int
    # Incremented when an attempt fails with a retryable exception.
    errors: int
    # Accumulated duration of attempts, measured with the event loop clock.
    sum_time: float
class RetryStatistic(BackoffStatistic):
    """
    Statistic class passed by :func:`asyncretry`.

    It declares no counters of its own; it only inherits the ones
    from :class:`BackoffStatistic`.
    """
class Backoff:
    """
    Reusable retry policy for coroutine functions.

    An instance stores the retry settings and one shared statistic object.
    It can be used as a decorator (``@Backoff(...)``), called directly via
    :meth:`execute`, or turned into a :class:`BackoffExecution` via
    :meth:`prepare`.
    """

    __slots__ = (
        "attempt_timeout",
        "deadline",
        "pause",
        "max_tries",
        "giveup",
        "exceptions",
        "statistic",
    )

    def __init__(
        self,
        attempt_timeout: Optional[Number],
        deadline: Optional[Number],
        pause: Number = 0,
        exceptions: Tuple[Type[Exception], ...] = (),
        max_tries: Optional[int] = None,
        giveup: Optional[Callable[[Exception], bool]] = None,
        statistic_name: Optional[str] = None,
        statistic_class: Type[BackoffStatistic] = BackoffStatistic,
    ):
        """
        Validate and store the retry settings.

        :param attempt_timeout: time limit for a single attempt,
            ``None`` disables it.
        :param deadline: time limit for all attempts together,
            ``None`` disables it.
        :param pause: delay between attempts; any falsy value means ``0``.
        :param exceptions: exception classes that trigger a retry;
            ``asyncio.TimeoutError`` is always appended.
        :param max_tries: maximum number of attempts (>= 1),
            ``None`` means unlimited.
        :param giveup: predicate called with the caught exception; a true
            result stops retrying.
        :param statistic_name: name passed to ``statistic_class``.
        :param statistic_class: class used to create the statistic object.
        :raises ValueError: when a numeric setting is out of range or
            ``giveup`` is not callable.
        """
        # Normalize falsy values (``None``, ``0``, ``0.0``) to plain zero.
        if not pause:
            pause = 0
        elif pause < 0:
            raise ValueError("'pause' must be positive")

        if attempt_timeout is not None and attempt_timeout < 0:
            raise ValueError("'attempt_timeout' must be positive or None")

        if deadline is not None and deadline < 0:
            raise ValueError("'deadline' must be positive or None")

        if max_tries is not None and max_tries < 1:
            # The message names the actual parameter (``max_tries``).
            raise ValueError("'max_tries' must be >= 1 or None")

        if giveup is not None and not callable(giveup):
            raise ValueError("'giveup' must be a callable or None")

        # A timed out attempt must always be retryable, so the timeout
        # error is added regardless of what the caller passed.
        exceptions = tuple(exceptions) + (asyncio.TimeoutError,)

        self.attempt_timeout = attempt_timeout
        self.deadline = deadline
        self.pause = pause
        self.max_tries = max_tries
        self.giveup = giveup
        self.exceptions = exceptions
        self.statistic = statistic_class(statistic_name)

    def prepare(
        self,
        func: Callable[P, Coroutine[Any, Any, T]],
    ) -> "BackoffExecution[P, T]":
        """
        Bind ``func`` to this policy.

        :param func: coroutine function to be retried.
        :returns: a new :class:`BackoffExecution` that shares this
            instance's statistic object.
        """
        return BackoffExecution(
            function=func,
            statistic=self.statistic,
            attempt_timeout=self.attempt_timeout,
            deadline=self.deadline,
            pause=self.pause,
            max_tries=self.max_tries,
            giveup=self.giveup,
            exceptions=self.exceptions,
        )

    async def execute(
        self,
        func: Callable[P, Coroutine[Any, Any, T]],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T:
        """
        Run ``func(*args, **kwargs)`` under this policy.

        A fresh :class:`BackoffExecution` is prepared for every call.

        :returns: the result of the first successful attempt.
        """
        execution = self.prepare(func)
        return await execution(*args, **kwargs)

    def __call__(
        self,
        func: Callable[P, Coroutine[Any, Any, T]],
    ) -> Callable[P, Coroutine[Any, Any, T]]:
        """
        Decorate ``func`` so that every call goes through :meth:`execute`.

        :raises TypeError: if ``func`` is not a coroutine function.
        """
        if not asyncio.iscoroutinefunction(func):
            raise TypeError("Function must be a coroutine function")

        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            return await self.execute(func, *args, **kwargs)

        return wrapper
class BackoffExecution(Generic[P, T]):
    """
    A coroutine function bound to retry settings.

    Calling the instance (or :meth:`execute`) runs the function, retrying
    on the configured exceptions. After a run, ``total_tries`` holds the
    number of attempts made and ``last_exception`` the last exception
    raised by an attempt (``None`` if no attempt failed).
    """

    __slots__ = (
        "attempt_timeout",
        "deadline",
        "exceptions",
        "function",
        "giveup",
        "last_exception",
        "max_tries",
        "pause",
        "statistic",
        "total_tries",
    )

    def __init__(
        self,
        function: Callable[P, Coroutine[Any, Any, T]],
        statistic: BackoffStatistic,
        attempt_timeout: Optional[Number],
        deadline: Optional[Number],
        pause: Number = 0,
        exceptions: Tuple[Type[Exception], ...] = (),
        max_tries: Optional[int] = None,
        giveup: Optional[Callable[[Exception], bool]] = None,
    ):
        """
        Store the function, the statistic object and the retry settings.

        No validation is performed here; values are stored as given.
        """
        self.function = function
        self.statistic = statistic
        self.attempt_timeout = attempt_timeout
        self.deadline = deadline
        self.pause = pause
        self.max_tries = max_tries
        self.giveup = giveup
        self.exceptions = exceptions

        self.last_exception: Optional[Exception] = None
        self.total_tries: int = 0

    async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
        """Alias for :meth:`execute`."""
        return await self.execute(*args, **kwargs)

    async def execute(self, *args: P.args, **kwargs: P.kwargs) -> T:
        """
        Call the function with the given arguments, retrying on failure.

        Each attempt is limited by ``attempt_timeout`` and the whole run by
        ``deadline``. A retryable exception is re-raised when ``max_tries``
        is reached or when ``giveup(exception)`` is true; otherwise the
        next attempt starts after ``pause`` seconds.

        :returns: the result of the first successful attempt.
        :raises Exception: the last exception raised by an attempt, if
            any; otherwise whatever ended the run (e.g. the deadline
            timeout).
        """
        # Start every run from a clean slate: without this, a reused
        # execution would inherit the attempt count (hitting ``max_tries``
        # too early) and could re-raise a stale exception from an earlier
        # run when the deadline expires.
        self.last_exception = None
        self.total_tries = 0

        async def run() -> Any:
            loop = asyncio.get_running_loop()

            while True:
                self.statistic.attempts += 1
                self.total_tries += 1
                # Negative start time; the end time is added in ``finally``.
                delta = -loop.time()

                try:
                    return await asyncio.wait_for(
                        self.function(*args, **kwargs),
                        timeout=self.attempt_timeout,
                    )
                except asyncio.CancelledError:
                    self.statistic.cancels += 1
                    raise
                except self.exceptions as e:
                    self.statistic.errors += 1
                    self.last_exception = e
                    if (
                        self.max_tries is not None and
                        self.total_tries >= self.max_tries
                    ):
                        raise

                    if self.giveup and self.giveup(e):
                        raise
                    await asyncio.sleep(self.pause)
                except Exception as e:
                    # Not retryable: remember it and propagate immediately.
                    self.last_exception = e
                    raise
                finally:
                    delta += loop.time()
                    self.statistic.sum_time += delta
                    self.statistic.done += 1

        try:
            return await asyncio.wait_for(run(), timeout=self.deadline)
        except Exception:
            # Prefer the exception raised by the last failed attempt over
            # the one that ended the run (e.g. the deadline timeout).
            if self.last_exception is not None:
                raise self.last_exception
            raise
# noinspection SpellCheckingInspection
def asyncbackoff(
    attempt_timeout: Optional[Number],
    deadline: Optional[Number],
    pause: Number = 0,
    *exc: Type[Exception],
    exceptions: Tuple[Type[Exception], ...] = (),
    max_tries: Optional[int] = None,
    giveup: Optional[Callable[[Exception], bool]] = None,
    statistic_name: Optional[str] = None,
    statistic_class: Type[BackoffStatistic] = BackoffStatistic,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, T]]],
    Callable[P, Coroutine[Any, Any, T]],
]:
    """
    Parametric decorator that ensures that ``attempt_timeout`` and
    ``deadline`` time limits are met by the decorated function.

    In case of an exception the function will be called again with the
    same arguments after ``pause`` seconds.

    :param attempt_timeout: is maximum execution time for one
                            execution attempt.
    :param deadline: is maximum execution time for all execution attempts.
    :param pause: is time gap between execution attempts.
    :param exc: retrying when these exceptions were raised.
    :param exceptions: same as ``exc`` but keyword only; when both are
                       given the two sets are combined.
    :param max_tries: is maximum count of execution attempts (>= 1).
    :param giveup: is a predicate function which receives the raised
                   exception; returning a true value stops retrying and
                   the exception is re-raised.
    :param statistic_name: name field for statistic instances
    :param statistic_class: statistic class
    """
    # Combine positional and keyword-only exception classes instead of
    # silently dropping the positional ones when both are supplied.
    # ``exceptions or ()`` keeps tolerating a falsy value such as ``None``.
    retry_on = (*exc, *(exceptions or ()))

    return Backoff(
        attempt_timeout=attempt_timeout,
        deadline=deadline,
        pause=pause,
        exceptions=retry_on,
        max_tries=max_tries,
        giveup=giveup,
        statistic_name=statistic_name,
        statistic_class=statistic_class,
    )
def asyncretry(
    max_tries: Optional[int],
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    pause: Number = 0,
    giveup: Optional[Callable[[Exception], bool]] = None,
    statistic_name: Optional[str] = None,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, T]]],
    Callable[P, Coroutine[Any, Any, T]],
]:
    """
    Shortcut for :func:`asyncbackoff` without an attempt timeout and
    without a deadline, using :class:`RetryStatistic` for statistics.

    In case of an exception the function will be called again with the
    same arguments after ``pause`` seconds.

    :param max_tries: is maximum count of execution attempts
                      (>= 1 or ``None`` means infinity).
    :param exceptions: retrying when these exceptions were raised.
    :param pause: is time gap between execution attempts.
    :param giveup: is a predicate function which receives the raised
                   exception; returning a true value stops retrying.
    :param statistic_name: name field for statistic instances
    """
    # No per-attempt timeout and no overall deadline: only the attempt
    # count, the exception filter and ``giveup`` limit the retries.
    decorator = asyncbackoff(
        None,
        None,
        pause,
        exceptions=exceptions,
        max_tries=max_tries,
        giveup=giveup,
        statistic_name=statistic_name,
        statistic_class=RetryStatistic,
    )
    return decorator