Source code for aiomisc.io

import asyncio
import bz2
import gzip
import lzma
import sys
from collections.abc import Awaitable, Callable, Generator
from concurrent.futures import Executor
from enum import Enum
from functools import partial, total_ordering
from pathlib import Path
from typing import IO, Any, AnyStr, Generic, Literal, TextIO, TypeVar, overload

from .compat import EventLoopMixin, TypeAlias

T = TypeVar("T", bound=Any)
FilePath: TypeAlias = str | Path


def proxy_method_async(
    name: str, in_executor: bool = True
) -> Callable[..., Any]:
    def wrap_to_future(
        loop: asyncio.AbstractEventLoop,
        func: Callable[..., T],
        *args: Any,
        **kwargs: Any,
    ) -> asyncio.Future:
        future = loop.create_future()

        def _inner():  # type: ignore
            try:
                return future.set_result(func(*args, **kwargs))
            except Exception as e:
                return future.set_exception(e)

        loop.call_soon(_inner)
        return future

    def wrap_to_thread(
        loop: asyncio.AbstractEventLoop,
        func: Callable[..., T],
        executor: Executor,
        *args: Any,
        **kwargs: Any,
    ) -> Awaitable[T]:
        callee = partial(func, *args, **kwargs)
        # noinspection PyTypeChecker
        return loop.run_in_executor(executor, callee)

    async def method(self, *args, **kwargs):  # type: ignore
        func = getattr(self.fp, name)

        if in_executor:
            return await wrap_to_thread(
                self.loop, func, self.executor, *args, **kwargs
            )

        return await wrap_to_future(self.loop, func, *args, **kwargs)

    method.__name__ = name
    return method


