Source code for aiomisc.log

import asyncio
import logging.handlers
import threading
import traceback
from collections.abc import Callable, Iterable
from contextlib import suppress
from queue import Empty, Queue
from socket import socket
from typing import Any, Union

import aiomisc_log
from aiomisc_log.enum import LogFormat, LogLevel

from .counters import Statistic


class ThreadedHandlerStatistic(Statistic):
    threads: int
    records: int
    errors: int
    flushes: int


[docs] class ThreadedHandler(logging.Handler): def __init__( self, target: logging.Handler, flush_interval: float = 0.1, buffered: bool = True, queue_size: int = 0, ): super().__init__() self._buffered = buffered self._target = target self._flush_interval = flush_interval self._flush_event = threading.Event() self._queue: Queue[logging.LogRecord | None] = Queue(queue_size) self._close_event = threading.Event() self._thread = threading.Thread(target=self._in_thread, daemon=True) self._statistic = ThreadedHandlerStatistic()
[docs] def start(self) -> None: self._statistic.threads += 1 self._thread.start()
[docs] def close(self) -> None: self._queue.put(None) del self._queue self.flush() self._close_event.set() super().close()
[docs] def flush(self) -> None: self._statistic.flushes += 1 self._flush_event.set()
[docs] def emit(self, record: logging.LogRecord) -> None: if self._buffered: self._queue.put_nowait(record) else: self._queue.put(record) self._statistic.records += 1
def _in_thread(self) -> None: queue = self._queue while not self._close_event.is_set(): self._flush_event.wait(self._flush_interval) try: self.acquire() while True: record = queue.get(timeout=self._flush_interval) if record is None: return with suppress(Exception): self._target.handle(record) except Empty: pass finally: self.release() self._statistic.threads -= 1
def suppressor( callback: Callable[..., None], exceptions: tuple[type[BaseException], ...] = (Exception,), ) -> Callable[..., None]: def wrapper() -> None: with suppress(*exceptions): callback() return wrapper class UnhandledLoopHook(aiomisc_log.UnhandledHook): @staticmethod def _fill_transport_extra( transport: asyncio.Transport | None, extra: dict[str, Any] ) -> None: if transport is None: return extra["transport"] = repr(transport) for key in ( "peername", "sockname", "compression", "cipher", "peercert", "pipe", "subprocess", ): value = transport.get_extra_info(key) if value: extra[f"transport_{key}"] = value def __call__( self, loop: asyncio.AbstractEventLoop, context: dict[str, Any] ) -> None: context = dict(context) message: str = context.pop("message", "unhandled loop exception") exception: BaseException | None = context.pop("exception", None) future: asyncio.Future | None = context.pop("future", None) task: asyncio.Task | None = context.pop("task", None) handle: asyncio.Handle | None = context.pop("handle", None) protocol: asyncio.Protocol | None = context.pop("protocol", None) transport: asyncio.Transport | None = context.pop("transport", None) sock: socket | None = context.pop("socket", None) source_tb: list[traceback.FrameSummary] = ( context.pop("source_traceback", None) or [] ) if exception is None: if future is not None: exception = future.exception() elif task is not None and task.done(): exception = task.exception() extra = context if handle is not None: extra["handle"] = repr(handle) if protocol is not None: extra["protocol"] = repr(protocol) if sock is not None: extra["sock"] = repr(sock) self._fill_transport_extra(transport, extra) self.logger.exception(message, exc_info=exception, extra=extra) if source_tb: self.logger.error("".join(traceback.format_list(source_tb)))
[docs] def basic_config( level: int | str = LogLevel.default(), log_format: str | LogFormat = LogFormat.default(), buffered: bool = True, buffer_size: int = 0, flush_interval: int | float = 0.2, loop: asyncio.AbstractEventLoop | None = None, handlers: Iterable[logging.Handler] = (), **kwargs: Any, ) -> None: unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled") if loop is None: loop = asyncio.get_event_loop() forever_task = asyncio.gather(loop.create_future(), return_exceptions=True) loop.set_exception_handler(unhandled_hook) log_handlers = [] for user_handler in handlers: handler = ThreadedHandler( buffered=buffered, flush_interval=flush_interval, queue_size=buffer_size, target=user_handler, ) unhandled_hook.add_handler(handler) forever_task.add_done_callback(lambda _: handler.close()) log_handlers.append(handler) handler.start() aiomisc_log.basic_config( level=level, log_format=log_format, handlers=log_handlers, **kwargs )
__all__ = ("LogFormat", "LogLevel", "ThreadedHandler", "basic_config")