import asyncio
import bz2
import gzip
import lzma
import sys
from concurrent.futures import Executor
from enum import Enum
from functools import partial, total_ordering
from pathlib import Path
from typing import (
IO, Any, AnyStr, Awaitable, Callable, Generator, Generic, List, Literal,
Optional, TextIO, TypeVar, Union, overload,
)
from .compat import EventLoopMixin, TypeAlias
# Type variable used in this module's annotations for the result type of a
# wrapped callable.
T = TypeVar("T", bound=Any)
# A file location given either as a plain string or as a ``pathlib.Path``.
FilePath: TypeAlias = Union[str, Path]
def proxy_method_async(
    name: str,
    in_executor: bool = True,
) -> Callable[..., Any]:
    """Build an async method that forwards a call to ``self.fp.<name>``.

    The returned coroutine function is meant to be used as a method: the
    instance it is called on must expose ``fp`` (the wrapped object),
    ``loop`` (an asyncio event loop) and ``executor`` (an executor or
    ``None``).

    :param name: attribute looked up on ``self.fp`` at call time; also
        assigned to ``__name__`` of the returned function.
    :param in_executor: when true the call is submitted with
        ``loop.run_in_executor``; otherwise it is scheduled on the loop
        itself with ``loop.call_soon``.
    :returns: a coroutine function ``method(self, *args, **kwargs)`` that
        resolves to the wrapped call's result or raises its exception.
    """

    def wrap_to_future(
        loop: asyncio.AbstractEventLoop,
        func: Callable[..., Any],
        *args: Any, **kwargs: Any,
    ) -> asyncio.Future:
        """Run ``func`` on the next loop iteration and expose a future."""
        future = loop.create_future()

        def _inner() -> None:
            # The awaiting task may have been cancelled between scheduling
            # and execution. Touching a finished future would raise
            # InvalidStateError inside the loop callback, so skip the call
            # entirely (a cancelled executor job is not run either).
            if future.done():
                return
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                future.set_exception(e)
            else:
                future.set_result(result)

        loop.call_soon(_inner)
        return future

    def wrap_to_thread(
        loop: asyncio.AbstractEventLoop, func: Callable[..., Any],
        executor: Executor,
        *args: Any, **kwargs: Any,
    ) -> Awaitable[Any]:
        """Submit ``func`` to ``executor`` through the event loop."""
        # run_in_executor only forwards positional arguments, so the
        # keyword arguments are bound up front.
        callee = partial(func, *args, **kwargs)
        # noinspection PyTypeChecker
        return loop.run_in_executor(executor, callee)

    async def method(self, *args, **kwargs):  # type: ignore
        # Resolved on every call, so it always targets the current ``fp``.
        func = getattr(self.fp, name)
        if in_executor:
            return await wrap_to_thread(
                self.loop, func, self.executor, *args, **kwargs,
            )
        return await wrap_to_future(self.loop, func, *args, **kwargs)

    method.__name__ = name
    return method
