import asyncio
import logging.handlers
import threading
import traceback
import warnings
from contextlib import suppress
from queue import Empty, Queue
from socket import socket
from typing import (
Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union,
)
import aiomisc_log
from aiomisc_log.enum import LogFormat, LogLevel
from .counters import Statistic
class ThreadedHandlerStatistic(Statistic):
    """Counters kept by :class:`ThreadedHandler`."""

    # Incremented in ``ThreadedHandler.start`` and decremented by the worker.
    threads: int
    # Incremented by ``ThreadedHandler.emit`` for every accepted record.
    records: int
    # Declared for failed deliveries to the target handler.
    errors: int
    # Incremented by every ``ThreadedHandler.flush`` call.
    flushes: int
class ThreadedHandler(logging.Handler):
    """Logging handler that hands records to ``target`` from a worker thread.

    ``emit`` only puts the record on an internal queue; a daemon thread
    started by ``start()`` takes records off that queue and passes them to
    ``target.handle``.  ``close()`` enqueues a ``None`` sentinel which stops
    the worker once everything queued before it has been handled.

    :param target: handler that finally processes every record
    :param flush_interval: seconds the worker waits for a flush request
        between drain passes; also the timeout of each queue read
    :param buffered: when true ``emit`` uses ``put_nowait`` (``queue.Full``
        is raised on a full bounded queue); otherwise ``emit`` blocks until
        there is room
    :param queue_size: ``maxsize`` of the internal queue, ``0`` is unbounded
    """

    def __init__(
        self, target: logging.Handler, flush_interval: float = 0.1,
        buffered: bool = True, queue_size: int = 0,
    ):
        super().__init__()
        self._buffered = buffered
        self._target = target
        self._flush_interval = flush_interval
        self._flush_event = threading.Event()
        self._queue: Queue[Optional[logging.LogRecord]] = Queue(queue_size)
        self._close_event = threading.Event()
        self._thread = threading.Thread(target=self._in_thread, daemon=True)
        self._statistic = ThreadedHandlerStatistic()

    def start(self) -> None:
        """Start the worker thread that drains the queue."""
        self._statistic.threads += 1
        self._thread.start()

    def close(self) -> None:
        """Ask the worker to stop after the already queued records.

        Safe to call more than once: ``logging.shutdown()`` closes every
        live handler again at interpreter exit, so a repeated call must not
        fail or enqueue a second sentinel.
        """
        if self._close_event.is_set():
            return
        # The sentinel goes in before the event is set, so a worker that
        # observes the event is guaranteed to find the sentinel queued.
        self._queue.put(None)
        self.flush()
        self._close_event.set()
        super().close()

    def flush(self) -> None:
        """Wake the worker so it drains the queue without waiting."""
        self._statistic.flushes += 1
        self._flush_event.set()

    def emit(self, record: logging.LogRecord) -> None:
        """Queue ``record`` for the worker thread.

        After ``close()`` there is no worker left to read the queue, so the
        record is passed to the target directly instead of being stranded
        (or raising) in the caller's logging call.
        """
        if self._close_event.is_set():
            self._deliver(record)
        elif self._buffered:
            self._queue.put_nowait(record)
        else:
            self._queue.put(record)
        self._statistic.records += 1

    def _deliver(self, record: logging.LogRecord) -> None:
        """Pass ``record`` to the target; count failures, never raise them."""
        try:
            self._target.handle(record)
        except Exception:
            # Best effort: a broken target must not stop log delivery.
            self._statistic.errors += 1

    def _drain(self, queue: "Queue[Optional[logging.LogRecord]]") -> bool:
        """Handle queued records while holding the handler lock.

        Returns ``True`` when the ``None`` sentinel was consumed and
        ``False`` when the queue stayed empty for ``flush_interval`` seconds.
        """
        self.acquire()
        try:
            while True:
                record = queue.get(timeout=self._flush_interval)
                if record is None:
                    return True
                self._deliver(record)
        except Empty:
            return False
        finally:
            self.release()

    def _in_thread(self) -> None:
        """Worker loop: wait for a flush request or timeout, then drain."""
        queue = self._queue
        try:
            while True:
                # Read the flag *before* draining so that records queued
                # ahead of ``close()`` always get one final drain pass.
                closing = self._close_event.is_set()
                if not closing:
                    self._flush_event.wait(self._flush_interval)
                    # Re-arm the event; otherwise a single flush() would
                    # turn every later wait into a no-op.
                    self._flush_event.clear()
                if self._drain(queue) or closing:
                    return
        finally:
            # Exactly once per thread, mirroring the increment in start().
            self._statistic.threads -= 1
