Source code for aiomisc.log

import asyncio
import atexit
import logging
import logging.handlers
import time
import traceback
from contextlib import suppress
from functools import partial
from socket import socket
from typing import (
    Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union,
from weakref import finalize

import aiomisc_log
from aiomisc_log.enum import LogFormat, LogLevel

from .thread_pool import run_in_new_thread

def _thread_flusher(
    handler: logging.handlers.MemoryHandler,
    flush_interval: Union[float, int],
    loop: asyncio.AbstractEventLoop,
) -> None:
    def has_no_target() -> bool:
        return True

    def has_target() -> bool:
        return bool(

    is_target = has_no_target

    if isinstance(handler, logging.handlers.MemoryHandler):
        is_target = has_target

    while not loop.is_closed() and is_target():
        with suppress(Exception):
            if handler.buffer:


def suppressor(
    callback: Callable[..., None],
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> Callable[..., None]:
    def wrapper() -> None:
        with suppress(*exceptions):
    return wrapper

def wrap_logging_handler(
    handler: logging.Handler,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    buffer_size: int = 1024,
    flush_interval: Union[float, int] = 0.1,
) -> logging.Handler:
    buffered_handler = logging.handlers.MemoryHandler(

        _thread_flusher, args=(
            buffered_handler, flush_interval, loop,
        ), no_return=True, statistic_name="logger",

    at_exit_flusher = suppressor(handler.flush)
    finalize(buffered_handler, partial(atexit.unregister, at_exit_flusher))

    return buffered_handler

class UnhandledLoopHook(aiomisc_log.UnhandledHook):
    def _fill_transport_extra(
        transport: Optional[asyncio.Transport],
        extra: Dict[str, Any],
    ) -> None:
        if transport is None:

        extra["transport"] = repr(transport)

        for key in (
            "peername", "sockname", "compression",
            "cipher", "peercert", "pipe", "subprocess",
            value = transport.get_extra_info(key)
            if value:
                extra[f"transport_{key}"] = value

    def __call__(
        self, loop: asyncio.AbstractEventLoop,
        context: Dict[str, Any],
    ) -> None:
        context = dict(context)
        message: str = context.pop("message", "unhandled loop exception")
        exception: Optional[BaseException] = context.pop("exception", None)
        future: Optional[asyncio.Future] = context.pop("future", None)
        task: Optional[asyncio.Task] = context.pop("task", None)
        handle: Optional[asyncio.Handle] = context.pop("handle", None)
        protocol: Optional[asyncio.Protocol] = context.pop("protocol", None)
        transport: Optional[asyncio.Transport] = context.pop("transport", None)
        sock: Optional[socket] = context.pop("socket", None)
        source_traceback: List[traceback.FrameSummary] = (
            context.pop("source_traceback", None) or []

        if exception is None:
            if future is not None:
                exception = future.exception()
            elif task is not None and task.done():
                exception = task.exception()

        extra = context
        if handle is not None:
            extra["handle"] = repr(handle)
        if protocol is not None:
            extra["protocol"] = repr(protocol)
        if sock is not None:
            extra["sock"] = repr(sock)

        self._fill_transport_extra(transport, extra)
        self.logger.exception(message, exc_info=exception, extra=extra)
        if source_traceback:

[docs]def basic_config( level: Union[int, str] = LogLevel.default(), log_format: Union[str, LogFormat] = LogFormat.default(), buffered: bool = True, buffer_size: int = 1024, flush_interval: Union[int, float] = 0.2, loop: Optional[asyncio.AbstractEventLoop] = None, handlers: Iterable[logging.Handler] = (), **kwargs: Any, ) -> None: loop = loop or asyncio.get_event_loop() unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled") def wrap_handler(handler: logging.Handler) -> logging.Handler: nonlocal buffer_size, buffered, loop, unhandled_hook unhandled_hook.add_handler(handler) if buffered: return wrap_logging_handler( handler=handler, buffer_size=buffer_size, flush_interval=flush_interval, loop=loop, ) return handler aiomisc_log.basic_config( level=level, log_format=log_format, handler_wrapper=wrap_handler, handlers=handlers, **kwargs, ) loop.set_exception_handler(unhandled_hook)
__all__ = ( "LogFormat", "LogLevel", "basic_config", )