@total_ordering
class AsyncFileIO(EventLoopMixin, Generic[AnyStr]):
    """Asynchronous wrapper around a blocking file object.

    The underlying file is opened lazily by :meth:`open`, which is also
    triggered by ``await obj`` and ``async with obj``. Potentially blocking
    operations are submitted to ``self.executor`` through
    ``self.loop.run_in_executor`` (``loop`` is presumably supplied by
    ``EventLoopMixin``).
    """

    # Every attribute assigned in ``__init__`` is declared here so the class
    # does not depend on an instance ``__dict__`` being available.
    __slots__ = (
        "_fp", "executor", "__iterator_lock", "__open_args",
    ) + EventLoopMixin.__slots__

    # The wrapped file object, or ``None`` until the file has been opened.
    _fp: Optional[IO[AnyStr]]

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return the callable used to open the file (builtin ``open``)."""
        return open

    def __init__(
        self, fname: Union[str, Path], mode: str = "r",
        executor: Optional[Executor] = None, *args: Any,
        loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs: Any,
    ):
        """Remember how to open the file; nothing is opened here.

        :param fname: path of the file, converted with ``str()``.
        :param mode: mode string handed to the opener.
        :param executor: executor for blocking calls; ``None`` is passed
            to ``run_in_executor`` unchanged.
        :param args: extra positional arguments for the opener.
        :param loop: event loop stored as ``_loop``.
        :param kwargs: extra keyword arguments for the opener.
        """
        self.executor = executor
        self._loop = loop
        self._fp = None
        self.__open_args = (str(fname), mode, *args), kwargs
        self.__iterator_lock = asyncio.Lock()

    @classmethod
    def open_fp(
        cls, fp: IO, executor: Optional[Executor] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> "AsyncFileIO":
        """Wrap an already opened file object.

        The wrapper is built from ``fp.name`` and ``fp.mode`` and then bound
        directly to ``fp``, so :meth:`open` becomes a no-op.
        """
        async_fp = cls(fp.name, mode=fp.mode, executor=executor, loop=loop)
        async_fp._fp = fp
        return async_fp

    async def open(self) -> None:
        """Open the file in the executor unless it is already open."""
        if self._fp is not None:
            return

        args, kwargs = self.__open_args
        self._fp = await self.__execute_in_thread(
            self.get_opener(), *args, **kwargs,
        )

    @property
    def fp(self) -> IO[AnyStr]:
        """The wrapped file object.

        :raises RuntimeError: if the file has not been opened yet.
        """
        if self._fp is not None:
            return self._fp
        raise RuntimeError("file is not opened")

    def closed(self) -> bool:
        """Return the ``closed`` flag of the wrapped file."""
        return self.fp.closed

    def __await__(self) -> Generator[Any, Any, "AsyncFileIO"]:
        """Open the file and resolve to this wrapper (``await obj``)."""
        yield from self.open().__await__()
        return self

    async def __aenter__(self) -> "AsyncFileIO[AnyStr]":
        await self.open()
        return self

    async def __aexit__(
        self, exc_type: Any,
        exc_val: Any, exc_tb: Any,
    ) -> None:
        """Run the wrapped file's ``__exit__`` in the configured executor.

        :raises RuntimeError: if the file is not opened (raised by ``fp``).
        """
        await self.__execute_in_thread(
            self.fp.__exit__, exc_type, exc_val, exc_tb,
        )

    def __del__(self) -> None:
        # Read the raw attribute: the ``fp`` property raises for a wrapper
        # that was never opened, and a finalizer must not raise.
        fp = getattr(self, "_fp", None)
        if fp is None or fp.closed:
            return
        fp.close()
        del self._fp

    def __aiter__(self) -> "AsyncFileIO[AnyStr]":
        return self

    async def __anext__(self) -> AnyStr:
        """Return the next line; an empty read ends the iteration."""
        # The lock keeps concurrent iterators from interleaving readline().
        async with self.__iterator_lock:
            line = await self.readline()

        if not line:
            raise StopAsyncIteration

        return line

    def __eq__(self, other: Any) -> bool:
        """Equal when both are the same class and wrap equal file objects.

        This mirrors ``__hash__``, which hashes ``(class, fp)``.
        """
        if self.__class__ is not other.__class__:
            return False
        return bool(self.fp == other.fp)

    def __lt__(self, other: Any) -> bool:
        return self.fp < other.fp

    def __hash__(self) -> int:
        return hash((self.__class__, self.fp))

    async def __execute_in_thread(
        self, method: Callable[..., Any], *args: Any, **kwargs: Any,
    ) -> Any:
        """Await ``method(*args, **kwargs)`` run in ``self.executor``."""
        return await self.loop.run_in_executor(
            self.executor, partial(method, *args, **kwargs),
        )

    @property
    def mode(self) -> str:
        return self.fp.mode

    @property
    def name(self) -> str:
        return self.fp.name

    def fileno(self) -> int:
        return self.fp.fileno()

    def isatty(self) -> bool:
        return self.fp.isatty()

    async def close(self) -> None:
        """Close the wrapped file in the executor."""
        return await self.__execute_in_thread(self.fp.close)

    def __getattr__(self, name: str) -> Callable[..., Awaitable[Any]]:
        """Proxy any other public attribute of the wrapped file.

        The result is always a coroutine function: awaiting its call runs a
        callable attribute in the executor, or returns a plain attribute
        value as is.

        :raises AttributeError: for names starting with an underscore, so
            that private and special-name lookups (``getattr`` with a
            default, ``hasattr``) are answered truthfully.
        """
        if name.startswith("_"):
            raise AttributeError(name)

        async def method(*args: Any, **kwargs: Any) -> Any:
            getter = getattr(self.fp, name)
            if callable(getter):
                return await self.__execute_in_thread(
                    getter, *args, **kwargs,
                )
            return getter

        method.__name__ = name
        return method

    # The four queries below call the wrapped file directly, without the
    # executor.
    async def tell(self) -> int:
        return self.fp.tell()

    async def readable(self) -> bool:
        return self.fp.readable()

    async def seekable(self) -> bool:
        return self.fp.seekable()

    async def writable(self) -> bool:
        return self.fp.writable()

    # Everything below runs the same-named method of the wrapped file in
    # the executor.
    async def flush(self) -> None:
        await self.__execute_in_thread(self.fp.flush)

    async def read(self, n: int = -1) -> AnyStr:
        return await self.__execute_in_thread(self.fp.read, n)

    async def readline(self, limit: int = -1) -> AnyStr:
        return await self.__execute_in_thread(self.fp.readline, limit)

    async def readlines(self, limit: int = -1) -> List[AnyStr]:
        return await self.__execute_in_thread(self.fp.readlines, limit)

    async def seek(self, offset: int, whence: int = 0) -> int:
        return await self.__execute_in_thread(self.fp.seek, offset, whence)

    async def truncate(self, size: Optional[int] = None) -> int:
        return await self.__execute_in_thread(self.fp.truncate, size)

    async def write(self, s: AnyStr) -> int:
        return await self.__execute_in_thread(self.fp.write, s)

    async def writelines(self, lines: List[AnyStr]) -> None:
        await self.__execute_in_thread(self.fp.writelines, lines)
# Second name bound to the very same class object as ``AsyncFileIO``.
AsyncFileIOBase: TypeAlias = AsyncFileIO
class AsyncBinaryIO(AsyncFileIO[bytes]):
    """``AsyncFileIO`` specialised for ``bytes``; adds no members."""
class AsyncTextIO(AsyncFileIO[str]):
    """``AsyncFileIO`` specialised for ``str`` with text-only attributes."""

    @property
    def fp(self) -> TextIO:
        """The wrapped text file.

        This override returns ``_fp`` as is, so it yields ``None`` instead
        of raising while the file has not been opened.
        """
        return self._fp  # type: ignore

    @property
    def newlines(self) -> Any:
        return self.fp.newlines

    @property
    def errors(self) -> Optional[str]:
        return self.fp.errors

    @property
    def line_buffering(self) -> int:
        return self.fp.line_buffering

    @property
    def encoding(self) -> str:
        return self.fp.encoding

    def buffer(self) -> AsyncBinaryIO:
        """Return an async wrapper over the underlying binary buffer.

        The wrapper reuses this object's executor and loop, so operations
        on the binary view are scheduled the same way as on the text file.
        """
        return AsyncBinaryIO.open_fp(  # type: ignore
            self.fp.buffer, executor=self.executor, loop=self._loop,
        )
# Aliases: additional names for the binary and text wrappers. The compressed
# variants below subclass them through these names.
AsyncBytesFileIO: TypeAlias = AsyncBinaryIO
AsyncTextFileIO: TypeAlias = AsyncTextIO
class AsyncGzipBinaryIO(AsyncBytesFileIO):
    """Binary async file that is opened with ``gzip.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``gzip.open`` as the opener for this class."""
        opener = gzip.open
        return opener  # type: ignore
class AsyncGzipTextIO(AsyncTextFileIO):
    """Text async file that is opened with ``gzip.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``gzip.open`` as the opener for this class."""
        opener = gzip.open
        return opener  # type: ignore
class AsyncBz2BinaryIO(AsyncBytesFileIO):
    """Binary async file that is opened with ``bz2.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``bz2.open`` as the opener for this class."""
        opener = bz2.open
        return opener  # type: ignore
class AsyncBz2TextIO(AsyncTextFileIO):
    """Text async file that is opened with ``bz2.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``bz2.open`` as the opener for this class."""
        opener = bz2.open
        return opener  # type: ignore
class AsyncLzmaBinaryIO(AsyncBytesFileIO):
    """Binary async file that is opened with ``lzma.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``lzma.open`` as the opener for this class."""
        opener = lzma.open
        return opener  # type: ignore
class AsyncLzmaTextIO(AsyncTextFileIO):
    """Text async file that is opened with ``lzma.open``."""

    @staticmethod
    def get_opener() -> Callable[..., IO[AnyStr]]:
        """Return ``lzma.open`` as the opener for this class."""
        opener = lzma.open
        return opener  # type: ignore
class Compression(Enum):
    """Supported compression schemes.

    Each member's value is a ``(binary class, text class)`` pair of async
    file wrappers; ``async_open`` unpacks it in exactly that order.
    """

    # Plain, uncompressed files.
    NONE = (AsyncBinaryIO, AsyncTextIO)
    # Wrappers whose opener is ``gzip.open``.
    GZIP = (AsyncGzipBinaryIO, AsyncGzipTextIO)
    # Wrappers whose opener is ``bz2.open``.
    BZ2 = (AsyncBz2BinaryIO, AsyncBz2TextIO)
    # Wrappers whose opener is ``lzma.open``.
    LZMA = (AsyncLzmaBinaryIO, AsyncLzmaTextIO)
# Return type of ``async_open``: any of the async file wrappers.
AsyncFileType: TypeAlias = Union[
AsyncFileIO[AnyStr], AsyncTextIO, AsyncBinaryIO,
]
# Mode strings containing "b"; the first ``async_open`` overload maps them to
# ``AsyncBinaryIO``.
BinaryModes: TypeAlias = Literal[
"rb", "wb", "ab", "xb", "rb+", "wb+", "ab+", "xb+", "br", "br+",
"bw+", "ba+", "bx+",
]
# Mode strings without "b"; the second ``async_open`` overload maps them to
# ``AsyncTextIO``.
TextModes: TypeAlias = Literal["r", "w", "a", "x", "r+", "w+", "a+", "x+"]
@overload
def async_open(
    fname: FilePath,
    mode: BinaryModes,
    compression: Compression = Compression.NONE,
    encoding: str = sys.getdefaultencoding(),
    *args: Any, **kwargs: Any,
) -> AsyncBinaryIO:
    ...


# ``mode`` defaults to "r" here as well, so that a call relying on the
# implementation's default resolves to the text overload.
@overload
def async_open(
    fname: FilePath,
    mode: TextModes = "r",
    compression: Compression = Compression.NONE,
    encoding: str = sys.getdefaultencoding(),
    *args: Any, **kwargs: Any,
) -> AsyncTextIO:
    ...


def async_open(
    fname: FilePath, mode: str = "r",
    compression: Compression = Compression.NONE,
    encoding: str = sys.getdefaultencoding(),
    *args: Any, **kwargs: Any,
) -> AsyncFileType:
    """Create an async file wrapper for ``fname``; the file is not opened.

    :param fname: path of the file.
    :param mode: mode string; a ``"b"`` anywhere in it selects the binary
        wrapper, otherwise the text wrapper is used and ``"t"`` is prepended
        when missing.
    :param compression: member of :class:`Compression` whose value supplies
        the ``(binary, text)`` wrapper classes.
    :param encoding: passed to the text wrapper only; ignored in binary
        mode.
    :param args: extra positional arguments for the wrapper constructor,
        placed right after ``mode``.
    :param kwargs: extra keyword arguments for the wrapper constructor.
    :returns: an instance of the selected wrapper class.
    """
    binary_io_class, text_io_class = compression.value

    if "b" in mode:
        return binary_io_class(fname, mode, *args, **kwargs)

    if "t" not in mode:
        mode = f"t{mode}"

    # Positional extras are spelled before the keyword; the call is
    # equivalent to passing ``encoding=`` first and ``*args`` afterwards.
    return text_io_class(fname, mode, *args, encoding=encoding, **kwargs)
# Names exported by a star-import of this module. The alias names
# (AsyncFileIOBase, AsyncBytesFileIO, AsyncTextFileIO) and the helper type
# aliases other than AsyncFileType are not listed.
__all__ = (
"AsyncBinaryIO",
"AsyncBz2BinaryIO",
"AsyncBz2TextIO",
"AsyncFileIO",
"AsyncFileType",
"AsyncGzipBinaryIO",
"AsyncGzipTextIO",
"AsyncLzmaBinaryIO",
"AsyncLzmaTextIO",
"AsyncTextIO",
"Compression",
"async_open",
)