1. aiomisc module#

1.1. aiomisc.aggregate module#

class aiomisc.aggregate.AggregateAsyncFunc(*args, **kwargs)[source]#

Bases: Protocol, Generic[V, R]

class aiomisc.aggregate.AggregateFunc(*args, **kwargs)[source]#

Bases: Protocol, Generic[S, T]

class aiomisc.aggregate.AggregateStatistic(name: str | None = None)[source]#

Bases: Statistic

done: int#
error: int#
leeway_ms: float#
max_count: int#
name: str | None#
success: int#
class aiomisc.aggregate.Aggregator(func: AggregateFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#

Bases: AggregatorAsync[V, R], Generic[V, R]

class aiomisc.aggregate.AggregatorAsync(func: AggregateAsyncFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#

Bases: EventLoopMixin, Generic[V, R]

async aggregate(arg: V) R[source]#
property count: int#
property leeway_ms: float#
property max_count: int | None#
class aiomisc.aggregate.Arg(value: V, future: 'Future[R]')[source]#

Bases: Generic[V, R]

future: Future[R]#
value: V#
exception aiomisc.aggregate.ResultNotSetError[source]#

Bases: Exception

aiomisc.aggregate.aggregate(leeway_ms: float, max_count: int | None = None) Callable[[AggregateFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]][source]#

Parametric decorator that aggregates multiple single-argument calls (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function that takes variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into a single call with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)). Arguments are collected within a time window of leeway_ms, and no more than max_count of them (unlimited when max_count is None, the default) are passed to one call.

Note

func must return a sequence of values of length equal to the number of arguments (and in the same order).

Note

If an unexpected error occurs, the exception is propagated to each future. To set an individual error for each aggregated call, use aggregate_async.

Parameters:
  • leeway_ms – The maximum approximate delay between the first collected argument and the aggregated execution.

  • max_count – The maximum number of arguments to call decorated function with. Default None.
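The batching described above can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not aiomisc's implementation: `TinyAggregator`, `square_all` and the internal attribute names are hypothetical, and only the windowing, the `max_count` cut-off and the error propagation are modelled.

```python
import asyncio


class TinyAggregator:
    """Hypothetical stdlib-only sketch; not aiomisc's implementation."""

    def __init__(self, func, leeway_ms, max_count=None):
        self._func = func
        self._leeway = leeway_ms / 1000  # asyncio timers work in seconds
        self._max_count = max_count
        self._pending = []   # (argument, future) pairs of the open batch
        self._timer = None
        self._tasks = set()  # strong references to batches being executed

    async def aggregate(self, arg):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._pending.append((arg, future))
        if len(self._pending) == 1:
            # The first argument of a batch opens the time window.
            self._timer = loop.call_later(self._leeway, self._flush)
        if self._max_count is not None and len(self._pending) >= self._max_count:
            self._flush()  # batch is full, do not wait for the timer
        return await future

    def _flush(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        batch, self._pending = self._pending, []
        if batch:
            task = asyncio.get_running_loop().create_task(self._run(batch))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _run(self, batch):
        try:
            results = await self._func(*(arg for arg, _ in batch))
        except Exception as exc:
            # An unexpected error is propagated to every waiting caller.
            for _, future in batch:
                if not future.done():
                    future.set_exception(exc)
            return
        # Results must come back in argument order, one per argument.
        for (_, future), result in zip(batch, results):
            if not future.done():
                future.set_result(result)


calls = []


async def square_all(*args):
    calls.append(args)
    return [a * a for a in args]


async def main():
    agg = TinyAggregator(square_all, leeway_ms=10, max_count=5)
    return await asyncio.gather(*(agg.aggregate(i) for i in range(3)))


results = asyncio.run(main())
```

Three concurrent single-argument calls end up as one invocation of `square_all`, and each caller still receives only its own result.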

Returns:

aiomisc.aggregate.aggregate_async(leeway_ms: float, max_count: int | None = None) Callable[[AggregateAsyncFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]][source]#

Same as aggregate, but func receives arguments of type Arg, each carrying value and future attributes. In this setting func is responsible for setting an individual result or exception on every future, or for raising an exception (which is propagated to the futures automatically). If func mistakenly leaves some future without a result, a ResultNotSetError exception is set on it.
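The per-future contract can be illustrated with a small stdlib-only sketch. Everything named here is a hypothetical stand-in (`Item` for Arg, `ResultNotSet` for ResultNotSetError, `run_batch` for the aggregation step); it only shows who is responsible for resolving which future.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


class ResultNotSet(Exception):
    """Hypothetical stand-in for ResultNotSetError."""


@dataclass
class Item:
    """Hypothetical stand-in for Arg: a value and the future awaiting its result."""
    value: Any
    future: asyncio.Future


async def run_batch(func, values):
    loop = asyncio.get_running_loop()
    items = [Item(value, loop.create_future()) for value in values]
    try:
        await func(*items)
    except Exception as exc:
        # An exception raised by func reaches every unresolved future.
        for item in items:
            if not item.future.done():
                item.future.set_exception(exc)
    # Futures that func forgot to resolve get a dedicated error.
    for item in items:
        if not item.future.done():
            item.future.set_exception(ResultNotSet())
    return await asyncio.gather(
        *(item.future for item in items), return_exceptions=True
    )


async def halve(*items):
    for item in items:
        if item.value is None:
            continue  # deliberate bug: this future is never resolved
        if item.value % 2:
            item.future.set_exception(ValueError(f"{item.value} is odd"))
        else:
            item.future.set_result(item.value // 2)


outcome = asyncio.run(run_batch(halve, [4, 3, None]))
```

Each caller gets its own outcome: a result, an individual error, or the "result not set" error for the future that was skipped.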

Returns:

1.2. aiomisc.backoff module#

class aiomisc.backoff.BackoffStatistic(name: str | None = None)[source]#

Bases: Statistic

attempts: int#
cancels: int#
done: int#
errors: int#
name: str | None#
sum_time: float#
class aiomisc.backoff.RetryStatistic(name: str | None = None)[source]#

Bases: BackoffStatistic

name: str | None#
aiomisc.backoff.asyncbackoff(attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, *exc: ~typing.Type[Exception], exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (), max_tries: int | None = None, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None, statistic_class: ~typing.Type[~aiomisc.backoff.BackoffStatistic] = <class 'aiomisc.backoff.BackoffStatistic'>) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]#

Parametric decorator that enforces the attempt_timeout and deadline time limits on the decorated function.

If an exception is raised, the function is called again with the same arguments after pause seconds.

Parameters:
  • statistic_name – name field for statistic instances.

  • attempt_timeout – maximum execution time for a single attempt.

  • deadline – maximum execution time for all attempts combined.

  • pause – time gap between attempts.

  • exc – retry when one of these exceptions is raised.

  • exceptions – same as exc, but keyword-only.

  • max_tries – maximum number of attempts (>= 1).

  • giveup – is a predicate function which can decide by a given

  • statistic_class – statistic class
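How the two time limits, the pause and max_tries interact can be sketched with `asyncio.wait_for`. This is an assumption-laden illustration, not aiomisc's code: `retry_with_limits` and `flaky` are hypothetical names, giveup and statistics are left out, and only the listed exception types are retried.

```python
import asyncio
from functools import wraps


def retry_with_limits(attempt_timeout, deadline, pause=0, *exc, max_tries=None):
    """Hypothetical stdlib-only sketch; not aiomisc's implementation."""
    exceptions = exc or (Exception,)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async def attempts():
                tries = 0
                while True:
                    tries += 1
                    try:
                        # Each attempt gets its own time limit.
                        return await asyncio.wait_for(
                            func(*args, **kwargs), attempt_timeout
                        )
                    except exceptions:
                        if max_tries is not None and tries >= max_tries:
                            raise
                        await asyncio.sleep(pause)

            # The deadline covers all attempts and pauses together.
            return await asyncio.wait_for(attempts(), deadline)

        return wrapper

    return decorator


attempt_log = []


@retry_with_limits(0.5, 2, 0.01, ConnectionError, max_tries=5)
async def flaky():
    attempt_log.append(len(attempt_log) + 1)
    if len(attempt_log) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


result = asyncio.run(flaky())
```

The function fails twice, is retried after a short pause each time, and succeeds on the third attempt well within the overall deadline.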

aiomisc.backoff.asyncretry(max_tries: int | None, exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (<class 'Exception'>,), pause: int | float = 0, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]#

Shortcut for asyncbackoff(None, None, 0, Exception).

If an exception is raised, the function is called again with the same arguments after pause seconds.

Parameters:
  • max_tries – maximum number of execution attempts (>= 1; None means unlimited).

  • exceptions – exception types that trigger a retry (the counterpart of exc in asyncbackoff).

  • giveup – is a predicate function which can decide by a given

  • pause – time gap between execution attempts.

  • statistic_name – name field for statistic instances.

1.3. aiomisc.circuit_breaker module#

class aiomisc.circuit_breaker.CircuitBreaker(error_ratio: float, response_time: int | float, exceptions: ~typing.Iterable[~typing.Type[Exception]] = (<class 'Exception'>,), recovery_time: int | float | None = None, broken_time: int | float | None = None, passing_time: int | float | None = None, exception_inspector: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None)[source]#

Bases: EventLoopMixin

Circuit Breaker pattern implementation. The class instance collects call statistics through the call or call_async methods.

The state machine has three states:
  • CircuitBreakerStates.PASSING

  • CircuitBreakerStates.BROKEN

  • CircuitBreakerStates.RECOVERING

In passing mode all results or exceptions are returned as is. Statistics are collected for each call.

In broken mode CircuitBroken is raised for each call. Statistics are not collected.

In recovering mode some of the calls are real function calls and the rest raise CircuitBroken. The number of real calls grows exponentially, but when 20% (by default) of them fail, the breaker returns to the broken state.

Parameters:
  • error_ratio – Ratio of failed to successful calls. The state may change when the ratio reaches the given value within response_time (in seconds). Value between 0.0 and 1.0.

  • response_time – Time window to collect statistics (seconds)

  • exceptions – Only these exceptions affect the ratio. The base class Exception is used by default.

  • recovery_time – minimal time in recovery state (seconds)

  • broken_time – minimal time in broken state (seconds)

  • passing_time – minimum time in passing state (seconds)
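The three states can be illustrated with a deliberately simplified breaker. This sketch is not aiomisc's implementation: `TinyBreaker`, `CircuitOpen`, `min_calls` and the injectable `clock` are hypothetical, there is no sliding time window, and recovery is a single trial call rather than the gradual ramp described above.

```python
import enum
import time


class State(enum.IntEnum):
    PASSING = 0
    BROKEN = 1
    RECOVERING = 2


class CircuitOpen(Exception):
    """Hypothetical stand-in for CircuitBroken."""


class TinyBreaker:
    """Hypothetical simplified breaker; not aiomisc's implementation."""

    def __init__(self, error_ratio, broken_time, clock=time.monotonic, min_calls=4):
        self.error_ratio = error_ratio
        self.broken_time = broken_time
        self.clock = clock          # injectable so the demo needs no sleeping
        self.min_calls = min_calls  # do not trip on the very first failure
        self.state = State.PASSING
        self.ok = self.fail = 0
        self.broken_at = 0.0

    def _trip(self):
        self.state = State.BROKEN
        self.broken_at = self.clock()
        self.ok = self.fail = 0

    def call(self, func, *args, **kwargs):
        if self.state is State.BROKEN:
            if self.clock() - self.broken_at < self.broken_time:
                raise CircuitOpen()
            self.state = State.RECOVERING  # let one trial call through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.fail += 1
            total = self.ok + self.fail
            if self.state is State.RECOVERING:
                self._trip()  # the trial call failed, back to broken
            elif total >= self.min_calls and self.fail / total >= self.error_ratio:
                self._trip()
            raise
        self.ok += 1
        if self.state is State.RECOVERING:
            self.state = State.PASSING
            self.ok = self.fail = 0
        return result


def boom():
    raise RuntimeError("backend down")


now = [0.0]
breaker = TinyBreaker(error_ratio=0.5, broken_time=10, clock=lambda: now[0])
states = []
for _ in range(4):
    try:
        breaker.call(boom)
    except RuntimeError:
        pass
states.append(breaker.state)       # tripped after four failures
try:
    breaker.call(lambda: "unused")
except CircuitOpen:
    states.append("rejected")      # calls are refused while broken
now[0] = 11.0                      # broken_time has elapsed
value = breaker.call(lambda: "pong")
states.append(breaker.state)
```

The fake clock makes the transitions deterministic: four failures trip the breaker, the next call is rejected without touching the backend, and a successful trial call after `broken_time` restores the passing state.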

BUCKET_COUNT = 10#
PASSING_BROKEN_THRESHOLD = 1#
RECOVER_BROKEN_THRESHOLD = 0.5#
bucket() int[source]#
call(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]#
async call_async(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]#
context() Generator[Any, Any, Any][source]#
counter() Counter[source]#
get_state_delay() int | float[source]#
property recovery_ratio: int | float#
property response_time: int | float#
property state: CircuitBreakerStates#
class aiomisc.circuit_breaker.CircuitBreakerStates(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

BROKEN = 1#
PASSING = 0#
RECOVERING = 2#
class aiomisc.circuit_breaker.CircuitBreakerStatistic(name: str | None = None)[source]#

Bases: Statistic

call_broken: int#
call_count: int#
call_passing: int#
call_recovering: int#
call_recovering_failed: int#
call_recovering_ok: int#
error_ratio: float#
error_ratio_threshold: float#
name: str | None#
exception aiomisc.circuit_breaker.CircuitBroken(last_exception: Exception | None)[source]#

Bases: Exception

last_exception#
class aiomisc.circuit_breaker.CounterKey(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

FAIL = 0#
OK = 1#
TOTAL = 2#
aiomisc.circuit_breaker.cutout(ratio: float, response_time: int | float, *exceptions: Type[Exception], **kwargs: Any) Callable[[Callable[[...], T] | Callable[[...], Awaitable[T]]], Callable[[...], T | Awaitable[T]]][source]#
aiomisc.circuit_breaker.random() x in the interval [0, 1).#

1.4. aiomisc.compat module#

class aiomisc.compat.EntrypointProtocol(*args, **kwargs)[source]#

Bases: Protocol

load() Any[source]#
property name: str#
class aiomisc.compat.EventLoopMixin[source]#

Bases: object

property loop: AbstractEventLoop#
class aiomisc.compat.ParamSpec#

Bases: object

Parameter specification variable.

The preferred way to construct a parameter specification is via the dedicated syntax for generic functions, classes, and type aliases, where the use of ‘**’ creates a parameter specification:

type IntFunc[**P] = Callable[P, int]

For compatibility with Python 3.11 and earlier, ParamSpec objects can also be created as follows:

P = ParamSpec('P')

Parameter specification variables exist primarily for the benefit of static type checkers. They are used to forward the parameter types of one callable to another callable, a pattern commonly found in higher-order functions and decorators. They are only valid when used in Concatenate, or as the first argument to Callable, or as parameters for user-defined Generics. See class Generic for more information on generic types.

An example for annotating a decorator:

def add_logging[**P, T](f: Callable[P, T]) -> Callable[P, T]:
    '''A type-safe decorator to add logging to a function.'''
    def inner(*args: P.args, **kwargs: P.kwargs) -> T:
        logging.info(f'{f.__name__} was called')
        return f(*args, **kwargs)
    return inner

@add_logging
def add_two(x: float, y: float) -> float:
    '''Add two numbers together.'''
    return x + y

Parameter specification variables can be introspected. e.g.:

>>> P = ParamSpec("P")
>>> P.__name__
'P'

Note that only parameter specification variables defined in the global scope can be pickled.

args#

Represents positional arguments.

kwargs#

Represents keyword arguments.

class aiomisc.compat.Protocol[source]#

Bases: Generic

Base class for protocol classes.

Protocol classes are defined as:

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example:

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
aiomisc.compat.entry_pont_iterator(entry_point: str) Iterator[EntrypointProtocol][source]#
aiomisc.compat.final(f)[source]#

Decorator to indicate final methods and final classes.

Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed.

For example:

class Base:
    @final
    def done(self) -> None:
        ...
class Sub(Base):
    def done(self) -> None:  # Error reported by type checker
        ...

@final
class Leaf:
    ...
class Other(Leaf):  # Error reported by type checker
    ...

There is no runtime checking of these properties. The decorator attempts to set the __final__ attribute to True on the decorated object to allow runtime introspection.

aiomisc.compat.get_current_loop() CT#
aiomisc.compat.set_current_loop(value: CT) None#
aiomisc.compat.sock_set_nodelay(sock: socket) None[source]#
aiomisc.compat.sock_set_reuseport(sock: socket, reuse_port: bool) None[source]#
aiomisc.compat.time_ns() int#

Return the current time in nanoseconds since the Epoch.

1.5. aiomisc.context module#

class aiomisc.context.Context(loop: AbstractEventLoop)[source]#

Bases: object

close() None[source]#
aiomisc.context.get_context(loop: AbstractEventLoop | None = None) Context[source]#

1.6. aiomisc.counters module#

class aiomisc.counters.AbstractStatistic[source]#

Bases: object

name: str | None#
class aiomisc.counters.MetaStatistic(name: str, bases: Tuple[type, ...], dct: Dict[str, Any])[source]#

Bases: type

class aiomisc.counters.Metric(name: str, counter: MutableMapping[str, float | int], default: float | int = 0)[source]#

Bases: object

counter#
name: str#
class aiomisc.counters.Statistic(name: str | None = None)[source]#

Bases: AbstractStatistic

name: str | None#
class aiomisc.counters.StatisticResult(kind: Type[aiomisc.counters.AbstractStatistic], name: str | None, metric: str, value: int | float)[source]#

Bases: object

kind: Type[AbstractStatistic]#
metric: str#
name: str | None#
value: int | float#
aiomisc.counters.get_statistics(*kind: Type[Statistic]) Generator[Any, Tuple[Statistic, str, int], None][source]#

1.7. aiomisc.cron module#

class aiomisc.cron.CronCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#

Bases: EventLoopMixin

Note

When the cron function runs longer than the execution interval, the next call is skipped and a warning is logged.

static get_next(cron: croniter, _: RecurringCallback) float[source]#
start(spec: str, loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) None[source]#
stop() Future[source]#

1.8. aiomisc.entrypoint module#

class aiomisc.entrypoint.Entrypoint(*services: ~aiomisc.service.base.Service, loop: ~asyncio.events.AbstractEventLoop | None = None, pool_size: int | None = None, log_level: int | str = 'info', log_format: str | ~aiomisc_log.enum.LogFormat = 'plain', log_buffering: bool = True, log_buffer_size: int = 1024, log_date_format: str | None = None, log_flush_interval: float = 0.2, log_config: bool = True, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False, catch_signals: ~typing.Tuple[int, ...] | None = None, shutdown_timeout: int | float = 60.0)[source]#

Bases: object

Creates a new Entrypoint

Parameters:
  • debug – enable debug mode on the event loop

  • loop – event loop to use

  • services – Service instances to start

  • pool_size – thread pool size

  • log_level – Logging level which will be configured

  • log_format – Logging format which will be configured

  • log_buffer_size – Buffer size for logging

  • log_flush_interval – interval in seconds for flushing logs

  • log_config – if False do not configure logging

  • catch_signals – Perform shutdown when these signals are received

  • shutdown_timeout – Timeout in seconds for graceful shutdown

AIOMISC_SHUTDOWN_TIMEOUT: float = 60.0#
DEFAULT_AIOMISC_BUFFERING: bool = True#
DEFAULT_AIOMISC_BUFFER_SIZE: int = 1024#
DEFAULT_AIOMISC_DEBUG: bool = False#
DEFAULT_AIOMISC_LOG_CONFIG: bool = True#
DEFAULT_AIOMISC_LOG_FLUSH: float = 0.2#
DEFAULT_AIOMISC_POOL_SIZE: int | None = None#
DEFAULT_LOG_DATE_FORMAT: str | None = None#
DEFAULT_LOG_FORMAT: str = 'plain'#
DEFAULT_LOG_LEVEL: str = 'info'#
POST_START = <aiomisc.signal.Signal object>#
POST_STOP = <aiomisc.signal.Signal object>#
PRE_START = <aiomisc.signal.Signal object>#
PRE_STOP = <aiomisc.signal.Signal object>#
async closing() None[source]#
classmethod get_current() Entrypoint[source]#
async graceful_shutdown(exception: Exception) None[source]#
property loop: AbstractEventLoop#
property services: Tuple[Service, ...]#
async start_services(*svc: Service) None[source]#
async stop_services(*svc: Service, exc: Exception | None = None) None[source]#
aiomisc.entrypoint.entrypoint#

alias of Entrypoint

aiomisc.entrypoint.get_context(loop: AbstractEventLoop | None = None) Context[source]#
aiomisc.entrypoint.run(coro: Coroutine[None, Any, T], *services: Service, **kwargs: Any) T[source]#

1.9. aiomisc.io module#

class aiomisc.io.AsyncBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncFileIO[bytes]

executor#
class aiomisc.io.AsyncBz2BinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncBinaryIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncBz2TextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncTextIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncFileIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: EventLoopMixin, Generic

async close() None[source]#
closed() bool[source]#
executor#
fileno() int[source]#
async flush() None[source]#
property fp: IO#
static get_opener() Callable[[...], IO][source]#
isatty() bool[source]#
property mode: str#
property name: str#
async open() None[source]#
classmethod open_fp(fp: IO, executor: Executor | None = None, loop: AbstractEventLoop | None = None) AsyncFileIO[source]#
async read(n: int = -1) AnyStr[source]#
async readable() bool[source]#
async readline(limit: int = -1) AnyStr[source]#
async readlines(limit: int = -1) List[source]#
async seek(offset: int, whence: int = 0) int[source]#
async seekable() bool[source]#
async tell() int[source]#
async truncate(size: int | None = None) int[source]#
async writable() bool[source]#
async write(s: AnyStr) int[source]#
async writelines(lines: List) None[source]#
class aiomisc.io.AsyncGzipBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncBinaryIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncGzipTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncTextIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncLzmaBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncBinaryIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncLzmaTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncTextIO

executor#
static get_opener() Callable[[...], IO][source]#
class aiomisc.io.AsyncTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#

Bases: AsyncFileIO[str]

buffer() AsyncBinaryIO[source]#
property encoding: str#
property errors: str | None#
executor#
property fp: TextIO#
property line_buffering: int#
property newlines: Any#
class aiomisc.io.Compression(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Enum

BZ2 = (<class 'aiomisc.io.AsyncBz2BinaryIO'>, <class 'aiomisc.io.AsyncBz2TextIO'>)#
GZIP = (<class 'aiomisc.io.AsyncGzipBinaryIO'>, <class 'aiomisc.io.AsyncGzipTextIO'>)#
LZMA = (<class 'aiomisc.io.AsyncLzmaBinaryIO'>, <class 'aiomisc.io.AsyncLzmaTextIO'>)#
NONE = (<class 'aiomisc.io.AsyncBinaryIO'>, <class 'aiomisc.io.AsyncTextIO'>)#
aiomisc.io.async_open(fname: FilePath, mode: BinaryModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncBinaryIO[source]#
aiomisc.io.async_open(fname: FilePath, mode: TextModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncTextIO

1.10. aiomisc.iterator_wrapper module#

exception aiomisc.iterator_wrapper.ChannelClosed[source]#

Bases: RuntimeError

class aiomisc.iterator_wrapper.DequeWrapper[source]#

Bases: QueueWrapperBase

get() Any[source]#
put(item: Any) None[source]#
queue: Deque[Any]#
class aiomisc.iterator_wrapper.FromThreadChannel(maxsize: int = 0)[source]#

Bases: object

SLEEP_DIFFERENCE_DIVIDER = 10#
SLEEP_LOW_THRESHOLD = 0.0001#
close() None[source]#
async get() Any[source]#
property is_closed: bool#
put(item: Any) None[source]#
queue: QueueWrapperBase#
class aiomisc.iterator_wrapper.IteratorProxy(iterator: AsyncIterator, finalizer: Callable[[], Any])[source]#

Bases: AsyncIterator

class aiomisc.iterator_wrapper.IteratorWrapper(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#

Bases: AsyncIterator, EventLoopMixin

close() Awaitable[None][source]#
property closed: bool#
executor#
async wait_closed() None[source]#
class aiomisc.iterator_wrapper.IteratorWrapperStatistic(name: str | None = None)[source]#

Bases: Statistic

enqueued: int#
name: str | None#
queue_length: int#
queue_size: int#
started: int#
yielded: int#
class aiomisc.iterator_wrapper.QueueWrapper(max_size: int)[source]#

Bases: QueueWrapperBase

get() Any[source]#
put(item: Any) None[source]#
queue: Queue#
class aiomisc.iterator_wrapper.QueueWrapperBase[source]#

Bases: object

get() Any[source]#
abstract put(item: Any) None[source]#
aiomisc.iterator_wrapper.make_queue(max_size: int = 0) QueueWrapperBase[source]#

1.11. aiomisc.log module#

class aiomisc.log.LogFormat(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

classmethod choices() Tuple[str, ...][source]#
color = 1#
classmethod default() str[source]#
disabled = -1#
journald = 5#
json = 2#
plain = 4#
rich = 6#
rich_tb = 7#
stream = 0#
syslog = 3#
class aiomisc.log.LogLevel(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: IntEnum

classmethod choices() Tuple[str, ...][source]#
critical = 50#
debug = 10#
classmethod default() str[source]#
error = 40#
info = 20#
notset = 0#
warning = 30#
aiomisc.log.basic_config(level: int | str = 'info', log_format: str | LogFormat = 'plain', buffered: bool = True, buffer_size: int = 1024, flush_interval: int | float = 0.2, loop: AbstractEventLoop | None = None, handlers: Iterable[Handler] = (), **kwargs: Any) None[source]#

1.12. aiomisc.periodic module#

class aiomisc.periodic.PeriodicCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#

Bases: EventLoopMixin

Note

When the periodic function runs longer than the execution interval, the next call is skipped and a warning is logged.

start(interval: int | float, loop: AbstractEventLoop | None = None, *, delay: int | float = 0, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) None[source]#
stop(return_exceptions: bool = False) Future[source]#

1.13. aiomisc.plugins module#

1.14. aiomisc.pool module#

class aiomisc.pool.ContextManager(aenter: Callable[[...], Awaitable[T]], aexit: Callable[[...], Awaitable[T]])[source]#

Bases: AsyncContextManager

sentinel = <object object>#
class aiomisc.pool.PoolBase(maxsize: int = 10, recycle: int | None = None)[source]#

Bases: ABC, EventLoopMixin, Generic[T]

acquire() AsyncContextManager[T][source]#
async close(timeout: int | float | None = None) None[source]#
aiomisc.pool.random() x in the interval [0, 1).#

1.15. aiomisc.process_pool module#

class aiomisc.process_pool.ProcessPoolExecutor(max_workers: int = 4, **kwargs: Any)[source]#

Bases: ProcessPoolExecutor, EventLoopMixin

Initializes a new ProcessPoolExecutor instance.

Parameters:
  • max_workers – The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors.

  • mp_context – A multiprocessing context to launch the workers created using the multiprocessing.get_context(‘start method’) API. This object should provide SimpleQueue, Queue and Process.

  • initializer – A callable used to initialize worker processes.

  • initargs – A tuple of arguments to pass to the initializer.

  • max_tasks_per_child – The maximum number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process. The default of None means worker process will live as long as the executor. Requires a non-‘fork’ mp_context start method. When given, we default to using ‘spawn’ if no mp_context is supplied.

DEFAULT_MAX_WORKERS = 4#
submit(*args: Any, **kwargs: Any) Future[source]#

Submit blocking function to the pool

class aiomisc.process_pool.ProcessPoolStatistic(name: str | None = None)[source]#

Bases: Statistic

done: int#
error: int#
name: str | None#
processes: int#
submitted: int#
success: int#
sum_time: float#

1.16. aiomisc.recurring module#

class aiomisc.recurring.RecurringCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#

Bases: object

args: Tuple[Any, ...]#
func: Callable[[...], Awaitable[Any]]#
kwargs: Mapping[str, Any]#
name: str#
start(strategy: Callable[[RecurringCallback], int | float | Awaitable[int] | Awaitable[float]], loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) Task[source]#
class aiomisc.recurring.RecurringCallbackStatistic(name: str | None = None)[source]#

Bases: Statistic

call_count: int#
done: int#
fail: int#
name: str | None#
sum_time: float#
exception aiomisc.recurring.StrategyException[source]#

Bases: Exception

exception aiomisc.recurring.StrategySkip(next_attempt_delay: int | float)[source]#

Bases: StrategyException

A strategy function may raise this exception to skip the current call.

exception aiomisc.recurring.StrategyStop[source]#

Bases: StrategyException

A strategy function may raise this exception to stop the recurring calls.
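How a strategy function can steer a recurring loop through these two exceptions can be sketched as follows. The names are hypothetical stand-ins (`Skip` for StrategySkip, `Stop` for StrategyStop, `run_recurring` for the loop), the strategy here takes no arguments unlike the documented signature, and the order "ask strategy, sleep, call" is an assumption of this sketch.

```python
import asyncio


class Skip(Exception):
    """Hypothetical stand-in for StrategySkip."""

    def __init__(self, next_attempt_delay):
        super().__init__(next_attempt_delay)
        self.delay = next_attempt_delay


class Stop(Exception):
    """Hypothetical stand-in for StrategyStop."""


async def run_recurring(func, strategy):
    while True:
        try:
            delay = strategy()  # the strategy decides when the next call happens
        except Skip as skip:
            await asyncio.sleep(skip.delay)  # skip this call, ask again later
            continue
        except Stop:
            return  # end the recurring loop
        await asyncio.sleep(delay)
        await func()


ticks = []
plan = iter([0, Skip(0), 0, Stop()])


def strategy():
    step = next(plan)
    if isinstance(step, Exception):
        raise step
    return step


async def tick():
    ticks.append(len(ticks))


asyncio.run(run_recurring(tick, strategy))
```

The scripted strategy yields one call, one skipped slot, one more call, and then stops the loop, so `tick` runs exactly twice.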

1.17. aiomisc.signal module#

class aiomisc.signal.Signal[source]#

Bases: object

async call(*args: Any, **kwargs: Any) None[source]#
connect(receiver: Callable[[...], Any]) None[source]#
copy() Signal[source]#
disconnect(receiver: Callable[[...], Any]) None[source]#
freeze() None[source]#
property is_frozen: bool#
aiomisc.signal.receiver(s: Signal) Callable[[...], Callable[[...], T]][source]#

1.18. aiomisc.thread_pool module#

class aiomisc.thread_pool.CoroutineWaiter(coroutine: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None)[source]#

Bases: object

start() None[source]#
wait() Any[source]#
class aiomisc.thread_pool.IteratorWrapperSeparate(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#

Bases: IteratorWrapper

executor#
class aiomisc.thread_pool.TaskChannel[source]#

Bases: SimpleQueue

close() None[source]#
closed_event: Event#
get(*args: Any, **kwargs: Any) WorkItem[source]#

Remove and return an item from the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).

exception aiomisc.thread_pool.TaskChannelCloseException[source]#

Bases: RuntimeError

exception aiomisc.thread_pool.ThreadPoolException[source]#

Bases: RuntimeError

class aiomisc.thread_pool.ThreadPoolExecutor(max_workers: int = 4, statistic_name: str | None = None)[source]#

Bases: ThreadPoolExecutor

Initializes a new ThreadPoolExecutor instance.

Parameters:
  • max_workers – The maximum number of threads that can be used to execute the given calls.

  • thread_name_prefix – An optional name prefix to give our threads.

  • initializer – A callable used to initialize worker threads.

  • initargs – A tuple of arguments to pass to the initializer.

DEFAULT_POOL_SIZE = 4#
SHUTDOWN_TIMEOUT = 10#
shutdown(wait: bool = True) None[source]#

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn: F, *args: Any, **kwargs: Any) Future[source]#

Submit blocking function to the pool

class aiomisc.thread_pool.ThreadPoolStatistic(name: str | None = None)[source]#

Bases: Statistic

done: int#
error: int#
name: str | None#
submitted: int#
success: int#
sum_time: float#
threads: int#
class aiomisc.thread_pool.WorkItem(func: ~typing.Callable[[...], ~typing.Any], statistic: ~aiomisc.thread_pool.ThreadPoolStatistic, future: ~_asyncio.Future, loop: ~asyncio.events.AbstractEventLoop, args: ~typing.Tuple[~typing.Any, ...] = <factory>, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>, context: ~_contextvars.Context = <factory>)[source]#

Bases: WorkItemBase

class aiomisc.thread_pool.WorkItemBase(func: Callable[..., Any], statistic: aiomisc.thread_pool.ThreadPoolStatistic, future: _asyncio.Future, loop: asyncio.events.AbstractEventLoop, args: Tuple[Any, ...] = <factory>, kwargs: Dict[str, Any] = <factory>, context: _contextvars.Context = <factory>)[source]#

Bases: object

args: Tuple[Any, ...]#
context: Context#
func: Callable[[...], Any]#
future: Future#
kwargs: Dict[str, Any]#
loop: AbstractEventLoop#
statistic: ThreadPoolStatistic#
aiomisc.thread_pool.context_partial(func: F, *args: Any, **kwargs: Any) Any[source]#
aiomisc.thread_pool.run_in_executor(func: Callable[[...], T], executor: ThreadPoolExecutor | None = None, args: Any = (), kwargs: Any = mappingproxy({})) Awaitable[T][source]#
aiomisc.thread_pool.run_in_new_thread(func: F, args: Any = (), kwargs: Any = mappingproxy({}), detach: bool = True, no_return: bool = False, statistic_name: str | None = None) Future[source]#
aiomisc.thread_pool.sync_await(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]#
aiomisc.thread_pool.sync_wait_coroutine(loop: AbstractEventLoop | None, coro_func: Callable[[...], Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) T[source]#
aiomisc.thread_pool.thread_pool_thread_loop(tasks: TaskChannel, statistic: ThreadPoolStatistic, stop_event: Event, pool_shutdown_event: Event) None[source]#
aiomisc.thread_pool.threaded(func: Callable[[P], T]) Callable[[P], Awaitable[T]][source]#
aiomisc.thread_pool.threaded_iterable(func: F | None = None, max_size: int = 0) Any[source]#
aiomisc.thread_pool.threaded_iterable_separate(func: F | None = None, max_size: int = 0) Any[source]#
aiomisc.thread_pool.threaded_separate(func: F, detach: bool = True) Callable[[...], Awaitable[Any]][source]#
aiomisc.thread_pool.wait_coroutine(coro: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None) T[source]#

1.19. aiomisc.timeout module#

aiomisc.timeout.timeout(value: int | float) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]#

1.20. aiomisc.utils module#

class aiomisc.utils.SelectAwaitable(*awaitables: Awaitable[T], return_exceptions: bool = False, cancel: bool = True, timeout: int | float | None = None, wait: bool = True, loop: AbstractEventLoop | None = None)[source]#

Bases: object

Select one of passed awaitables

Parameters:
  • awaitables – awaitable objects

  • return_exceptions – if True, an exception is not raised but returned as the result

  • cancel – cancel unfinished coroutines (default True)

  • timeout – execution timeout

  • wait – when False and cancel=True, unfinished coroutines will be cancelled in the background.

  • loop – event loop

property loop: AbstractEventLoop#
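
The parameters above describe a "first result wins" pattern. As a rough illustration only, the same idea can be sketched with the standard library's asyncio.wait. This is not aiomisc's implementation: the helper name select_first is hypothetical, it returns a plain (index, value) tuple instead of a SelectResult, and it only models the cancel and timeout parameters.

```python
import asyncio
from typing import Any, Awaitable, Optional, Tuple


async def select_first(
    *awaitables: Awaitable[Any],
    cancel: bool = True,
    timeout: Optional[float] = None,
) -> Tuple[int, Any]:
    """Hypothetical helper: return (index, result) of the first finished awaitable."""
    tasks = [asyncio.ensure_future(aw) for aw in awaitables]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED,
    )
    if cancel:
        # Cancel everything that has not finished and wait for the cancellation
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    for idx, task in enumerate(tasks):
        if task in done:
            # task.result() re-raises if the awaitable failed
            return idx, task.result()
    # Nothing finished within the timeout
    raise asyncio.TimeoutError
```

The index in the returned tuple plays the role that result_idx plays on SelectResult: it tells the caller which positional awaitable produced the value.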
class aiomisc.utils.SelectResult(length: int)[source]#

Bases: Collection

done() bool[source]#
is_exception: bool | None#
length#
result() Any[source]#
result_idx: int | None#
set_result(idx: int, value: Any, is_exception: bool) None[source]#
value: Any#
aiomisc.utils.awaitable(func: Callable[[...], AT | Awaitable[AT]]) Callable[[...], Awaitable[AT]][source]#

Decorator that wraps a function and returns a function which always returns an awaitable object. If the function returns a future, the original future is returned. If the function returns a coroutine, the original coroutine is returned. If the function returns a non-awaitable object, it is wrapped in a new coroutine which just returns this object. It’s useful when you don’t want to check a function’s result before using it in an await expression.
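
The behaviour described above can be approximated with a few lines of standard-library code. The following is a minimal sketch, not the library's source: the name awaitable_sketch is hypothetical, and it treats anything accepted by inspect.isawaitable as "already awaitable".

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable


def awaitable_sketch(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Hypothetical stand-in: make every call to func return something awaitable."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Awaitable[Any]:
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            # Futures and coroutines are passed through untouched
            return result

        async def return_value() -> Any:
            # Plain values are wrapped in a coroutine that just returns them
            return result

        return return_value()

    return wrapper
```

With such a wrapper, a caller can write `await handler()` whether `handler` was declared with `def` or `async def`.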

aiomisc.utils.bind_socket(*args: Any, address: str, port: int = 0, options: Iterable[Tuple[int, int, int]] = (), reuse_addr: bool = True, reuse_port: bool = True, proto_name: str | None = None) socket[source]#

Binds a socket and calls setblocking(False) on the newly created socket. The address format is detected and the socket family is selected automatically.

Parameters:
  • args – positional arguments passed to the stdlib socket constructor (optional)

  • address – bind address

  • port – bind port

  • options – iterable of integer 3-tuples, each naming a socket option to set together with the option value.

  • reuse_addr – set socket.SO_REUSEADDR

  • reuse_port – set socket.SO_REUSEPORT

  • proto_name – protocol name which will be logged after binding

Returns:

socket.socket
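
To make "detects address format and selects the socket family" concrete, here is a hedged standard-library sketch. The helpers detect_family and bind_socket_sketch are hypothetical names; the sketch only handles literal IPv4/IPv6 addresses and TCP sockets, and says nothing about how aiomisc itself performs the detection.

```python
import ipaddress
import socket


def detect_family(address: str) -> socket.AddressFamily:
    """Pick AF_INET or AF_INET6 from a literal IP address string."""
    version = ipaddress.ip_address(address).version
    return socket.AF_INET6 if version == 6 else socket.AF_INET


def bind_socket_sketch(address: str, port: int = 0) -> socket.socket:
    """Create a non-blocking TCP socket bound to (address, port)."""
    sock = socket.socket(detect_family(address), socket.SOCK_STREAM)
    sock.setblocking(False)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # SO_REUSEPORT is not defined on every platform
    if hasattr(socket, "SO_REUSEPORT"):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((address, port))
    return sock
```

Passing port=0, as in the default above, asks the operating system to choose a free port; the chosen port can then be read back with sock.getsockname().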

aiomisc.utils.cancel_tasks(tasks: Iterable[Future]) Future[source]#

All passed tasks will be cancelled and a new task will be returned.

Parameters:

tasks – tasks which will be cancelled

aiomisc.utils.chunk_list(iterable: Iterable[T], size: int) Iterable[List[T]][source]#

Split a list or generator into chunks with a fixed maximum size.
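
As an illustration of the chunking behaviour, a minimal equivalent can be written with itertools.islice. The name chunk_list_sketch is hypothetical and this is not the library's code; it only shows what "fixed maximum size" means for the last chunk.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunk_list_sketch(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items until the input is exhausted."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk
```

For example, splitting range(7) with size 3 gives two full chunks and a final chunk holding the single remaining item.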

aiomisc.utils.create_default_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False) Tuple[AbstractEventLoop, ThreadPoolExecutor][source]#

Creates an event loop and thread pool executor

Parameters:
  • pool_size – thread pool maximal size

  • policy – event loop policy

  • debug – set loop.set_debug(True) if True
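
The three parameters map naturally onto standard asyncio calls. The sketch below is an assumption-laden approximation: it uses the stdlib concurrent.futures.ThreadPoolExecutor and asyncio.new_event_loop rather than aiomisc's own executor or the uvloop policy shown in the signature, and create_loop_sketch is a hypothetical name.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Tuple


def create_loop_sketch(
    pool_size: Optional[int] = None,
    debug: bool = False,
) -> Tuple[asyncio.AbstractEventLoop, ThreadPoolExecutor]:
    """Create a loop whose default executor is a bounded thread pool."""
    loop = asyncio.new_event_loop()
    executor = ThreadPoolExecutor(max_workers=pool_size)
    # run_in_executor(None, ...) will now use this pool
    loop.set_default_executor(executor)
    loop.set_debug(debug)
    return loop, executor
```

Returning the executor alongside the loop lets the caller shut the pool down explicitly once the loop is closed.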

aiomisc.utils.fast_uuid1() UUID[source]#

Fast UUID1-like identifier

aiomisc.utils.fast_uuid4() UUID[source]#

Fast UUID4-like identifier

aiomisc.utils.getrandbits(k) int#

Generates an int with k random bits.

aiomisc.utils.new_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>) AbstractEventLoop[source]#
aiomisc.utils.pending_futures(futures: Iterable[Future]) Iterator[Future][source]#
aiomisc.utils.select#

alias of SelectAwaitable

aiomisc.utils.set_exception(futures: Iterable[Future], exc: BaseException = CancelledError()) Set[Task][source]#
aiomisc.utils.shield(func: Callable[[...], Coroutine[Any, Any, T]]) Callable[[...], Coroutine[Any, Any, T]][source]#

Simple and useful decorator that wraps the coroutine in asyncio.shield.

>>> @shield
... async def non_cancelable_func():
...     await asyncio.sleep(1)
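
To show what the shielding buys in practice, here is a minimal standard-library sketch of such a decorator. The name shield_sketch is hypothetical and the code is not taken from aiomisc; it only demonstrates that cancelling the caller does not cancel the wrapped coroutine.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def shield_sketch(
    func: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Coroutine[Any, Any, Any]]:
    """Hypothetical decorator: run func under asyncio.shield on every call."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The inner coroutine becomes its own task and survives
        # cancellation of whoever awaits this wrapper
        return await asyncio.shield(func(*args, **kwargs))

    return wrapper
```

The caller still receives CancelledError when it is cancelled; only the inner work is protected, so it keeps running in the background until it finishes.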

1.21. aiomisc.worker_pool module#

class aiomisc.worker_pool.WorkerPool(workers: int, max_overflow: int = 0, *, initializer: Callable[[...], Any] | None = None, initializer_args: Tuple[Any, ...] = (), initializer_kwargs: Mapping[str, Any] = mappingproxy({}), statistic_name: str | None = None)[source]#

Bases: object

SERVER_CLOSE_TIMEOUT = 1#
address: str | Tuple[str, int]#
close() None[source]#
async create_task(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]#
initializer: Callable[[], Any] | None#
initializer_args: Tuple[Any, ...]#
initializer_kwargs: Mapping[str, Any]#
property loop: AbstractEventLoop#
pids: Set[int]#
server: AbstractServer#
async start() None[source]#
tasks: Queue#
worker_ids: Tuple[bytes, ...]#
class aiomisc.worker_pool.WorkerPoolStatistic(name: str | None = None)[source]#

Bases: Statistic

bad_auth: int#
done: int#
error: int#
name: str | None#
processes: int#
queue_size: int#
spawning: int#
submitted: int#
success: int#
sum_time: float#
task_added: int#