def suppressor(
    callback: Callable[..., None],
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> Callable[..., None]:
    """Return a zero-argument callable that runs ``callback`` quietly.

    :param callback: callable invoked without arguments by the wrapper
    :param exceptions: exception types swallowed by the wrapper; anything
        else raised by ``callback`` propagates
    :returns: the wrapping function, which always returns ``None``
    """

    def wrapper() -> None:
        ignored = suppress(*exceptions)
        with ignored:
            callback()

    return wrapper
def wrap_logging_handler(
    handler: logging.Handler,
    buffer_size: int = 1024,
    flush_interval: Union[float, int] = 0.1,
    loop: Optional[asyncio.AbstractEventLoop] = None,
) -> logging.Handler:
    """Deprecated: wrap ``handler`` in a started :class:`ThreadedHandler`.

    :param handler: handler that will receive the records
    :param buffer_size: passed to ``ThreadedHandler`` as ``queue_size``
    :param flush_interval: passed to ``ThreadedHandler`` unchanged
    :param loop: unused; kept so existing calls keep working
    :returns: the already started ``ThreadedHandler``
    """
    # stacklevel=2 attributes the warning to the caller, which is the code
    # that actually needs to be migrated.
    warnings.warn(
        "wrap_logging_handler is deprecated", DeprecationWarning,
        stacklevel=2,
    )
    threaded = ThreadedHandler(
        target=handler,
        queue_size=buffer_size,
        flush_interval=flush_interval,
    )
    threaded.start()
    return threaded
class UnhandledLoopHook(aiomisc_log.UnhandledHook):
    """Event loop exception handler that logs the reported context.

    Instances are callable with ``(loop, context)`` and are installed via
    ``loop.set_exception_handler``.  ``self.logger`` is presumably provided
    by ``aiomisc_log.UnhandledHook`` (not visible in this file).
    """

    @staticmethod
    def _fill_transport_extra(
        transport: Optional[asyncio.Transport],
        extra: Dict[str, Any],
    ) -> None:
        """Add ``repr(transport)`` and its non-empty extra info to ``extra``."""
        if transport is None:
            return

        extra["transport"] = repr(transport)

        for key in (
            "peername", "sockname", "compression",
            "cipher", "peercert", "pipe", "subprocess",
        ):
            value = transport.get_extra_info(key)
            if value:
                extra[f"transport_{key}"] = value

    @staticmethod
    def _stored_exception(
        source: Optional[asyncio.Future],
    ) -> Optional[BaseException]:
        """Return the exception stored on ``source`` without ever raising.

        ``Future.exception()`` raises ``InvalidStateError`` on a pending
        future and ``CancelledError`` on a cancelled one; raising inside the
        loop's exception handler would hide the error being reported.
        """
        if source is None or not source.done() or source.cancelled():
            return None
        return source.exception()

    def __call__(
        self, loop: asyncio.AbstractEventLoop,
        context: Dict[str, Any],
    ) -> None:
        """Log ``context`` as reported by the event loop.

        :param loop: loop that reported the problem (not used here)
        :param context: exception-handler context; it is copied first, the
            known keys are popped and whatever remains is logged as ``extra``
        """
        context = dict(context)

        message: str = context.pop("message", "unhandled loop exception")
        exception: Optional[BaseException] = context.pop("exception", None)
        future: Optional[asyncio.Future] = context.pop("future", None)
        task: Optional[asyncio.Task] = context.pop("task", None)
        handle: Optional[asyncio.Handle] = context.pop("handle", None)
        protocol: Optional[asyncio.Protocol] = context.pop("protocol", None)
        transport: Optional[asyncio.Transport] = context.pop("transport", None)
        sock: Optional[socket] = context.pop("socket", None)
        source_tb: List[traceback.FrameSummary] = (
            context.pop("source_traceback", None) or []
        )

        if exception is None:
            # Same precedence as before: the task is only consulted when no
            # future was supplied.
            exception = self._stored_exception(
                future if future is not None else task,
            )

        extra = context
        if handle is not None:
            extra["handle"] = repr(handle)
        if protocol is not None:
            extra["protocol"] = repr(protocol)
        if sock is not None:
            extra["sock"] = repr(sock)

        self._fill_transport_extra(transport, extra)
        self.logger.exception(message, exc_info=exception, extra=extra)

        if source_tb:
            self.logger.error("".join(traceback.format_list(source_tb)))
def basic_config(
    level: Union[int, str] = LogLevel.default(),
    log_format: Union[str, LogFormat] = LogFormat.default(),
    buffered: bool = True,
    buffer_size: int = 0,
    flush_interval: Union[int, float] = 0.2,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    handlers: Iterable[logging.Handler] = (),
    **kwargs: Any,
) -> None:
    """Configure logging with every handler wrapped in a worker thread.

    :param level: forwarded to ``aiomisc_log.basic_config``
    :param log_format: forwarded to ``aiomisc_log.basic_config``
    :param buffered: passed to every ``ThreadedHandler``
    :param buffer_size: passed to every ``ThreadedHandler`` as ``queue_size``
    :param flush_interval: passed to every ``ThreadedHandler``
    :param loop: loop that gets the unhandled-exception hook; defaults to
        ``asyncio.get_event_loop()``
    :param handlers: user handlers; each one becomes the target of its own
        started ``ThreadedHandler``
    :param kwargs: forwarded to ``aiomisc_log.basic_config``
    """
    unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled")

    if loop is None:
        loop = asyncio.get_event_loop()

    # Gathers a future that is never resolved here; its done callbacks are
    # used below to close the threaded handlers.
    forever_task = asyncio.gather(
        loop.create_future(), return_exceptions=True,
    )

    loop.set_exception_handler(unhandled_hook)

    log_handlers = []

    for user_handler in handlers:
        handler = ThreadedHandler(
            buffered=buffered,
            flush_interval=flush_interval,
            queue_size=buffer_size,
            target=user_handler,
        )

        unhandled_hook.add_handler(handler)
        # Bind the current handler as a default argument: a plain closure
        # would look ``handler`` up when the callback runs, so every
        # callback would close only the last handler of the loop.
        forever_task.add_done_callback(lambda _, h=handler: h.close())
        log_handlers.append(handler)
        handler.start()

    aiomisc_log.basic_config(
        level=level, log_format=log_format, handlers=log_handlers, **kwargs,
    )
# Public names exported by a star-import of this module.  ``LogFormat`` and
# ``LogLevel`` are imported from ``aiomisc_log.enum`` at the top of the file
# and re-exported here next to the definitions made in this module.
__all__ = (
"LogFormat",
"LogLevel",
"basic_config",
"ThreadedHandler",
)