import asyncio
import sys
from functools import wraps
from typing import (
Any, Callable, Coroutine, Optional, Tuple, Type, TypeVar, Union,
)
from .counters import Statistic
from .timeout import timeout
if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
# Numeric type accepted by the ``attempt_timeout``, ``deadline`` and ``pause``
# parameters of the decorators below.
Number = Union[int, float]
# Result type of the decorated coroutine function.
T = TypeVar("T")
# Parameter specification of the decorated coroutine function, so the wrapper
# keeps the same call signature as the function it wraps.
P = ParamSpec("P")
class BackoffStatistic(Statistic):
    """Counters updated by :func:`asyncbackoff` while running attempts."""

    # Incremented in the ``finally`` clause of every attempt, whatever
    # the outcome of that attempt was.
    done: int
    # Incremented right before each call of the decorated function.
    attempts: int
    # Attempts interrupted by ``asyncio.CancelledError``.
    cancels: int
    # Attempts which failed with one of the retryable exceptions.
    errors: int
    # Accumulated event-loop time spent in attempts.
    sum_time: float
class RetryStatistic(BackoffStatistic):
    """Statistic class passed to ``asyncbackoff`` by :func:`asyncretry`.

    Declares no counters of its own; it only differs from
    :class:`BackoffStatistic` by its type.
    """
# noinspection SpellCheckingInspection
def asyncbackoff(
    attempt_timeout: Optional[Number],
    deadline: Optional[Number],
    pause: Number = 0,
    *exc: Type[Exception], exceptions: Tuple[Type[Exception], ...] = (),
    max_tries: Optional[int] = None,
    giveup: Optional[Callable[[Exception], bool]] = None,
    statistic_name: Optional[str] = None,
    statistic_class: Type[BackoffStatistic] = BackoffStatistic,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, T]]],
    Callable[P, Coroutine[Any, Any, T]],
]:
    """
    Parametric decorator that ensures that ``attempt_timeout`` and
    ``deadline`` time limits are met by the decorated function.

    In case of an exception the function will be called again with the
    same arguments after ``pause`` seconds.

    ``asyncio.TimeoutError`` is always treated as a retryable exception,
    in addition to the ones passed via ``exc`` / ``exceptions``.
    Exceptions which are not retryable are propagated immediately.
    When the ``deadline`` expires after at least one failed attempt,
    the exception of the last failed attempt is raised.

    :param attempt_timeout: is maximum execution time for one
                            execution attempt (``None`` means no limit).
    :param deadline: is maximum execution time for all execution attempts
                     (``None`` means no limit).
    :param pause: is time gap between execution attempts.
    :param exc: retrying when these exceptions were raised.
    :param exceptions: similar to ``exc`` but keyword only.
    :param max_tries: is maximum count of execution attempts (>= 1).
    :param giveup: is a predicate function which is called with the
                   caught exception; when it returns a true value
                   the exception is re-raised without further attempts.
    :param statistic_name: name field for statistic instances
    :param statistic_class: statistic class
    :raises ValueError: when ``pause``, ``attempt_timeout`` or ``deadline``
                        is negative, ``max_tries`` is less than 1 or
                        ``giveup`` is not callable.
    """

    # One statistic instance is shared by every function decorated with
    # the returned decorator and by every call of those functions.
    statistic = statistic_class(statistic_name)

    if not pause:
        pause = 0
    elif pause < 0:
        raise ValueError("'pause' must be positive")

    if attempt_timeout is not None and attempt_timeout < 0:
        raise ValueError("'attempt_timeout' must be positive or None")

    if deadline is not None and deadline < 0:
        raise ValueError("'deadline' must be positive or None")

    if max_tries is not None and max_tries < 1:
        raise ValueError("'max_tries' must be >= 1 or None")

    if giveup is not None and not callable(giveup):
        raise ValueError("'giveup' must be a callable or None")

    # A timed out attempt is always worth retrying.
    exceptions = exc + tuple(exceptions) + (asyncio.TimeoutError,)

    def decorator(
        func: Callable[P, Coroutine[Any, Any, T]],
    ) -> Callable[P, Coroutine[Any, Any, T]]:
        if attempt_timeout is not None:
            func = timeout(attempt_timeout)(func)

        @wraps(func)
        async def wrap(*args: P.args, **kwargs: P.kwargs) -> T:
            last_exc: Optional[Exception] = None
            tries = 0

            async def run() -> Any:
                nonlocal last_exc, tries

                loop = asyncio.get_running_loop()

                while True:
                    statistic.attempts += 1
                    tries += 1
                    delta = -loop.time()

                    try:
                        return await asyncio.wait_for(
                            func(*args, **kwargs),
                            timeout=attempt_timeout,
                        )
                    except asyncio.CancelledError:
                        statistic.cancels += 1
                        raise
                    except exceptions as e:
                        statistic.errors += 1
                        last_exc = e
                        if max_tries is not None and tries >= max_tries:
                            raise

                        if giveup and giveup(e):
                            raise

                        # The sleep happens before ``finally`` runs, so
                        # the pause is part of the measured attempt time.
                        await asyncio.sleep(pause)
                    except Exception as e:
                        # Not a retryable exception: remember and propagate.
                        last_exc = e
                        raise
                    finally:
                        delta += loop.time()
                        statistic.sum_time += delta
                        statistic.done += 1

            try:
                return await asyncio.wait_for(run(), timeout=deadline)
            except Exception:
                # Prefer the error of the last failed attempt over the
                # timeout error produced by an expired deadline.
                # Compare with ``None`` explicitly: an exception object
                # may define ``__bool__``/``__len__`` and be falsy.
                if last_exc is not None:
                    raise last_exc
                raise

        return wrap

    return decorator
def asyncretry(
    max_tries: Optional[int],
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    pause: Number = 0,
    giveup: Optional[Callable[[Exception], bool]] = None,
    statistic_name: Optional[str] = None,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, T]]],
    Callable[P, Coroutine[Any, Any, T]],
]:
    """
    Retrying decorator without any time limits.

    Thin wrapper around ``asyncbackoff`` which passes ``None`` for both
    ``attempt_timeout`` and ``deadline`` and collects statistics into
    a ``RetryStatistic`` instance.

    In case of an exception the function will be called again with the
    same arguments after ``pause`` seconds.

    :param max_tries: is maximum count of execution attempts
                      (>= 1 or ``None`` means infinity).
    :param exceptions: tuple of exception classes which cause a retry.
    :param pause: is time gap between execution attempts.
    :param giveup: is a predicate function which is called with the
                   caught exception; when it returns a true value
                   the exception is re-raised without further attempts.
    :param statistic_name: name field for statistic instances
    """
    # attempt_timeout, deadline and pause are the leading positional
    # parameters of ``asyncbackoff``.
    retry_decorator = asyncbackoff(
        None,
        None,
        pause,
        exceptions=exceptions,
        max_tries=max_tries,
        giveup=giveup,
        statistic_name=statistic_name,
        statistic_class=RetryStatistic,
    )
    return retry_decorator