Source code for aiomisc.periodic

import asyncio
import logging
from typing import Any, Union

from .compat import EventLoopMixin
from .recurring import CallbackType, ExceptionsType, RecurringCallback

log = logging.getLogger(__name__)


[docs] class PeriodicCallback(EventLoopMixin): """ .. note:: When the periodic function executes longer then execution interval a next call would be skipped and warning would be logged. """ __slots__ = ("_recurring_callback", "_task") + EventLoopMixin.__slots__ _task: asyncio.Future | None def __init__(self, coroutine_func: CallbackType, *args: Any, **kwargs: Any): self._recurring_callback: RecurringCallback = RecurringCallback( coroutine_func, *args, **kwargs ) self._task: asyncio.Task | None = None
[docs] def start( self, interval: int | float, loop: asyncio.AbstractEventLoop | None = None, *, delay: int | float = 0, shield: bool = False, suppress_exceptions: ExceptionsType = (), ) -> None: assert interval if self._task and not self._task.done(): raise asyncio.InvalidStateError delayed = False def strategy(_: Any) -> int | float: nonlocal delayed if not delayed: delayed = True return delay return interval self._task = self._recurring_callback.start( strategy=strategy, shield=shield, loop=loop, suppress_exceptions=suppress_exceptions, ) def clean_task(_: Any) -> None: self._task = None self._task.add_done_callback(clean_task)
[docs] def stop(self, return_exceptions: bool = False) -> asyncio.Future: if self._task is None: self._task = self.loop.create_future() self._task.set_exception(RuntimeError("Callback not started")) elif not self._task.done(): self._task.cancel() task, self._task = self._task, None return asyncio.gather(task, return_exceptions=return_exceptions)
def __repr__(self) -> str: return f"{self.__class__.__name__}({self._recurring_callback.name})"