1. aiomisc module

1.1. aiomisc.aggregate module

class aiomisc.aggregate.AggregateAsyncFunc(*args, **kwargs)[source]

Bases: Protocol, Generic[V, R]

class aiomisc.aggregate.AggregateFunc(*args, **kwargs)[source]

Bases: Protocol, Generic[S, T]

class aiomisc.aggregate.AggregateStatistic(name: str | None = None)[source]

Bases: Statistic

done: int
error: int
leeway_ms: float
max_count: int
name: str | None
success: int
class aiomisc.aggregate.Aggregator(func: AggregateFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]

Bases: AggregatorAsync[V, R], Generic[V, R]

class aiomisc.aggregate.AggregatorAsync(func: AggregateAsyncFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]

Bases: EventLoopMixin, Generic[V, R]

async aggregate(arg: V) R[source]
property count: int
property leeway_ms: float
property max_count: int | None
class aiomisc.aggregate.Arg(value: V, future: 'Future[R]')[source]

Bases: Generic[V, R]

future: Future[R]
value: V
exception aiomisc.aggregate.ResultNotSetError[source]

Bases: Exception

aiomisc.aggregate.aggregate(leeway_ms: float, max_count: int | None = None) Callable[[AggregateFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]][source]

Parametric decorator that aggregates multiple single-argument executions (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into a single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)) collected within a time window leeway_ms. At most max_count arguments are aggregated per execution (unlimited when None, the default).

Note

func must return a sequence of values of length equal to the number of arguments (and in the same order).

Note

If some unexpected error occurs, the exception is propagated to each pending future; to set an individual error for each aggregated call, refer to aggregate_async.

Parameters:
  • leeway_ms – The maximum approximate delay between the first collected argument and the aggregated execution.

  • max_count – The maximum number of arguments to call the decorated function with. Defaults to None (no limit).

Returns:
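
Usage sketch (the square function and its arguments are illustrative; both calls below are collected into a single aggregated execution):

import asyncio

from aiomisc.aggregate import aggregate


@aggregate(leeway_ms=10, max_count=2)
async def square(*numbers: float):
    # Called once with all aggregated arguments; must return the
    # results in the same order as the arguments.
    return [number ** 2 for number in numbers]


async def main():
    # Both single-argument calls are batched into one square(1, 2) call.
    print(await asyncio.gather(square(1), square(2)))  # [1, 4]


asyncio.run(main())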

aiomisc.aggregate.aggregate_async(leeway_ms: float, max_count: int | None = None) Callable[[AggregateAsyncFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]][source]

Same as aggregate, but the func arguments are of type Arg, containing value and future attributes instead. In this setting func is responsible for setting individual results/exceptions for all of the futures, or for raising an exception (it will be propagated to the futures automatically). If func mistakenly does not set a result for some future, a ResultNotSetError exception is set on it.

Returns:
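
A sketch of the aggregate_async variant: the decorated function receives Arg objects and resolves each future itself (names are illustrative):

from aiomisc.aggregate import Arg, aggregate_async


@aggregate_async(leeway_ms=10, max_count=5)
async def resolve(*args: Arg) -> None:
    for arg in args:
        # Set an individual result per aggregated call; a future left
        # without a result would fail with ResultNotSetError.
        arg.future.set_result(arg.value * 2)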

1.2. aiomisc.backoff module

class aiomisc.backoff.Backoff(attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, exceptions: tuple[type[Exception], ...] = (), max_tries: int | None = None, giveup: ~collections.abc.Callable[[Exception], bool] | None = None, statistic_name: str | None = None, statistic_class: type[~aiomisc.backoff.BackoffStatistic] = <class 'aiomisc.backoff.BackoffStatistic'>)[source]

Bases: object

attempt_timeout
deadline
exceptions
async execute(func: ~collections.abc.Callable[[~P], ~collections.abc.Coroutine[~typing.Any, ~typing.Any, ~aiomisc.backoff.T]], *args: ~P, **kwargs: ~P) T[source]
giveup
max_tries
pause
prepare(func: Callable[[P], Coroutine[Any, Any, T]]) BackoffExecution[P, T][source]
statistic
class aiomisc.backoff.BackoffExecution(function: Callable[[P], Coroutine[Any, Any, T]], statistic: BackoffStatistic, attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, exceptions: tuple[type[Exception], ...] = (), max_tries: int | None = None, giveup: Callable[[Exception], bool] | None = None)[source]

Bases: Generic[P, T]

attempt_timeout
deadline
exceptions
async execute(*args: ~P, **kwargs: ~P) T[source]
function
giveup
last_exception: Exception | None
max_tries
pause
statistic
total_tries: int
class aiomisc.backoff.BackoffStatistic(name: str | None = None)[source]

Bases: Statistic

attempts: int
cancels: int
done: int
errors: int
name: str | None
sum_time: float
class aiomisc.backoff.RetryStatistic(name: str | None = None)[source]

Bases: BackoffStatistic

name: str | None
aiomisc.backoff.asyncbackoff(attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, *exc: type[Exception], exceptions: tuple[type[Exception], ...] = (), max_tries: int | None = None, giveup: ~collections.abc.Callable[[Exception], bool] | None = None, statistic_name: str | None = None, statistic_class: type[~aiomisc.backoff.BackoffStatistic] = <class 'aiomisc.backoff.BackoffStatistic'>) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]

Parametric decorator that ensures the attempt_timeout and deadline time limits are met by the decorated function.

In case of an exception the function will be called again with the same arguments after pause seconds.

Parameters:
  • statistic_name – name field for statistic instances

  • attempt_timeout – maximum execution time for a single attempt.

  • deadline – maximum total execution time for all attempts.

  • pause – time gap between execution attempts.

  • exc – retry when these exceptions are raised.

  • exceptions – same as exc, but keyword-only.

  • max_tries – maximum number of execution attempts (>= 1).

  • giveup – a predicate which receives the raised exception and decides whether to stop further retries.

  • statistic_class – statistic class
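
Usage sketch (the coroutine and the exception tuple are illustrative):

from aiomisc.backoff import asyncbackoff


@asyncbackoff(
    attempt_timeout=0.5,   # each attempt may take at most 0.5 seconds
    deadline=5,            # give up entirely after 5 seconds
    pause=0.1,             # wait 0.1 seconds between attempts
    max_tries=10,
    exceptions=(ConnectionError, TimeoutError),
)
async def fetch_data() -> bytes:
    ...  # e.g. an unreliable network call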

aiomisc.backoff.asyncretry(max_tries: int | None, exceptions: tuple[type[Exception], ...] = (<class 'Exception'>,), pause: int | float = 0, giveup: ~collections.abc.Callable[[Exception], bool] | None = None, statistic_name: str | None = None) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]

Shortcut of asyncbackoff(None, None, 0, Exception).

In case of an exception the function will be called again with the same arguments after pause seconds.

Parameters:
  • max_tries – maximum number of execution attempts (>= 1, or None for unlimited attempts).

  • exceptions – exception types that trigger a retry (keyword-only, analogous to exc in asyncbackoff).

  • giveup – a predicate which receives the raised exception and decides whether to stop further retries.

  • pause – time gap between execution attempts.

  • statistic_name – name field for statistic instances
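
Usage sketch (illustrative coroutine):

from aiomisc.backoff import asyncretry


@asyncretry(max_tries=5, exceptions=(ConnectionError,), pause=0.5)
async def publish_event(payload: dict) -> None:
    ...  # retried up to 5 times on ConnectionError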

1.3. aiomisc.circuit_breaker module

class aiomisc.circuit_breaker.CircuitBreaker(error_ratio: float, response_time: int | float, exceptions: ~collections.abc.Iterable[type[Exception]] = (<class 'Exception'>,), recovery_time: int | float | None = None, broken_time: int | float | None = None, passing_time: int | float | None = None, exception_inspector: ~collections.abc.Callable[[Exception], bool] | None = None, statistic_name: str | None = None)[source]

Bases: EventLoopMixin

Circuit Breaker pattern implementation. The class instance collects call statistics through the call or call_async methods.

The state machine has three states:
  • CircuitBreakerStates.PASSING

  • CircuitBreakerStates.BROKEN

  • CircuitBreakerStates.RECOVERING

In the passing state all results and exceptions are returned as is, and statistics are collected for each call.

In the broken state every call raises CircuitBroken and statistics are not collected.

In the recovering state only a portion of the calls is passed through to the real function, while the rest raise CircuitBroken. The share of real calls grows exponentially, but if 20% of them (by default) fail, the breaker returns to the broken state.

Parameters:
  • error_ratio – Failed-to-successful call ratio. The state might change when the ratio reaches the given value within the response time (in seconds). Value between 0.0 and 1.0.

  • response_time – Time window to collect statistics (seconds)

  • exceptions – Only these exceptions will affect the ratio. The base class Exception is used by default.

  • recovery_time – minimal time in recovery state (seconds)

  • broken_time – minimal time in broken state (seconds)

  • passing_time – minimum time in passing state (seconds)

BUCKET_COUNT = 10
PASSING_BROKEN_THRESHOLD = 1
RECOVER_BROKEN_THRESHOLD = 0.5
bucket() int[source]
call(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]
async call_async(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]
context() Generator[Any, Any, Any][source]
counter() Counter[source]
get_state_delay() int | float[source]
property recovery_ratio: int | float
property response_time: int | float
property state: CircuitBreakerStates
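
Usage sketch (the backend coroutine is illustrative; the thresholds are arbitrary example values):

from aiomisc.circuit_breaker import CircuitBreaker, CircuitBroken, cutout


async def call_backend() -> dict:
    ...  # some unreliable I/O


# Break the circuit when more than half of the calls fail
# within a 10 second statistics window.
breaker = CircuitBreaker(error_ratio=0.5, response_time=10)


async def guarded_call() -> dict:
    try:
        return await breaker.call_async(call_backend)
    except CircuitBroken:
        return {}  # degrade gracefully while the circuit is open


# Equivalent decorator form provided by cutout().
@cutout(0.5, 10, ConnectionError)
async def guarded_backend_call() -> dict:
    return await call_backend()
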
class aiomisc.circuit_breaker.CircuitBreakerStates(*values)[source]

Bases: IntEnum

BROKEN = 1
PASSING = 0
RECOVERING = 2
class aiomisc.circuit_breaker.CircuitBreakerStatistic(name: str | None = None)[source]

Bases: Statistic

call_broken: int
call_count: int
call_passing: int
call_recovering: int
call_recovering_failed: int
call_recovering_ok: int
error_ratio: float
error_ratio_threshold: float
name: str | None
exception aiomisc.circuit_breaker.CircuitBroken(last_exception: Exception | None)[source]

Bases: Exception

last_exception
class aiomisc.circuit_breaker.CounterKey(*values)[source]

Bases: IntEnum

FAIL = 0
OK = 1
TOTAL = 2
aiomisc.circuit_breaker.cutout(ratio: float, response_time: int | float, *exceptions: type[Exception], **kwargs: Any) Callable[[Callable[[...], T] | Callable[[...], Awaitable[T]]], Callable[[...], T | Awaitable[T]]][source]
aiomisc.circuit_breaker.random() x in the interval [0, 1).

1.4. aiomisc.compat module

class aiomisc.compat.EntrypointProtocol(*args, **kwargs)[source]

Bases: Protocol

load() Any[source]
property name: str
class aiomisc.compat.EventLoopMixin[source]

Bases: object

property loop: AbstractEventLoop
class aiomisc.compat.ParamSpec

Bases: object

Parameter specification variable.

The preferred way to construct a parameter specification is via the dedicated syntax for generic functions, classes, and type aliases, where the use of ‘**’ creates a parameter specification:

type IntFunc[**P] = Callable[P, int]

For compatibility with Python 3.11 and earlier, ParamSpec objects can also be created as follows:

P = ParamSpec('P')

Parameter specification variables exist primarily for the benefit of static type checkers. They are used to forward the parameter types of one callable to another callable, a pattern commonly found in higher-order functions and decorators. They are only valid when used in Concatenate, or as the first argument to Callable, or as parameters for user-defined Generics. See class Generic for more information on generic types.

An example for annotating a decorator:

def add_logging[**P, T](f: Callable[P, T]) -> Callable[P, T]:
    '''A type-safe decorator to add logging to a function.'''
    def inner(*args: P.args, **kwargs: P.kwargs) -> T:
        logging.info(f'{f.__name__} was called')
        return f(*args, **kwargs)
    return inner

@add_logging
def add_two(x: float, y: float) -> float:
    '''Add two numbers together.'''
    return x + y

Parameter specification variables can be introspected. e.g.:

>>> P = ParamSpec("P")
>>> P.__name__
'P'

Note that only parameter specification variables defined in the global scope can be pickled.

args

Represents positional arguments.

kwargs

Represents keyword arguments.

class aiomisc.compat.Protocol[source]

Bases: Generic

Base class for protocol classes.

Protocol classes are defined as:

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example:

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
aiomisc.compat.entry_pont_iterator(entry_point: str) Iterator[EntrypointProtocol][source]
aiomisc.compat.final(f)[source]

Decorator to indicate final methods and final classes.

Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed.

For example:

class Base:
    @final
    def done(self) -> None:
        ...
class Sub(Base):
    def done(self) -> None:  # Error reported by type checker
        ...

@final
class Leaf:
    ...
class Other(Leaf):  # Error reported by type checker
    ...

There is no runtime checking of these properties. The decorator attempts to set the __final__ attribute to True on the decorated object to allow runtime introspection.

aiomisc.compat.get_current_loop() CT
aiomisc.compat.set_current_loop(value: CT) None
aiomisc.compat.sock_set_nodelay(sock: socket) None[source]
aiomisc.compat.sock_set_reuseport(sock: socket, reuse_port: bool) None[source]
aiomisc.compat.time_ns() int

Return the current time in nanoseconds since the Epoch.

1.5. aiomisc.context module

class aiomisc.context.Context(loop: AbstractEventLoop)[source]

Bases: object

close() None[source]
aiomisc.context.get_context(loop: AbstractEventLoop | None = None) Context[source]

1.6. aiomisc.counters module

class aiomisc.counters.AbstractStatistic[source]

Bases: object

name: str | None
class aiomisc.counters.MetaStatistic(name: str, bases: tuple[type, ...], dct: dict[str, Any])[source]

Bases: type

class aiomisc.counters.Metric(name: str, counter: MutableMapping[str, float | int], default: float | int = 0)[source]

Bases: object

counter
name: str
class aiomisc.counters.Statistic(name: str | None = None)[source]

Bases: AbstractStatistic

name: str | None
class aiomisc.counters.StatisticResult(kind: type[aiomisc.counters.AbstractStatistic], name: str | None, metric: str, value: int | float)[source]

Bases: object

kind: type[AbstractStatistic]
metric: str
name: str | None
value: int | float
aiomisc.counters.get_statistics(*kind: type[Statistic]) Generator[Any, tuple[Statistic, str, int], None][source]

1.7. aiomisc.cron module

class aiomisc.cron.CronCallback(coroutine_func: Callable[[...], Any | Awaitable[Any]], *args: Any, **kwargs: Any)[source]

Bases: EventLoopMixin

Note

When the cron function takes longer than its execution interval, the next call will be skipped and a warning will be logged.

static get_next(cron: croniter, _: RecurringCallback) float[source]
start(spec: str, loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: tuple[type[Exception], ...] = ()) None[source]
stop() Future[source]
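
Usage sketch (the cron spec and coroutine are illustrative; start() is called from a coroutine so that an event loop is already running):

from aiomisc.cron import CronCallback


async def cleanup() -> None:
    ...  # periodic housekeeping


async def setup() -> None:
    cron = CronCallback(cleanup)
    cron.start("*/5 * * * *")   # every five minutes
    # ... later, when shutting down:
    # await cron.stop()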

1.8. aiomisc.entrypoint module

class aiomisc.entrypoint.Entrypoint(*services: Service, loop: AbstractEventLoop | None = None, pool_size: int | None = None, log_level: int | str = 'info', log_format: str | LogFormat = 'plain', log_buffering: bool = True, log_buffer_size: int = 1024, log_date_format: str | None = None, log_flush_interval: float = 0.2, log_config: bool = True, log_handlers: Iterable[Handler] = (), policy: AbstractEventLoopPolicy = <asyncio.unix_events._UnixDefaultEventLoopPolicy object>, debug: bool = False, catch_signals: tuple[int, ...] | None=None, shutdown_timeout: int | float = 60.0)[source]

Bases: object

Creates a new Entrypoint

Parameters:
  • debug – set debug to event-loop

  • loop – loop

  • services – Service instances which will be started.

  • pool_size – thread pool size

  • log_level – Logging level which will be configured

  • log_format – Logging format which will be configured

  • log_buffer_size – Buffer size for logging

  • log_flush_interval – interval in seconds for flushing logs

  • log_config – if False do not configure logging

  • catch_signals – Perform shutdown when these signals are received

  • shutdown_timeout – Timeout in seconds for graceful shutdown

AIOMISC_SHUTDOWN_TIMEOUT: float = 60.0
DEFAULT_AIOMISC_BUFFERING: bool = True
DEFAULT_AIOMISC_BUFFER_SIZE: int = 1024
DEFAULT_AIOMISC_DEBUG: bool = False
DEFAULT_AIOMISC_LOG_CONFIG: bool = True
DEFAULT_AIOMISC_LOG_FLUSH: float = 0.2
DEFAULT_AIOMISC_POOL_SIZE: int | None = None
DEFAULT_LOG_DATE_FORMAT: str | None = None
DEFAULT_LOG_FORMAT: str = 'plain'
DEFAULT_LOG_LEVEL: str = 'info'
POST_START = <aiomisc.signal.Signal object>
POST_STOP = <aiomisc.signal.Signal object>
PRE_START = <aiomisc.signal.Signal object>
PRE_STOP = <aiomisc.signal.Signal object>
async closing() None[source]
classmethod get_current() Entrypoint[source]
async graceful_shutdown(exception: Exception) None[source]
property loop: AbstractEventLoop
property services: tuple[Service, ...]
async start_services(*svc: Service) None[source]
async stop_services(*svc: Service, exc: Exception | None = None) None[source]
aiomisc.entrypoint.entrypoint

alias of Entrypoint

aiomisc.entrypoint.get_context(loop: AbstractEventLoop | None = None) Context[source]
aiomisc.entrypoint.run(coro: Coroutine[None, Any, T], *services: Service, **kwargs: Any) T[source]
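
A rough usage sketch (no services are passed here; Service instances would normally be supplied as positional arguments):

import asyncio

from aiomisc.entrypoint import entrypoint, run


async def main() -> None:
    await asyncio.sleep(1)


# Explicit form: configure logging, start the services (none here),
# then run code on the managed loop.
with entrypoint() as loop:
    loop.run_until_complete(main())

# Shortcut form, equivalent for this simple case.
run(main())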

1.9. aiomisc.io module

class aiomisc.io.AsyncBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncFileIO[bytes]

executor
class aiomisc.io.AsyncBz2BinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncBinaryIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncBz2TextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncTextIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncFileIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: EventLoopMixin, Generic

async close() None[source]
closed() bool[source]
executor
fileno() int[source]
async flush() None[source]
property fp: IO
static get_opener() Callable[[...], IO][source]
isatty() bool[source]
property mode: str
property name: str
async open() None[source]
classmethod open_fp(fp: IO, executor: Executor | None = None, loop: AbstractEventLoop | None = None) AsyncFileIO[source]
async read(n: int = -1) AnyStr[source]
async readable() bool[source]
async readline(limit: int = -1) AnyStr[source]
async readlines(limit: int = -1) list[AnyStr][source]
async seek(offset: int, whence: int = 0) int[source]
async seekable() bool[source]
async tell() int[source]
async truncate(size: int | None = None) int[source]
async writable() bool[source]
async write(s: AnyStr) int[source]
async writelines(lines: list[AnyStr]) None[source]
class aiomisc.io.AsyncGzipBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncBinaryIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncGzipTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncTextIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncLzmaBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncBinaryIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncLzmaTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncTextIO

executor
static get_opener() Callable[[...], IO][source]
class aiomisc.io.AsyncTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Bases: AsyncFileIO[str]

buffer() AsyncBinaryIO[source]
property encoding: str
property errors: str | None
executor
property fp: TextIO
property line_buffering: int
property newlines: Any
class aiomisc.io.Compression(*values)[source]

Bases: Enum

BZ2 = (<class 'aiomisc.io.AsyncBz2BinaryIO'>, <class 'aiomisc.io.AsyncBz2TextIO'>)
GZIP = (<class 'aiomisc.io.AsyncGzipBinaryIO'>, <class 'aiomisc.io.AsyncGzipTextIO'>)
LZMA = (<class 'aiomisc.io.AsyncLzmaBinaryIO'>, <class 'aiomisc.io.AsyncLzmaTextIO'>)
NONE = (<class 'aiomisc.io.AsyncBinaryIO'>, <class 'aiomisc.io.AsyncTextIO'>)
aiomisc.io.async_open(fname: str | Path, mode: Literal['rb', 'wb', 'ab', 'xb', 'rb+', 'wb+', 'ab+', 'xb+', 'br', 'br+', 'bw+', 'ba+', 'bx+'], compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncBinaryIO[source]
aiomisc.io.async_open(fname: str | Path, mode: Literal['r', 'w', 'a', 'x', 'r+', 'w+', 'a+', 'x+'], compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncTextIO
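
A rough usage sketch, assuming the returned file object is used as an asynchronous context manager (the paths are illustrative):

from aiomisc.io import Compression, async_open


async def read_log(path: str) -> str:
    async with async_open(path, "r") as afp:
        return await afp.read()


async def write_compressed(path: str, data: bytes) -> None:
    async with async_open(path, "wb", compression=Compression.GZIP) as afp:
        await afp.write(data)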

1.10. aiomisc.iterator_wrapper module

exception aiomisc.iterator_wrapper.ChannelClosed[source]

Bases: RuntimeError

class aiomisc.iterator_wrapper.DequeWrapper[source]

Bases: QueueWrapperBase

get() Any[source]
put(item: Any) None[source]
queue: deque[Any]
class aiomisc.iterator_wrapper.FromThreadChannel(maxsize: int = 0)[source]

Bases: object

SLEEP_DIFFERENCE_DIVIDER = 10
SLEEP_LOW_THRESHOLD = 0.0001
close() None[source]
async get() Any[source]
property is_closed: bool
put(item: Any) None[source]
queue: QueueWrapperBase
class aiomisc.iterator_wrapper.IteratorProxy(iterator: AsyncIterator[T], finalizer: Callable[[], Any])[source]

Bases: Generic[T], AsyncIterator

class aiomisc.iterator_wrapper.IteratorWrapper(gen_func: Callable[[P], Generator[T, None, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]

Bases: Generic[P, T], AsyncIterator, EventLoopMixin

close() Awaitable[None][source]
property closed: bool
executor
async wait_closed() None[source]
class aiomisc.iterator_wrapper.IteratorWrapperStatistic(name: str | None = None)[source]

Bases: Statistic

enqueued: int
name: str | None
queue_length: int
queue_size: int
started: int
yielded: int
class aiomisc.iterator_wrapper.QueueWrapper(max_size: int)[source]

Bases: QueueWrapperBase

get() Any[source]
put(item: Any) None[source]
queue: Queue
class aiomisc.iterator_wrapper.QueueWrapperBase[source]

Bases: object

get() Any[source]
abstractmethod put(item: Any) None[source]
aiomisc.iterator_wrapper.make_queue(max_size: int = 0) QueueWrapperBase[source]

1.11. aiomisc.log module

class aiomisc.log.LogFormat(*values)[source]

Bases: IntEnum

classmethod choices() tuple[str, ...][source]
color = 1
classmethod default() str[source]
disabled = -1
journald = 5
json = 2
plain = 4
rich = 6
rich_tb = 7
stream = 0
syslog = 3
class aiomisc.log.LogLevel(*values)[source]

Bases: IntEnum

classmethod choices() tuple[str, ...][source]
critical = 50
debug = 10
classmethod default() str[source]
error = 40
info = 20
notset = 0
warning = 30
class aiomisc.log.ThreadedHandler(target: Handler, flush_interval: float = 0.1, buffered: bool = True, queue_size: int = 0)[source]

Bases: Handler

Initializes the instance - basically setting the formatter to None and the filter list to empty.

close() None[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

emit(record: LogRecord) None[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

flush() None[source]

Ensure all logging output has been flushed.

This version does nothing and is intended to be implemented by subclasses.

start() None[source]
aiomisc.log.basic_config(level: int | str = 'info', log_format: str | LogFormat = 'plain', buffered: bool = True, buffer_size: int = 0, flush_interval: int | float = 0.2, loop: AbstractEventLoop | None = None, handlers: Iterable[Handler] = (), **kwargs: Any) None[source]
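
Configuration sketch (the values are illustrative):

from aiomisc.log import LogFormat, basic_config

# Buffered colored logging, flushed every half second.
basic_config(
    level="info",
    log_format=LogFormat.color,
    buffered=True,
    flush_interval=0.5,
)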

1.12. aiomisc.periodic module

class aiomisc.periodic.PeriodicCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]

Bases: EventLoopMixin

Note

When the periodic function takes longer than its execution interval, the next call will be skipped and a warning will be logged.

start(interval: int | float, loop: AbstractEventLoop | None = None, *, delay: int | float = 0, shield: bool = False, suppress_exceptions: tuple[type[Exception], ...] = ()) None[source]
stop(return_exceptions: bool = False) Future[source]
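
Usage sketch (the coroutine is illustrative; start() is called from a coroutine so that an event loop is already running):

from aiomisc.periodic import PeriodicCallback


async def heartbeat() -> None:
    ...  # e.g. refresh a lease


async def setup() -> None:
    periodic = PeriodicCallback(heartbeat)
    periodic.start(5, delay=1)   # every 5 seconds, first call after 1 second
    # ... later, when shutting down:
    # await periodic.stop()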

1.13. aiomisc.plugins module

1.14. aiomisc.pool module

class aiomisc.pool.ContextManager(aenter: Callable[[...], Awaitable[T]], aexit: Callable[[...], Awaitable[T]])[source]

Bases: AbstractAsyncContextManager

sentinel = <object object>
class aiomisc.pool.PoolBase(maxsize: int = 10, recycle: int | None = None)[source]

Bases: ABC, EventLoopMixin, Generic[T]

acquire() AbstractAsyncContextManager[T][source]
async close(timeout: int | float | None = None) None[source]
aiomisc.pool.random() x in the interval [0, 1).

1.15. aiomisc.process_pool module

class aiomisc.process_pool.ProcessPoolExecutor(max_workers: int = 4, **kwargs: Any)[source]

Bases: ProcessPoolExecutor, EventLoopMixin

Process pool executor with statistics

Usage:

import asyncio

from time import sleep
from aiomisc import ProcessPoolExecutor

# NOTE: blocking function must be defined at the top level
# of the module to be able to be pickled and sent to the
# child processes.
def blocking_fn():
    sleep(1)
    return 42

async def main():
    executor = ProcessPoolExecutor()
    the_answer = await executor.submit(blocking_fn)
    print("The answer is:", the_answer)

asyncio.run(main())

Initializes a new ProcessPoolExecutor instance.

  • max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors.

  • mp_context: A multiprocessing context to launch the workers. This object should provide SimpleQueue, Queue and Process. Useful to allow specific multiprocessing start methods.

  • initializer: A callable used to initialize worker processes.

  • initargs: A tuple of arguments to pass to the initializer.

  • max_tasks_per_child: The maximum number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process. The default of None means worker process will live as long as the executor. Requires a non-‘fork’ mp_context start method. When given, we default to using ‘spawn’ if no mp_context is supplied.

DEFAULT_MAX_WORKERS = 4
submit(*args: Any, **kwargs: Any) Future[source]

Submit blocking function to the pool

class aiomisc.process_pool.ProcessPoolStatistic(name: str | None = None)[source]

Bases: Statistic

done: int
error: int
name: str | None
processes: int
submitted: int
success: int
sum_time: float

1.16. aiomisc.recurring module

class aiomisc.recurring.RecurringCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, name: str | None = None, **kwargs: Any)[source]

Bases: object

args: tuple[Any, ...]
func: Callable[[...], Awaitable[Any]]
kwargs: Mapping[str, Any]
name: str
start(strategy: Callable[[RecurringCallback], int | float | Awaitable[int] | Awaitable[float]], loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: tuple[type[Exception], ...] = ()) Task[source]
class aiomisc.recurring.RecurringCallbackStatistic(name: str | None = None)[source]

Bases: Statistic

call_count: int
done: int
fail: int
name: str | None
sum_time: float
exception aiomisc.recurring.StrategyException[source]

Bases: Exception

exception aiomisc.recurring.StrategySkip(next_attempt_delay: int | float)[source]

Bases: StrategyException

The strategy function might raise this exception as a way to skip the current call

exception aiomisc.recurring.StrategyStop[source]

Bases: StrategyException

The strategy function might raise this exception as a way to stop the recurring calls
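
A sketch of a custom strategy (shutdown_requested() is a hypothetical predicate): the strategy returns the delay in seconds before the next call and may raise StrategySkip or StrategyStop:

from aiomisc.recurring import RecurringCallback, StrategyStop


async def poll() -> None:
    ...  # the work to repeat


async def delay_strategy(callback: RecurringCallback) -> float:
    if shutdown_requested():    # hypothetical predicate
        raise StrategyStop()
    return 30.0                 # call poll() again in 30 seconds


async def setup() -> None:
    # Keep a reference to the returned task for later cancellation.
    task = RecurringCallback(poll).start(delay_strategy)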

1.17. aiomisc.signal module

class aiomisc.signal.Signal[source]

Bases: object

async call(*args: Any, **kwargs: Any) None[source]
connect(receiver: Callable[[...], Any]) None[source]
copy() Signal[source]
disconnect(receiver: Callable[[...], Any]) None[source]
freeze() None[source]
property is_frozen: bool
aiomisc.signal.receiver(s: Signal) Callable[[...], Callable[[...], T]][source]
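
A sketch of connecting a receiver to one of the Entrypoint lifecycle signals; the receiver's keyword arguments follow the aiomisc convention for these signals and are an assumption here:

from aiomisc.entrypoint import Entrypoint
from aiomisc.signal import receiver


@receiver(Entrypoint.PRE_START)
async def on_pre_start(entrypoint=None, services=(), **kwargs):
    ...  # runs before the services are started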

1.18. aiomisc.thread_pool module

class aiomisc.thread_pool.BoundThreaded(func: Callable[[...], T], instance: Any)[source]

Bases: ThreadedBase[P, T]

class aiomisc.thread_pool.BoundThreadedIterable(func: Callable[[...], Generator[T, None, None]], instance: Any, max_size: int = 0)[source]

Bases: ThreadedIterableBase[P, T]

class aiomisc.thread_pool.CoroutineWaiter(coroutine: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None)[source]

Bases: object

start() None[source]
wait() Any[source]
class aiomisc.thread_pool.IteratorWrapperSeparate(gen_func: Callable[[P], Generator[T, None, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]

Bases: IteratorWrapper

executor
class aiomisc.thread_pool.TaskChannel[source]

Bases: SimpleQueue

close() None[source]
closed_event: Event
get(*args: Any, **kwargs: Any) WorkItem[source]

Remove and return an item from the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).

exception aiomisc.thread_pool.TaskChannelCloseException[source]

Bases: RuntimeError

exception aiomisc.thread_pool.ThreadPoolException[source]

Bases: RuntimeError

class aiomisc.thread_pool.ThreadPoolExecutor(max_workers: int = 4, statistic_name: str | None = None)[source]

Bases: ThreadPoolExecutor

Initializes a new ThreadPoolExecutor instance.

Parameters:
  • max_workers – The maximum number of threads that can be used to execute the given calls.

  • thread_name_prefix – An optional name prefix to give our threads.

  • initializer – A callable used to initialize worker threads.

  • initargs – A tuple of arguments to pass to the initializer.

DEFAULT_POOL_SIZE = 4
SHUTDOWN_TIMEOUT = 10
shutdown(wait: bool = True) None[source]

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn: F, *args: Any, **kwargs: Any) Future[source]

Submit blocking function to the pool

class aiomisc.thread_pool.ThreadPoolStatistic(name: str | None = None)[source]

Bases: Statistic

done: int
error: int
name: str | None
submitted: int
success: int
sum_time: float
threads: int
class aiomisc.thread_pool.Threaded(func: Callable[[P], T])[source]

Bases: ThreadedBase[P, T]

func_type: type
class aiomisc.thread_pool.ThreadedBase(*args: Any, **kwargs: Any)[source]

Bases: Generic[P, T], ABC

async_call(*args: ~P, **kwargs: ~P) Awaitable[T][source]
func: Callable[[P], T]
sync_call(*args: ~P, **kwargs: ~P) T[source]
class aiomisc.thread_pool.ThreadedIterable(func: Callable[[P], Generator[T, None, None]], max_size: int = 0)[source]

Bases: ThreadedIterableBase[P, T]

func_type: type
class aiomisc.thread_pool.ThreadedIterableBase(*args: Any, **kwargs: Any)[source]

Bases: Generic[P, T], ABC

async_call(*args: ~P, **kwargs: ~P) IteratorWrapper[P, T][source]
create_wrapper(*args: ~P, **kwargs: ~P) IteratorWrapper[P, T][source]
func: Callable[[P], Generator[T, None, None]]
max_size: int
sync_call(*args: ~P, **kwargs: ~P) Generator[T, None, None][source]
class aiomisc.thread_pool.ThreadedIterableSeparate(func: Callable[[P], Generator[T, None, None]], max_size: int = 0)[source]

Bases: ThreadedIterable[P, T]

create_wrapper(*args: ~P, **kwargs: ~P) IteratorWrapperSeparate[source]
class aiomisc.thread_pool.ThreadedSeparate(func: Callable[[P], T], detach: bool = True)[source]

Bases: Threaded[P, T]

async_call(*args: ~P, **kwargs: ~P) Awaitable[T][source]
detach
class aiomisc.thread_pool.WorkItem(func: ~collections.abc.Callable[[...], ~typing.Any], statistic: ~aiomisc.thread_pool.ThreadPoolStatistic, future: ~_asyncio.Future, loop: ~asyncio.events.AbstractEventLoop, args: tuple[~typing.Any, ...] = <factory>, kwargs: dict[str, ~typing.Any] = <factory>, context: ~contextvars.Context = <factory>)[source]

Bases: WorkItemBase

class aiomisc.thread_pool.WorkItemBase(func: collections.abc.Callable[..., typing.Any], statistic: aiomisc.thread_pool.ThreadPoolStatistic, future: _asyncio.Future, loop: asyncio.events.AbstractEventLoop, args: tuple[typing.Any, ...] = <factory>, kwargs: dict[str, typing.Any] = <factory>, context: _contextvars.Context = <factory>)[source]

Bases: object

args: tuple[Any, ...]
context: Context
func: Callable[[...], Any]
future: Future
kwargs: dict[str, Any]
loop: AbstractEventLoop
statistic: ThreadPoolStatistic
aiomisc.thread_pool.context_partial(func: F, *args: Any, **kwargs: Any) Any[source]
aiomisc.thread_pool.run_in_executor(func: Callable[[...], T], executor: ThreadPoolExecutor | None = None, args: Any = (), kwargs: Any = mappingproxy({})) Awaitable[T][source]
aiomisc.thread_pool.run_in_new_thread(func: F, args: Any = (), kwargs: Any = mappingproxy({}), detach: bool = True, no_return: bool = False, statistic_name: str | None = None) Future[source]
aiomisc.thread_pool.sync_await(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T[source]
aiomisc.thread_pool.sync_wait_coroutine(loop: AbstractEventLoop | None, coro_func: Callable[[...], Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) T[source]
aiomisc.thread_pool.thread_pool_thread_loop(tasks: TaskChannel, statistic: ThreadPoolStatistic, stop_event: Event, pool_shutdown_event: Event) None[source]
aiomisc.thread_pool.threaded(func: Callable[[P], T]) Threaded[P, T][source]
aiomisc.thread_pool.threaded(func: Callable[[P], Generator[T, None, None]]) Callable[[P], IteratorWrapper[P, T]]
aiomisc.thread_pool.threaded_iterable(func: Callable[[P], Generator[T, None, None]], *, max_size: int = 0) ThreadedIterable[P, T][source]
aiomisc.thread_pool.threaded_iterable(*, max_size: int = 0) Callable[[Callable[[P], Generator[T, None, None]]], ThreadedIterable[P, T]]
aiomisc.thread_pool.threaded_iterable_separate(func: Callable[[P], Generator[T, None, None]], *, max_size: int = 0) ThreadedIterable[P, T][source]
aiomisc.thread_pool.threaded_iterable_separate(*, max_size: int = 0) Callable[[Callable[[P], Generator[T, None, None]]], ThreadedIterableSeparate[P, T]]
aiomisc.thread_pool.threaded_separate(func: Callable[[P], T], detach: bool = True) ThreadedSeparate[P, T][source]
aiomisc.thread_pool.wait_coroutine(coro: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None) T[source]
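
Usage sketch for the threaded decorators (the blocking functions are illustrative):

from aiomisc.thread_pool import threaded, threaded_iterable


@threaded
def read_config(path: str) -> str:
    # Blocking I/O runs in the thread pool; awaiting the decorated
    # call returns its result.
    with open(path) as fp:
        return fp.read()


@threaded_iterable(max_size=8)
def produce_lines(path: str):
    # A blocking generator becomes an async iterator with a bounded
    # internal queue.
    with open(path) as fp:
        yield from fp


async def main(path: str) -> None:
    text = await read_config(path)
    async for line in produce_lines(path):
        print(line.rstrip())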

1.19. aiomisc.timeout module

aiomisc.timeout.timeout(value: int | float) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]][source]
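
Usage sketch (illustrative coroutine):

from aiomisc.timeout import timeout


@timeout(1.5)
async def fetch_profile(user_id: int) -> dict:
    ...  # cancelled with a timeout error if it runs longer than 1.5 s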

1.20. aiomisc.utils module

class aiomisc.utils.SelectAwaitable(*awaitables: Awaitable[T], return_exceptions: bool = False, cancel: bool = True, timeout: int | float | None = None, wait: bool = True, loop: AbstractEventLoop | None = None)[source]

Bases: object

Select one of the passed awaitables

Parameters:
  • awaitables – awaitable objects

  • return_exceptions – if True, an exception will not be raised but returned as the result

  • cancel – cancel unfinished coroutines (default True)

  • timeout – execution timeout

  • wait – when False and cancel=True, unfinished coroutines will be cancelled in the background.

  • loop – event loop

property loop: AbstractEventLoop
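
A rough sketch of waiting for whichever awaitable finishes first, assuming the instance is awaited directly (select, documented below, is an alias of this class); the awaitables are illustrative:

import asyncio

from aiomisc.utils import SelectAwaitable


async def main() -> None:
    stop_event = asyncio.Event()
    # Resolves as soon as either the event is set or 5 seconds pass;
    # the unfinished awaitable is cancelled (cancel=True by default).
    await SelectAwaitable(stop_event.wait(), asyncio.sleep(5))
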
class aiomisc.utils.SelectResult(length: int)[source]

Bases: Collection

done() bool[source]
is_exception: bool | None
length
result() Any[source]
result_idx: int | None
set_result(idx: int, value: Any, is_exception: bool) None[source]
value: Any
aiomisc.utils.awaitable(func: Callable[[...], AT | Awaitable[AT]]) Callable[[...], Awaitable[AT]][source]

Decorator that wraps a function and returns a function which returns an awaitable object. If the function returns a future, the original future is returned. If the function returns a coroutine, the original coroutine is returned. If the function returns a non-awaitable object, it is wrapped into a new coroutine which just returns this object. It's useful when you don't want to check the function result before using it in an await expression.
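
Usage sketch:

from aiomisc.utils import awaitable


@awaitable
def get_answer() -> int:
    return 42          # a plain value is wrapped into a coroutine


async def main() -> None:
    print(await get_answer())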

aiomisc.utils.bind_socket(*args: Any, address: str, port: int = 0, options: Iterable[tuple[int, int, int]] = (), reuse_addr: bool = True, reuse_port: bool = True, proto_name: str | None = None) socket[source]

Bind a socket and set setblocking(False) on the just-created socket. The address format is detected and the socket family is selected automatically.

Parameters:
  • args – which will be passed to stdlib’s socket constructor (optional)

  • address – bind address

  • port – bind port

  • options – Iterable of (level, option, value) tuples to set on the socket.

  • reuse_addr – set socket.SO_REUSEADDR

  • reuse_port – set socket.SO_REUSEPORT

  • proto_name – protocol name which will be logged after binding

Returns:

socket.socket
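
Usage sketch (address and port are illustrative):

from aiomisc.utils import bind_socket

# Returns a non-blocking socket bound to the given address; the
# family (e.g. IPv4 vs. IPv6) is chosen from the address format.
sock = bind_socket(address="127.0.0.1", port=8080, proto_name="http")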

aiomisc.utils.cancel_tasks(tasks: Iterable[Future]) Future[source]

All passed tasks will be cancelled and a new task will be returned.

Parameters:

tasks – tasks which will be cancelled

aiomisc.utils.chunk_list(iterable: Iterable[T], size: int) Iterable[list[T]][source]

Split a list or generator into chunks with a fixed maximum size.

aiomisc.utils.create_default_event_loop(pool_size: int | None = None, policy: AbstractEventLoopPolicy = <asyncio.unix_events._UnixDefaultEventLoopPolicy object>, debug: bool = False) tuple[AbstractEventLoop, ThreadPoolExecutor][source]

Creates an event loop and thread pool executor

Parameters:
  • pool_size – thread pool maximal size

  • policy – event loop policy

  • debug – set loop.set_debug(True) if True

aiomisc.utils.fast_uuid1() UUID[source]

Fast UUID1-like identifier

aiomisc.utils.fast_uuid4() UUID[source]

Fast UUID4-like identifier

aiomisc.utils.getrandbits(k) x.  Generates an int with k random bits.
aiomisc.utils.new_event_loop(pool_size: int | None = None, policy: AbstractEventLoopPolicy = <asyncio.unix_events._UnixDefaultEventLoopPolicy object>) AbstractEventLoop[source]
aiomisc.utils.pending_futures(futures: Iterable[Future]) Iterator[Future][source]
aiomisc.utils.select

alias of SelectAwaitable

aiomisc.utils.set_exception(futures: Iterable[Future], exc: BaseException = CancelledError()) set[Task][source]
aiomisc.utils.shield(func: Callable[[...], Coroutine[Any, Any, T]]) Callable[[...], Coroutine[Any, Any, T]][source]

Simple and useful decorator that wraps a coroutine into asyncio.shield.

>>> @shield
... async def non_cancelable_func():
...     await asyncio.sleep(1)

1.21. aiomisc.worker_pool module

class aiomisc.worker_pool.WorkerPool(workers: int, max_overflow: int = 0, *, initializer: Callable[[...], Any] | None = None, initializer_args: tuple[Any, ...] = (), initializer_kwargs: Mapping[str, Any] = mappingproxy({}), statistic_name: str | None = None)[source]

Bases: object

SERVER_CLOSE_TIMEOUT = 1
address: str | tuple[str, int]
close() None[source]
async create_task(func: Callable[[...], T], *args: Any, **kwargs: Any) T[source]
initializer: Callable[[], Any] | None
initializer_args: tuple[Any, ...]
initializer_kwargs: Mapping[str, Any]
property loop: AbstractEventLoop
pids: set[int]
server: AbstractServer
async start() None[source]
tasks: Queue
worker_ids: tuple[bytes, ...]
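
A rough sketch using only the members documented above (the worker function is illustrative and must be importable from a module so it can be pickled):

from aiomisc.worker_pool import WorkerPool


def hard_work(x: int) -> int:
    # Runs in a separate worker process.
    return x * 2


async def main() -> None:
    pool = WorkerPool(workers=4)
    await pool.start()
    try:
        print(await pool.create_task(hard_work, 21))  # 42
    finally:
        pool.close()
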
class aiomisc.worker_pool.WorkerPoolStatistic(name: str | None = None)[source]

Bases: Statistic

bad_auth: int
done: int
error: int
name: str | None
processes: int
queue_size: int
spawning: int
submitted: int
success: int
sum_time: float
task_added: int