import asyncio
import logging.handlers
import threading
import traceback
from collections.abc import Callable, Iterable
from contextlib import suppress
from queue import Empty, Queue
from socket import socket
from typing import Any, Union
import aiomisc_log
from aiomisc_log.enum import LogFormat, LogLevel
from .counters import Statistic
class ThreadedHandlerStatistic(Statistic):
threads: int
records: int
errors: int
flushes: int
[docs]
class ThreadedHandler(logging.Handler):
def __init__(
self,
target: logging.Handler,
flush_interval: float = 0.1,
buffered: bool = True,
queue_size: int = 0,
):
super().__init__()
self._buffered = buffered
self._target = target
self._flush_interval = flush_interval
self._flush_event = threading.Event()
self._queue: Queue[logging.LogRecord | None] = Queue(queue_size)
self._close_event = threading.Event()
self._thread = threading.Thread(target=self._in_thread, daemon=True)
self._statistic = ThreadedHandlerStatistic()
[docs]
def start(self) -> None:
self._statistic.threads += 1
self._thread.start()
[docs]
def close(self) -> None:
self._queue.put(None)
del self._queue
self.flush()
self._close_event.set()
super().close()
[docs]
def flush(self) -> None:
self._statistic.flushes += 1
self._flush_event.set()
[docs]
def emit(self, record: logging.LogRecord) -> None:
if self._buffered:
self._queue.put_nowait(record)
else:
self._queue.put(record)
self._statistic.records += 1
def _in_thread(self) -> None:
queue = self._queue
while not self._close_event.is_set():
self._flush_event.wait(self._flush_interval)
try:
self.acquire()
while True:
record = queue.get(timeout=self._flush_interval)
if record is None:
return
with suppress(Exception):
self._target.handle(record)
except Empty:
pass
finally:
self.release()
self._statistic.threads -= 1
def suppressor(
callback: Callable[..., None],
exceptions: tuple[type[BaseException], ...] = (Exception,),
) -> Callable[..., None]:
def wrapper() -> None:
with suppress(*exceptions):
callback()
return wrapper
class UnhandledLoopHook(aiomisc_log.UnhandledHook):
@staticmethod
def _fill_transport_extra(
transport: asyncio.Transport | None, extra: dict[str, Any]
) -> None:
if transport is None:
return
extra["transport"] = repr(transport)
for key in (
"peername",
"sockname",
"compression",
"cipher",
"peercert",
"pipe",
"subprocess",
):
value = transport.get_extra_info(key)
if value:
extra[f"transport_{key}"] = value
def __call__(
self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
) -> None:
context = dict(context)
message: str = context.pop("message", "unhandled loop exception")
exception: BaseException | None = context.pop("exception", None)
future: asyncio.Future | None = context.pop("future", None)
task: asyncio.Task | None = context.pop("task", None)
handle: asyncio.Handle | None = context.pop("handle", None)
protocol: asyncio.Protocol | None = context.pop("protocol", None)
transport: asyncio.Transport | None = context.pop("transport", None)
sock: socket | None = context.pop("socket", None)
source_tb: list[traceback.FrameSummary] = (
context.pop("source_traceback", None) or []
)
if exception is None:
if future is not None:
exception = future.exception()
elif task is not None and task.done():
exception = task.exception()
extra = context
if handle is not None:
extra["handle"] = repr(handle)
if protocol is not None:
extra["protocol"] = repr(protocol)
if sock is not None:
extra["sock"] = repr(sock)
self._fill_transport_extra(transport, extra)
self.logger.exception(message, exc_info=exception, extra=extra)
if source_tb:
self.logger.error("".join(traceback.format_list(source_tb)))
[docs]
def basic_config(
level: int | str = LogLevel.default(),
log_format: str | LogFormat = LogFormat.default(),
buffered: bool = True,
buffer_size: int = 0,
flush_interval: int | float = 0.2,
loop: asyncio.AbstractEventLoop | None = None,
handlers: Iterable[logging.Handler] = (),
**kwargs: Any,
) -> None:
unhandled_hook = UnhandledLoopHook(logger_name="asyncio.unhandled")
if loop is None:
loop = asyncio.get_event_loop()
forever_task = asyncio.gather(loop.create_future(), return_exceptions=True)
loop.set_exception_handler(unhandled_hook)
log_handlers = []
for user_handler in handlers:
handler = ThreadedHandler(
buffered=buffered,
flush_interval=flush_interval,
queue_size=buffer_size,
target=user_handler,
)
unhandled_hook.add_handler(handler)
forever_task.add_done_callback(lambda _: handler.close())
log_handlers.append(handler)
handler.start()
aiomisc_log.basic_config(
level=level, log_format=log_format, handlers=log_handlers, **kwargs
)
__all__ = ("LogFormat", "LogLevel", "ThreadedHandler", "basic_config")