Source code for aiomisc.compat

import asyncio
import logging
import os
import socket
import sys
from typing import Any, Iterator, Optional

from ._context_vars import EVENT_LOOP


log = logging.getLogger(__name__)

try:
    from time import time_ns
except ImportError:
    from time import time

    def time_ns() -> int:
        return int(time() * 1000000000)

try:
    from typing import final
except ImportError:
    from typing_extensions import final  # type: ignore


try:
    from typing import TypeAlias
except ImportError:
    from typing_extensions import TypeAlias


if sys.version_info >= (3, 10):
    from typing import ParamSpec
else:
    from typing_extensions import ParamSpec


if sys.version_info >= (3, 8):
    from typing import Protocol
else:
    from typing_extensions import Protocol


[docs]class EntrypointProtocol(Protocol): @property def name(self) -> str: ...
[docs] def load(self) -> Any: ...
# noinspection PyUnresolvedReferences try: from importlib.metadata import Distribution, EntryPoint def entry_pont_iterator(entry_point: str) -> Iterator[EntrypointProtocol]: ep: EntryPoint for dist in Distribution.discover(): for ep in dist.entry_points: if ep.group == entry_point: yield ep except ImportError: import pkg_resources
[docs] def entry_pont_iterator(entry_point: str) -> Iterator[EntrypointProtocol]: yield from pkg_resources.iter_entry_points(entry_point)
[docs]class EventLoopMixin: __slots__ = "_loop", _loop: Optional[asyncio.AbstractEventLoop] @property def loop(self) -> asyncio.AbstractEventLoop: if not getattr(self, "_loop", None): self._loop = asyncio.get_running_loop() return self._loop # type: ignore
event_loop_policy: asyncio.AbstractEventLoopPolicy try: import uvloop if ( os.getenv("AIOMISC_USE_UVLOOP", "1").lower() in ("yes", "1", "enabled", "enable", "on", "true") ): event_loop_policy = uvloop.EventLoopPolicy() else: event_loop_policy = asyncio.DefaultEventLoopPolicy() except ImportError: event_loop_policy = asyncio.DefaultEventLoopPolicy() if hasattr(socket, "TCP_NODELAY"): def sock_set_nodelay(sock: socket.socket) -> None: if sock.proto != socket.IPPROTO_TCP: return None sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) else:
[docs] def sock_set_nodelay(sock: socket.socket) -> None: return None
if hasattr(socket, "SO_REUSEPORT"): def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None: sock.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEPORT, reuse_port.real, ) else:
[docs] def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None: log.debug( "SO_REUSEPORT is not implemented by " "underlying library. Skipping.", )
# 16.x.x reverse compatibility set_current_loop = EVENT_LOOP.set get_current_loop = EVENT_LOOP.get __all__ = ( "EntrypointProtocol", "EventLoopMixin", "ParamSpec", "Protocol", "TypeAlias", "entry_pont_iterator", "event_loop_policy", "final", "get_current_loop", "set_current_loop", "sock_set_nodelay", "sock_set_reuseport", "time_ns", )