# Source code for aiothreads.threaded_iterable

from abc import ABC, abstractmethod
from functools import partial
from typing import Any, Callable, Concatenate, Generator, Generic, MutableMapping, Optional, Union, overload
from weakref import WeakKeyDictionary

from .iterator_wrapper import IteratorWrapper
from .new_thread import run_in_new_thread
from .types import BP, P, S, T


class ThreadedIterableBase(Generic[P, T], ABC):
    """Shared call interface for wrapped generator functions.

    Subclasses provide ``func`` (the generator function to run) and
    ``max_size`` (passed through to ``IteratorWrapper`` unchanged).
    """

    func: Callable[P, Generator[T, None, None]]
    max_size: int

    @abstractmethod
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Set up ``func`` and ``max_size``; concrete subclasses override."""

    def sync_call(
        self, *args: P.args, **kwargs: P.kwargs,
    ) -> Generator[T, None, None]:
        """Call the generator function directly and return its generator."""
        generator = self.func(*args, **kwargs)
        return generator

    def async_call(
        self, *args: P.args, **kwargs: P.kwargs,
    ) -> IteratorWrapper[P, T]:
        """Return the wrapper built by ``create_wrapper`` for these arguments."""
        wrapper = self.create_wrapper(*args, **kwargs)
        return wrapper

    def create_wrapper(
        self, *args: P.args, **kwargs: P.kwargs,
    ) -> IteratorWrapper[P, T]:
        """Build an ``IteratorWrapper`` around the call ``func(*args, **kwargs)``.

        The call is not made here: it is frozen into a ``partial`` and handed
        to the wrapper together with ``max_size``.
        """
        deferred_call = partial(self.func, *args, **kwargs)
        return IteratorWrapper(deferred_call, max_size=self.max_size)

    def __call__(
        self, *args: P.args, **kwargs: P.kwargs,
    ) -> IteratorWrapper[P, T]:
        """Calling the object is shorthand for ``async_call``."""
        return self.async_call(*args, **kwargs)


class ThreadedIterable(ThreadedIterableBase[P, T]):
    """Wrap a generator function and act as a descriptor on classes.

    Besides plain functions, ``staticmethod`` and ``classmethod`` objects are
    accepted: the underlying function is unwrapped and the original kind is
    remembered in ``func_type`` so that ``__get__`` can bind it correctly.
    """

    func_type: type

    def __init__(
        self,
        func: Callable[P, Generator[T, None, None]],
        max_size: int = 0,
    ) -> None:
        """Store the generator function and the wrapper queue size.

        Args:
            func: Generator function, or a ``staticmethod``/``classmethod``
                object wrapping one.
            max_size: Value forwarded as ``max_size`` to the wrapper created
                for every call.
        """
        if isinstance(func, staticmethod):
            self.func_type = staticmethod
            actual_func = func.__func__
        elif isinstance(func, classmethod):
            self.func_type = classmethod
            actual_func = func.__func__
        else:
            self.func_type = type(func)
            actual_func = func

        self.func = actual_func
        self.max_size = max_size
        # Bound wrappers, weakly keyed by the object they are bound to.
        # NOTE(review): each cached BoundThreadedIterable holds a strong
        # reference to its key, so an entry is not expected to be evicted
        # while this descriptor is alive -- confirm that lifetime is intended.
        self.__cache: MutableMapping[Any, Any] = WeakKeyDictionary()

    @overload
    def __get__(
        self: "ThreadedIterable[Concatenate[S, BP], T]",
        instance: S,
        owner: Optional[type] = ...,
    ) -> "BoundThreadedIterable[BP, T]":
        ...

    @overload
    def __get__(
        self: "ThreadedIterable[P, T]",
        instance: None,
        owner: Optional[type] = ...,
    ) -> "ThreadedIterable[P, T]":
        ...

    def __get__(
        self,
        instance: Any,
        owner: Optional[type] = None,
    ) -> "ThreadedIterable[P, T] | BoundThreadedIterable[Any, T]":
        """Bind the wrapped function the way the matching method kind would.

        * static methods are returned unbound (``self``);
        * class methods are bound to the class (``owner`` or the instance's
          type);
        * plain functions are bound to ``instance``, or returned unbound when
          looked up on the class.

        Bound objects are cached per binding target when the target can be
        weakly referenced and hashed; otherwise a fresh bound object is
        returned on every access.
        """
        # Unbound results are ``self`` and need no cache entry. Caching them
        # under ``instance`` used to raise TypeError for class-level access,
        # because ``None`` cannot be a WeakKeyDictionary key.
        if self.func_type is staticmethod:
            return self

        target: Any
        if self.func_type is classmethod:
            # Key by the class so lookups through the class and through any
            # of its instances share a single bound object.
            target = owner if instance is None else type(instance)
        elif instance is None:
            return self
        else:
            target = instance

        try:
            return self.__cache[target]
        except (KeyError, TypeError):
            # KeyError: nothing cached yet. TypeError: the target is not
            # weak-referenceable or not hashable, so it can never be cached.
            pass

        bound: Any = BoundThreadedIterable(self.func, target, self.max_size)
        try:
            self.__cache[target] = bound
        except TypeError:
            # Caching is best-effort; binding must still succeed for targets
            # such as ``__slots__`` objects without ``__weakref__``.
            pass
        return bound


class BoundThreadedIterable(ThreadedIterableBase[P, T]):
    """Threaded iterable with its first positional argument already supplied."""

    __instance: Any

    def __init__(
        self,
        func: Callable[..., Generator[T, None, None]],
        instance: Any,
        max_size: int = 0,
    ) -> None:
        """Bind ``instance`` as the leading argument of ``func``.

        Args:
            func: Generator function expecting ``instance`` as first argument.
            instance: Object (or class) passed first on every call.
            max_size: Value forwarded as ``max_size`` to the wrapper created
                for every call.
        """

        def call_with_instance(*args: Any, **kwargs: Any) -> Any:
            # Prepend the bound object, then forward the caller's arguments.
            return func(instance, *args, **kwargs)

        self.max_size = max_size
        self.func = call_with_instance
        self.__instance = instance


@overload
def threaded_iterable(
    func: Callable[P, Generator[T, None, None]],
    *,
    max_size: int = 0,
) -> "ThreadedIterable[P, T]":
    ...


@overload
def threaded_iterable(
    *,
    max_size: int = 0,
) -> Callable[
    [Callable[P, Generator[T, None, None]]], ThreadedIterable[P, T],
]:
    ...


def threaded_iterable(
    func: Optional[Callable[P, Generator[T, None, None]]] = None,
    *,
    max_size: int = 0,
) -> Union[
    ThreadedIterable[P, T],
    Callable[
        [Callable[P, Generator[T, None, None]]], ThreadedIterable[P, T],
    ],
]:
    """Decorate a generator function as a ``ThreadedIterable``.

    Works both bare (``@threaded_iterable``) and with keyword arguments
    (``@threaded_iterable(max_size=...)``).

    Args:
        func: The generator function when used bare; ``None`` when the
            decorator is called with keyword arguments only.
        max_size: Value passed as ``max_size`` to ``ThreadedIterable``.

    Returns:
        A ``ThreadedIterable`` wrapping ``func``, or, when ``func`` is
        ``None``, a one-argument decorator that produces one.
    """
    if func is None:
        # Parametrised use: hand back the decorator that receives the function.
        return lambda f: ThreadedIterable(f, max_size=max_size)
    return ThreadedIterable(func, max_size=max_size)
class IteratorWrapperSeparate(IteratorWrapper):
    """``IteratorWrapper`` whose ``_run`` goes through ``run_in_new_thread``."""

    def _run(self) -> Any:
        """Pass ``self._in_thread`` to ``run_in_new_thread`` and return its result."""
        # Presumably this starts a dedicated thread for the producer rather
        # than reusing a shared pool -- confirm against ``new_thread``.
        return run_in_new_thread(self._in_thread)


class ThreadedIterableSeparate(ThreadedIterable[P, T]):
    """``ThreadedIterable`` that builds ``IteratorWrapperSeparate`` wrappers.

    NOTE(review): attribute access on an instance goes through the inherited
    ``__get__`` and yields a ``BoundThreadedIterable``, whose
    ``create_wrapper`` is the base implementation. Bound methods therefore
    appear to get a plain ``IteratorWrapper`` -- confirm whether intended.
    """

    def create_wrapper(
        self, *args: P.args, **kwargs: P.kwargs,
    ) -> IteratorWrapperSeparate:
        """Build an ``IteratorWrapperSeparate`` for ``func(*args, **kwargs)``."""
        return IteratorWrapperSeparate(
            partial(self.func, *args, **kwargs),
            max_size=self.max_size,
        )


@overload
def threaded_iterable_separate(
    func: Callable[P, Generator[T, None, None]],
    *,
    max_size: int = 0,
) -> "ThreadedIterableSeparate[P, T]":
    # Narrowed from ThreadedIterable: the implementation always builds a
    # ThreadedIterableSeparate, matching the keyword-only overload.
    ...


@overload
def threaded_iterable_separate(
    *,
    max_size: int = 0,
) -> Callable[
    [Callable[P, Generator[T, None, None]]], ThreadedIterableSeparate[P, T],
]:
    ...
def threaded_iterable_separate(
    func: Optional[Callable[P, Generator[T, None, None]]] = None,
    *,
    max_size: int = 0,
) -> Union[
    ThreadedIterableSeparate[P, T],
    Callable[
        [Callable[P, Generator[T, None, None]]],
        ThreadedIterableSeparate[P, T],
    ],
]:
    """Decorate a generator function as a ``ThreadedIterableSeparate``.

    Works both bare (``@threaded_iterable_separate``) and with keyword
    arguments (``@threaded_iterable_separate(max_size=...)``).

    Args:
        func: The generator function when used bare; ``None`` when the
            decorator is called with keyword arguments only.
        max_size: Value passed as ``max_size`` to
            ``ThreadedIterableSeparate``.

    Returns:
        A ``ThreadedIterableSeparate`` wrapping ``func``, or, when ``func``
        is ``None``, a one-argument decorator that produces one.
    """
    if func is None:
        # Parametrised use: hand back the decorator that receives the function.
        return lambda f: ThreadedIterableSeparate(f, max_size=max_size)
    return ThreadedIterableSeparate(func, max_size=max_size)