Source code for aiomisc.timeout

import asyncio
from collections.abc import Callable, Coroutine
from functools import wraps
from typing import Any, ParamSpec, TypeVar

T = TypeVar("T")
P = ParamSpec("P")
Number = int | float


[docs] def timeout( value: Number, ) -> Callable[ [Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]] ]: def decorator( func: Callable[P, Coroutine[Any, Any, T]], ) -> Callable[P, Coroutine[Any, Any, T]]: if not asyncio.iscoroutinefunction(func): raise TypeError("Function is not a coroutine function") @wraps(func) async def wrap(*args: P.args, **kwargs: P.kwargs) -> T: return await asyncio.wait_for(func(*args, **kwargs), timeout=value) return wrap return decorator