import asyncio
import atexit
import logging
import logging.handlers
import time
import traceback
from contextlib import suppress
from functools import partial
from socket import socket
from typing import (
Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union,
)
from weakref import finalize
import aiomisc_log
from aiomisc_log.enum import LogFormat, LogLevel
from .thread_pool import run_in_new_thread
def _thread_flusher(
handler: logging.handlers.MemoryHandler,
flush_interval: Union[float, int],
loop: asyncio.AbstractEventLoop,
) -> None:
def has_no_target() -> bool:
return True
def has_target() -> bool:
return bool(handler.target)
is_target = has_no_target
if isinstance(handler, logging.handlers.MemoryHandler):
is_target = has_target
while not loop.is_closed() and is_target():
with suppress(Exception):
if handler.buffer:
handler.flush()
time.sleep(flush_interval)
def suppressor(
    callback: Callable[..., None],
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> Callable[..., None]:
    """Build a zero-argument callable that runs ``callback`` quietly.

    The returned function invokes ``callback()`` with no arguments and
    swallows any exception matching ``exceptions``; anything else
    propagates to the caller.

    :param callback: callable to invoke without arguments.
    :param exceptions: exception classes to suppress.
    :return: the guarding wrapper, which always returns ``None``.
    """

    def call_quietly() -> None:
        # A fresh ``suppress`` context is created on every invocation.
        silencer = suppress(*exceptions)
        with silencer:
            callback()

    return call_quietly
def wrap_logging_handler(
    handler: logging.Handler,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    buffer_size: int = 1024,
    flush_interval: Union[float, int] = 0.1,
) -> logging.Handler:
    """Put a ``MemoryHandler`` buffer in front of ``handler``.

    Records are collected by the returned handler and handed to ``handler``
    when the buffer holds ``buffer_size`` records, when a record of level
    ``CRITICAL`` or above arrives, or when the background flusher started
    here performs its periodic flush.

    :param handler: the real handler that finally receives the records.
    :param loop: passed through unchanged to the flusher thread.
    :param buffer_size: capacity of the in-memory buffer.
    :param flush_interval: seconds between flushes in the flusher thread.
    :return: the buffering handler wrapping ``handler``.
    """
    memory_handler = logging.handlers.MemoryHandler(
        capacity=buffer_size,
        flushLevel=logging.CRITICAL,
        target=handler,
    )

    # Periodic flushing happens off-thread via ``_thread_flusher``.
    flusher_args = (memory_handler, flush_interval, loop)
    run_in_new_thread(
        _thread_flusher,
        args=flusher_args,
        no_return=True,
        statistic_name="logger",
    )

    # Flush the wrapped handler at interpreter exit, ignoring errors. The
    # hook is bound to ``handler`` (not the buffer) and is unregistered
    # again once the buffering handler has been garbage collected.
    flush_target_at_exit = suppressor(handler.flush)
    atexit.register(flush_target_at_exit)
    drop_exit_hook = partial(atexit.unregister, flush_target_at_exit)
    finalize(memory_handler, drop_exit_hook)

    return memory_handler
class UnhandledLoopHook(aiomisc_log.UnhandledHook):
    """Event-loop exception handler that reports loop errors via logging.

    Instances are callable with the ``(loop, context)`` signature, so they
    can be passed to ``loop.set_exception_handler``. ``self.logger`` is
    expected to be provided by the base class.
    """

    @staticmethod
    def _fill_transport_extra(
        transport: Optional[asyncio.Transport],
        extra: Dict[str, Any],
    ) -> None:
        """Describe ``transport`` inside ``extra`` (mutated in place).

        Stores ``repr(transport)`` under ``"transport"`` and every truthy
        value of a fixed set of ``get_extra_info`` keys under
        ``"transport_<key>"``. Does nothing when ``transport`` is ``None``.
        """
        if transport is None:
            return

        extra["transport"] = repr(transport)

        for key in (
            "peername", "sockname", "compression",
            "cipher", "peercert", "pipe", "subprocess",
        ):
            value = transport.get_extra_info(key)
            if value:
                extra[f"transport_{key}"] = value

    @staticmethod
    def _stored_exception(
        source: Optional[asyncio.Future],
    ) -> Optional[BaseException]:
        """Return the exception stored in a future/task, if it is readable.

        ``Future.exception()`` raises ``InvalidStateError`` on a pending
        future and ``CancelledError`` on a cancelled one. Raising from
        inside the loop's exception handler would hide the error being
        reported, so both states yield ``None`` instead.
        """
        if source is None or not source.done() or source.cancelled():
            return None
        return source.exception()

    def __call__(
        self, loop: asyncio.AbstractEventLoop,
        context: Dict[str, Any],
    ) -> None:
        """Log one unhandled event-loop error described by ``context``.

        :param loop: the loop reporting the error (not used here).
        :param context: error context mapping; it is copied, so the
            caller's dictionary is never modified.
        """
        # Work on a copy: well-known keys are popped, the rest is forwarded
        # to the logger as ``extra``.
        context = dict(context)

        message: str = context.pop("message", "unhandled loop exception")
        exception: Optional[BaseException] = context.pop("exception", None)
        future: Optional[asyncio.Future] = context.pop("future", None)
        task: Optional[asyncio.Task] = context.pop("task", None)
        handle: Optional[asyncio.Handle] = context.pop("handle", None)
        protocol: Optional[asyncio.Protocol] = context.pop("protocol", None)
        transport: Optional[asyncio.Transport] = context.pop("transport", None)
        sock: Optional[socket] = context.pop("socket", None)
        source_traceback: List[traceback.FrameSummary] = (
            context.pop("source_traceback", None) or []
        )

        if exception is None:
            # The future takes precedence; the task is consulted only when
            # no future was supplied.
            exception = self._stored_exception(
                future if future is not None else task,
            )

        extra = context
        if handle is not None:
            extra["handle"] = repr(handle)
        if protocol is not None:
            extra["protocol"] = repr(protocol)
        if sock is not None:
            extra["sock"] = repr(sock)

        self._fill_transport_extra(transport, extra)

        # With ``exception`` still ``None`` this logs the message at ERROR
        # level without exception information.
        self.logger.exception(message, exc_info=exception, extra=extra)

        if source_traceback:
            self.logger.error(
                "".join(traceback.format_list(source_traceback)),
            )
def basic_config(
    level: Union[int, str] = LogLevel.default(),
    log_format: Union[str, LogFormat] = LogFormat.default(),
    buffered: bool = True, buffer_size: int = 1024,
    flush_interval: Union[int, float] = 0.2,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    handlers: Iterable[logging.Handler] = (),
    **kwargs: Any,
) -> None:
    """Configure logging and install the unhandled-loop-exception hook.

    Delegates the actual logging setup to ``aiomisc_log.basic_config`` and
    then sets an ``UnhandledLoopHook`` (logger name ``"asyncio.unhandled"``)
    as the exception handler of ``loop``.

    :param level: log level forwarded to ``aiomisc_log.basic_config``.
    :param log_format: log format forwarded to ``aiomisc_log.basic_config``.
    :param buffered: when true, each handler given to the wrapper is
        wrapped with ``wrap_logging_handler``.
    :param buffer_size: buffer capacity used when ``buffered`` is true.
    :param flush_interval: seconds between background flushes when
        ``buffered`` is true.
    :param loop: event loop to configure; defaults to
        ``asyncio.get_event_loop()``.
    :param handlers: extra handlers forwarded to
        ``aiomisc_log.basic_config``.
    :param kwargs: any further keyword arguments, forwarded unchanged.
    """
    loop = loop or asyncio.get_event_loop()
    unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled")

    def wrap_handler(handler: logging.Handler) -> logging.Handler:
        # Passed as ``handler_wrapper``; presumably called once for each
        # handler that ``aiomisc_log.basic_config`` installs.
        # The hook gets the raw handler, not the buffered wrapper.
        unhandled_hook.add_handler(handler)

        if buffered:
            return wrap_logging_handler(
                handler=handler,
                buffer_size=buffer_size,
                flush_interval=flush_interval,
                loop=loop,
            )
        return handler

    aiomisc_log.basic_config(
        level=level,
        log_format=log_format,
        handler_wrapper=wrap_handler,
        handlers=handlers,
        **kwargs,
    )

    loop.set_exception_handler(unhandled_hook)
# Public API of this module: the names exported by a star import.
# ``LogFormat`` and ``LogLevel`` are re-exported from ``aiomisc_log.enum``;
# ``basic_config`` is defined in this module.
__all__ = (
"LogFormat",
"LogLevel",
"basic_config",
)