Source code for aiomisc.compat
import asyncio
import logging
import os
import socket
from collections.abc import Callable, Iterator
from importlib.metadata import Distribution, EntryPoint
from time import time_ns
from typing import (
Any,
Concatenate,
ParamSpec,
Protocol,
TypeAlias,
TypeVar,
final,
)
log = logging.getLogger(__name__)
[docs]
class EntrypointProtocol(Protocol):
@property
def name(self) -> str: ...
[docs]
def load(self) -> Any: ...
[docs]
def entry_pont_iterator(entry_point: str) -> Iterator[EntrypointProtocol]:
ep: EntryPoint
for dist in Distribution.discover():
for ep in dist.entry_points:
if ep.group == entry_point:
yield ep
[docs]
class EventLoopMixin:
__slots__ = ("_loop",)
_loop: asyncio.AbstractEventLoop | None
@property
def loop(self) -> asyncio.AbstractEventLoop:
if not getattr(self, "_loop", None):
self._loop = asyncio.get_running_loop()
return self._loop # type: ignore
# Check if uvloop is available and enabled
_uvloop_module: Any = None
try:
import uvloop as _uvloop_module_import
if os.getenv("AIOMISC_USE_UVLOOP", "1").lower() in (
"yes",
"1",
"enabled",
"enable",
"on",
"true",
):
_uvloop_module = _uvloop_module_import
except ImportError:
pass
LoopFactoryType = Callable[[], asyncio.AbstractEventLoop]
T = TypeVar("T")
# Use native asyncio.Runner (Python 3.11+)
Runner = asyncio.Runner
[docs]
def default_loop_factory() -> asyncio.AbstractEventLoop:
"""Default loop factory - uses uvloop if available, otherwise asyncio."""
if _uvloop_module is not None:
return _uvloop_module.new_event_loop()
return asyncio.new_event_loop()
if hasattr(socket, "TCP_NODELAY"):
def sock_set_nodelay(sock: socket.socket) -> None:
if sock.proto != socket.IPPROTO_TCP:
return None
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
[docs]
def sock_set_nodelay(sock: socket.socket) -> None:
return None
if hasattr(socket, "SO_REUSEPORT"):
def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, reuse_port.real)
else:
[docs]
def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None:
log.debug(
"SO_REUSEPORT is not implemented by underlying library. Skipping."
)
__all__ = (
"Concatenate",
"EntrypointProtocol",
"EventLoopMixin",
"LoopFactoryType",
"ParamSpec",
"Protocol",
"Runner",
"TypeAlias",
"default_loop_factory",
"entry_pont_iterator",
"final",
"sock_set_nodelay",
"sock_set_reuseport",
"time_ns",
)