Source code for aiomisc.timeout
import asyncio
from collections.abc import Callable, Coroutine
from functools import wraps
from typing import Any, ParamSpec, TypeVar
T = TypeVar("T")
P = ParamSpec("P")
Number = int | float
[docs]
def timeout(
value: Number,
) -> Callable[
[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]
]:
def decorator(
func: Callable[P, Coroutine[Any, Any, T]],
) -> Callable[P, Coroutine[Any, Any, T]]:
if not asyncio.iscoroutinefunction(func):
raise TypeError("Function is not a coroutine function")
@wraps(func)
async def wrap(*args: P.args, **kwargs: P.kwargs) -> T:
return await asyncio.wait_for(func(*args, **kwargs), timeout=value)
return wrap
return decorator