import asyncio
import logging
from datetime import datetime, timezone
from functools import partial
from typing import Any, Awaitable, Callable, Optional, Tuple, Type, Union
from croniter import croniter
from .compat import EventLoopMixin
from .recurring import RecurringCallback
log = logging.getLogger(__name__)
[docs]class CronCallback(EventLoopMixin):
"""
.. note::
When the cron function executes longer then execution interval a
next call will be skipping and warning will be logged.
"""
__slots__ = ("_recurring_cb", "_task") + EventLoopMixin.__slots__
def __init__(
self,
coroutine_func: Callable[..., Union[Any, Awaitable[Any]]],
*args: Any, **kwargs: Any,
):
self._recurring_cb = RecurringCallback(
coroutine_func, *args, **kwargs,
)
self._task: Optional[asyncio.Task] = None
[docs] @staticmethod
def get_next(cron: croniter, _: RecurringCallback) -> float:
timestamp = datetime.now(timezone.utc).timestamp()
next_date = cron.get_next(float, timestamp)
return next_date - timestamp
[docs] def start(
self,
spec: str,
loop: Optional[asyncio.AbstractEventLoop] = None,
*, shield: bool = False,
suppress_exceptions: Tuple[Type[Exception], ...] = (),
) -> None:
if self._task and not self._task.done():
raise asyncio.InvalidStateError
self._loop = loop
# noinspection PyAttributeOutsideInit
strategy = partial(self.get_next, croniter(spec))
self._task = self._recurring_cb.start(
strategy=strategy,
loop=loop,
shield=shield,
suppress_exceptions=suppress_exceptions,
)
[docs] def stop(self) -> asyncio.Future:
if self._task is None:
task = self.loop.create_future()
task.set_exception(RuntimeError("Callback not started"))
return task
elif not self._task.done():
self._task.cancel()
return self._task
def __repr__(self) -> str:
return f"<{self.__class__.__name__}({self._recurring_cb.name})>"