[docs] @total_ordering class AsyncFileIO(EventLoopMixin, Generic[AnyStr]): __slots__ = ( "_fp", "executor", "__iterator_lock", ) + EventLoopMixin.__slots__ _fp: IO[AnyStr] | None
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return open
def __init__( self, fname: str | Path, mode: str = "r", executor: Executor | None = None, *args: Any, loop: asyncio.AbstractEventLoop | None = None, **kwargs: Any, ): self.executor = executor self._loop = loop self._fp = None self.__open_args = (str(fname), mode, *args), kwargs self.__iterator_lock = asyncio.Lock()
[docs] @classmethod def open_fp( cls, fp: IO, executor: Executor | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> "AsyncFileIO": async_fp = cls(fp.name, mode=fp.mode, executor=executor, loop=loop) async_fp._fp = fp return async_fp
[docs] async def open(self) -> None: if self._fp is not None: return args, kwargs = self.__open_args self._fp = await self.__execute_in_thread( self.get_opener(), *args, **kwargs )
@property def fp(self) -> IO[AnyStr]: if self._fp is not None: return self._fp raise RuntimeError("file is not opened")
[docs] def closed(self) -> bool: return self.fp.closed
def __await__(self) -> Generator[Any, Any, "AsyncFileIO"]: yield from self.open().__await__() return self async def __aenter__(self) -> "AsyncFileIO[AnyStr]": await self.open() return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self.fp is None: raise RuntimeError("file is not opened") await self.loop.run_in_executor( None, self.fp.__exit__, exc_type, exc_val, exc_tb ) def __del__(self) -> None: if not self.fp or self.fp.closed: return self.fp.close() del self._fp def __aiter__(self) -> "AsyncFileIO[AnyStr]": return self async def __anext__(self) -> AnyStr: async with self.__iterator_lock: line = await self.readline() if not len(line): raise StopAsyncIteration return line def __eq__(self, other: Any) -> bool: return (self.__class__, self.fp.__eq__(other)) == ( other.__class__, self.fp.__eq__(other), ) def __lt__(self, other: Any) -> bool: return self.fp < other.fp def __hash__(self) -> int: return hash((self.__class__, self.fp)) async def __execute_in_thread( self, method: Callable[..., Any], *args: Any, **kwargs: Any ) -> Any: return await self.loop.run_in_executor( self.executor, partial(method, *args, **kwargs) ) @property def mode(self) -> str: return self.fp.mode @property def name(self) -> str: return self.fp.name
[docs] def fileno(self) -> int: return self.fp.fileno()
[docs] def isatty(self) -> bool: return self.fp.isatty()
[docs] async def close(self) -> None: return await self.__execute_in_thread(self.fp.close)
def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]: async def method(*args: Any) -> Any: getter = getattr(self.fp, name) if callable(getter): return await self.__execute_in_thread(getter, *args) return getter method.__name__ = name return method
[docs] async def tell(self) -> int: return self.fp.tell()
[docs] async def readable(self) -> bool: return self.fp.readable()
[docs] async def seekable(self) -> bool: return self.fp.seekable()
[docs] async def writable(self) -> bool: return self.fp.writable()
[docs] async def flush(self) -> None: await self.__execute_in_thread(self.fp.flush)
[docs] async def read(self, n: int = -1) -> AnyStr: return await self.__execute_in_thread(self.fp.read, n)
[docs] async def readline(self, limit: int = -1) -> AnyStr: return await self.__execute_in_thread(self.fp.readline, limit)
[docs] async def readlines(self, limit: int = -1) -> list[AnyStr]: return await self.__execute_in_thread(self.fp.readlines, limit)
[docs] async def seek(self, offset: int, whence: int = 0) -> int: return await self.__execute_in_thread(self.fp.seek, offset, whence)
[docs] async def truncate(self, size: int | None = None) -> int: return await self.__execute_in_thread(self.fp.truncate, size)
[docs] async def write(self, s: AnyStr) -> int: return await self.__execute_in_thread(self.fp.write, s)
[docs] async def writelines(self, lines: list[AnyStr]) -> None: await self.__execute_in_thread(self.fp.writelines, lines)
AsyncFileIOBase: TypeAlias = AsyncFileIO
[docs] class AsyncBinaryIO(AsyncFileIO[bytes]): pass
[docs] class AsyncTextIO(AsyncFileIO[str]): @property def fp(self) -> TextIO: return self._fp # type: ignore @property def newlines(self) -> Any: return self.fp.newlines @property def errors(self) -> str | None: return self.fp.errors @property def line_buffering(self) -> int: return self.fp.line_buffering @property def encoding(self) -> str: return self.fp.encoding
[docs] def buffer(self) -> AsyncBinaryIO: return AsyncBinaryIO.open_fp(self.fp.buffer) # type: ignore
# Aliases AsyncBytesFileIO: TypeAlias = AsyncBinaryIO AsyncTextFileIO: TypeAlias = AsyncTextIO
[docs] class AsyncGzipBinaryIO(AsyncBytesFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return gzip.open # type: ignore
[docs] class AsyncGzipTextIO(AsyncTextFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return gzip.open # type: ignore
[docs] class AsyncBz2BinaryIO(AsyncBytesFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return bz2.open # type: ignore
[docs] class AsyncBz2TextIO(AsyncTextFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return bz2.open # type: ignore
[docs] class AsyncLzmaBinaryIO(AsyncBytesFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return lzma.open # type: ignore
[docs] class AsyncLzmaTextIO(AsyncTextFileIO):
[docs] @staticmethod def get_opener() -> Callable[..., IO[AnyStr]]: return lzma.open # type: ignore
[docs] class Compression(Enum): NONE = (AsyncBinaryIO, AsyncTextIO) GZIP = (AsyncGzipBinaryIO, AsyncGzipTextIO) BZ2 = (AsyncBz2BinaryIO, AsyncBz2TextIO) LZMA = (AsyncLzmaBinaryIO, AsyncLzmaTextIO)
AsyncFileType: TypeAlias = AsyncFileIO[AnyStr] | AsyncTextIO | AsyncBinaryIO BinaryModes: TypeAlias = Literal[ "rb", "wb", "ab", "xb", "rb+", "wb+", "ab+", "xb+", "br", "br+", "bw+", "ba+", "bx+", ] TextModes: TypeAlias = Literal["r", "w", "a", "x", "r+", "w+", "a+", "x+"] @overload def async_open( fname: FilePath, mode: BinaryModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any, ) -> AsyncBinaryIO: ... @overload def async_open( fname: FilePath, mode: TextModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any, ) -> AsyncTextIO: ...
[docs] def async_open( fname: FilePath, mode: str = "r", compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any, ) -> AsyncFileType: binary_io_class, text_io_class = compression.value if "b" in mode: return binary_io_class(fname, mode, *args, **kwargs) if "t" not in mode: mode = f"t{mode}" return text_io_class(fname, mode, encoding=encoding, *args, **kwargs)
__all__ = ( "AsyncBinaryIO", "AsyncBz2BinaryIO", "AsyncBz2TextIO", "AsyncFileIO", "AsyncFileType", "AsyncGzipBinaryIO", "AsyncGzipTextIO", "AsyncLzmaBinaryIO", "AsyncLzmaTextIO", "AsyncTextIO", "Compression", "async_open", )