1. aiomisc module
1.1. aiomisc.aggregate module
- class aiomisc.aggregate.AggregateStatistic(name: str | None = None)[source]#
Bases:
Statistic
- done: int#
- error: int#
- leeway_ms: float#
- max_count: int#
- name: str | None#
- success: int#
- class aiomisc.aggregate.Aggregator(func: AggregateFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#
Bases:
AggregatorAsync[V, R], Generic[V, R]
- class aiomisc.aggregate.AggregatorAsync(func: AggregateAsyncFunc[V, R], *, leeway_ms: float, max_count: int | None = None, statistic_name: str | None = None)[source]#
Bases:
EventLoopMixin, Generic[V, R]
- property count: int#
- property leeway_ms: float#
- property max_count: int | None#
- class aiomisc.aggregate.Arg(value: V, future: 'Future[R]')[source]#
Bases:
Generic[V, R]
- future: Future[R]#
- value: V#
- aiomisc.aggregate.aggregate(leeway_ms: float, max_count: int | None = None) Callable[[AggregateFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]] [source]#
Parametric decorator that aggregates multiple (but no more than max_count, which defaults to None) single-argument executions (res1 = await func(arg1), res2 = await func(arg2), …) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into a single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)) collected within a time window of leeway_ms.
Note
func must return a sequence of values whose length equals the number of arguments (and in the same order).
Note
If an unexpected error occurs, the exception is propagated to each future; to set an individual error for each aggregated call, refer to aggregate_async.
- Parameters:
leeway_ms – The maximum approximate delay between the first collected argument and the aggregated execution.
max_count – The maximum number of arguments to call the decorated function with. Default None.
- Returns:
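The batching behaviour described for aggregate can be illustrated with a small stdlib-only sketch. This is not aiomisc's implementation: window_aggregate, run_batch, flush and double_all are hypothetical names chosen for this example, and the sketch does not validate that the returned sequence has the right length.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Sequence, Set, Tuple


def window_aggregate(leeway_ms: float, max_count: Optional[int] = None):
    """Hypothetical stdlib-only stand-in for the decorator described above."""

    def decorator(func: Callable[..., Awaitable[Sequence[Any]]]):
        pending: List[Tuple[Any, asyncio.Future]] = []
        running: Set[asyncio.Task] = set()  # strong references to batch tasks
        timer: Optional[asyncio.TimerHandle] = None

        async def run_batch(batch: List[Tuple[Any, asyncio.Future]]) -> None:
            try:
                # One call with every collected argument, results in order.
                results = await func(*(arg for arg, _ in batch))
                for (_, future), result in zip(batch, results):
                    if not future.done():
                        future.set_result(result)
            except Exception as exc:
                # An unexpected error is propagated to every waiting caller.
                for _, future in batch:
                    if not future.done():
                        future.set_exception(exc)

        def flush() -> None:
            nonlocal timer
            if timer is not None:
                timer.cancel()
                timer = None
            batch = pending[:]
            pending.clear()
            if batch:
                task = asyncio.get_running_loop().create_task(run_batch(batch))
                running.add(task)
                task.add_done_callback(running.discard)

        async def wrapper(arg: Any) -> Any:
            nonlocal timer
            loop = asyncio.get_running_loop()
            future = loop.create_future()
            pending.append((arg, future))
            if max_count is not None and len(pending) >= max_count:
                flush()  # batch is full, do not wait for the window to close
            elif timer is None:
                # The first argument of a new batch opens the time window.
                timer = loop.call_later(leeway_ms / 1000, flush)
            return await future

        return wrapper

    return decorator


batches = []  # records the arguments of every real call


@window_aggregate(leeway_ms=10)
async def double_all(*args: int) -> List[int]:
    batches.append(args)
    return [arg * 2 for arg in args]


async def main() -> List[int]:
    return await asyncio.gather(double_all(1), double_all(2), double_all(3))


results = asyncio.run(main())
```

Three concurrent single-argument calls end up as one call of double_all with three arguments, and each caller receives its own element of the returned sequence.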
- aiomisc.aggregate.aggregate_async(leeway_ms: float, max_count: int | None = None) Callable[[AggregateAsyncFunc[V, R]], Callable[[V], Coroutine[Any, Any, R]]] [source]#
Same as aggregate, but with func arguments of type Arg containing value and future attributes instead. In this setting func is responsible for setting individual results/exceptions for all of the futures or throwing an exception (it will propagate to futures automatically). If func mistakenly does not set a result of some future, then a ResultNotSetError exception is set.
- Returns:
1.2. aiomisc.backoff module
- class aiomisc.backoff.BackoffStatistic(name: str | None = None)[source]#
Bases:
Statistic
- attempts: int#
- cancels: int#
- done: int#
- errors: int#
- name: str | None#
- sum_time: float#
- class aiomisc.backoff.RetryStatistic(name: str | None = None)[source]#
Bases:
BackoffStatistic
- name: str | None#
- aiomisc.backoff.asyncbackoff(attempt_timeout: int | float | None, deadline: int | float | None, pause: int | float = 0, *exc: ~typing.Type[Exception], exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (), max_tries: int | None = None, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None, statistic_class: ~typing.Type[~aiomisc.backoff.BackoffStatistic] = <class 'aiomisc.backoff.BackoffStatistic'>) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]] [source]#
Parametric decorator that ensures that the attempt_timeout and deadline time limits are met by the decorated function.
In case of an exception, the function will be called again with the same arguments after pause seconds.
- Parameters:
statistic_name – name field for statistic instances
attempt_timeout – maximum execution time for one execution attempt.
deadline – maximum execution time for all execution attempts.
pause – time gap between execution attempts.
exc – retry when these exceptions are raised.
exceptions – similar to exc, but keyword-only.
max_tries – maximum count of execution attempts (>= 1).
giveup – is a predicate function which can decide by a given
statistic_class – statistic class
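How the attempt_timeout, deadline, pause and max_tries limits interact can be sketched with plain asyncio. This is an illustration under assumptions, not the library's code: retry_with_limits, flaky and attempt_log are hypothetical names, and the giveup predicate and statistics are left out.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_limits(
    func: Callable[[], Awaitable[T]],
    *,
    attempt_timeout: Optional[float],
    deadline: Optional[float],
    pause: float = 0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    max_tries: Optional[int] = None,
) -> T:
    async def attempts() -> T:
        tries = 0
        while True:
            tries += 1
            try:
                # Each attempt gets its own time limit.
                return await asyncio.wait_for(func(), timeout=attempt_timeout)
            except exceptions:
                if max_tries is not None and tries >= max_tries:
                    raise
                await asyncio.sleep(pause)  # gap between attempts

    # The deadline bounds all attempts together, pauses included.
    return await asyncio.wait_for(attempts(), timeout=deadline)


attempt_log: List[int] = []


async def flaky() -> str:
    attempt_log.append(len(attempt_log) + 1)
    if len(attempt_log) < 3:
        raise ConnectionError("transient failure")
    return "ok"


outcome = asyncio.run(
    retry_with_limits(
        flaky,
        attempt_timeout=1,
        deadline=5,
        pause=0.01,
        exceptions=(ConnectionError,),
        max_tries=5,
    )
)
```

The first two attempts fail and are retried after the pause; the third one succeeds well within both limits.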
- aiomisc.backoff.asyncretry(max_tries: int | None, exceptions: ~typing.Tuple[~typing.Type[Exception], ...] = (<class 'Exception'>,), pause: int | float = 0, giveup: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None) Callable[[Callable[[P], Coroutine[Any, Any, T]]], Callable[[P], Coroutine[Any, Any, T]]] [source]#
Shortcut for asyncbackoff(None, None, 0, Exception).
In case of an exception, the function will be called again with the same arguments after pause seconds.
- Parameters:
max_tries – maximum count of execution attempts (>= 1, or None for unlimited).
exceptions – similar to exc, but keyword-only.
giveup – is a predicate function which can decide by a given
pause – time gap between execution attempts.
statistic_name – name field for statistic instances
1.3. aiomisc.circuit_breaker module
- class aiomisc.circuit_breaker.CircuitBreaker(error_ratio: float, response_time: int | float, exceptions: ~typing.Iterable[~typing.Type[Exception]] = (<class 'Exception'>,), recovery_time: int | float | None = None, broken_time: int | float | None = None, passing_time: int | float | None = None, exception_inspector: ~typing.Callable[[Exception], bool] | None = None, statistic_name: str | None = None)[source]#
Bases:
EventLoopMixin
Circuit Breaker pattern implementation. The class instance collects call statistics through the call or call async methods.
The state machine has three states:
* CircuitBreakerStates.PASSING
* CircuitBreakerStates.BROKEN
* CircuitBreakerStates.RECOVERING
In passing mode, all results or exceptions are returned as is. Statistics are collected for each call.
In broken mode, a CircuitBroken exception is raised for each call. Statistics are not collected.
In recovering mode, a portion of the calls are real function calls and the remaining ones raise CircuitBroken. The count of real calls grows exponentially in this case, but when 20% (by default) of them fail, the state returns to broken.
- Parameters:
error_ratio – Ratio of failed to successful calls. The state might change if the ratio reaches the given value within response_time (in seconds). Value between 0.0 and 1.0.
response_time – Time window to collect statistics (seconds)
exceptions – Only these exceptions affect the ratio. The base class Exception is used by default.
recovery_time – minimum time in recovery state (seconds)
broken_time – minimum time in broken state (seconds)
passing_time – minimum time in passing state (seconds)
- BUCKET_COUNT = 10#
- PASSING_BROKEN_THRESHOLD = 1#
- RECOVER_BROKEN_THRESHOLD = 0.5#
- property recovery_ratio: int | float#
- property response_time: int | float#
- property state: CircuitBreakerStates#
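The three-state machine described above can be illustrated with a heavily simplified, synchronous sketch. It is not the aiomisc implementation: TinyBreaker, BreakerOpen, min_calls and the injectable clock are assumptions made for this example, the error ratio is computed as failures over total calls, there are no time buckets, and recovery is decided by a single probe call rather than a gradually growing share of real calls.

```python
import enum
import time
from typing import Any, Callable, List


class State(enum.IntEnum):
    PASSING = 0
    BROKEN = 1
    RECOVERING = 2


class BreakerOpen(Exception):
    """Raised instead of calling the function while the breaker is broken."""


class TinyBreaker:
    def __init__(
        self,
        error_ratio: float,
        broken_time: float,
        min_calls: int = 4,  # assumption: wait for a few calls before judging
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.error_ratio = error_ratio
        self.broken_time = broken_time
        self.min_calls = min_calls
        self.clock = clock
        self.state = State.PASSING
        self.ok = 0
        self.fail = 0
        self.broken_at = 0.0

    def _switch(self, state: State) -> None:
        self.state = state
        self.ok = self.fail = 0
        if state is State.BROKEN:
            self.broken_at = self.clock()

    def _record(self, success: bool) -> None:
        if self.state is State.RECOVERING:
            # Simplification: one probe call decides the next state.
            self._switch(State.PASSING if success else State.BROKEN)
            return
        if success:
            self.ok += 1
        else:
            self.fail += 1
        total = self.ok + self.fail
        if total >= self.min_calls and self.fail / total >= self.error_ratio:
            self._switch(State.BROKEN)

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state is State.BROKEN:
            if self.clock() - self.broken_at < self.broken_time:
                raise BreakerOpen()  # no real call, no statistics
            self.state = State.RECOVERING  # let a probe call through
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record(False)
            raise
        self._record(True)
        return result


now = [0.0]  # fake clock so the example does not need to sleep
breaker = TinyBreaker(error_ratio=0.5, broken_time=10, clock=lambda: now[0])


def boom() -> None:
    raise ValueError("backend down")


states: List[State] = []
for _ in range(4):
    try:
        breaker.call(boom)
    except ValueError:
        pass
states.append(breaker.state)  # four failures trip the breaker

try:
    breaker.call(boom)
    rejected = False
except BreakerOpen:
    rejected = True

now[0] = 11.0  # broken_time has elapsed
value = breaker.call(lambda: "pong")
states.append(breaker.state)
```

While broken, calls are rejected without touching the wrapped function; once broken_time has passed, a successful probe returns the breaker to the passing state.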
- class aiomisc.circuit_breaker.CircuitBreakerStates(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
IntEnum
- BROKEN = 1#
- PASSING = 0#
- RECOVERING = 2#
- class aiomisc.circuit_breaker.CircuitBreakerStatistic(name: str | None = None)[source]#
Bases:
Statistic
- call_broken: int#
- call_count: int#
- call_passing: int#
- call_recovering: int#
- call_recovering_failed: int#
- call_recovering_ok: int#
- error_ratio: float#
- error_ratio_threshold: float#
- name: str | None#
- exception aiomisc.circuit_breaker.CircuitBroken(last_exception: Exception | None)[source]#
Bases:
Exception
- last_exception#
- class aiomisc.circuit_breaker.CounterKey(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
IntEnum
- FAIL = 0#
- OK = 1#
- TOTAL = 2#
- aiomisc.circuit_breaker.cutout(ratio: float, response_time: int | float, *exceptions: Type[Exception], **kwargs: Any) Callable[[Callable[[...], T] | Callable[[...], Awaitable[T]]], Callable[[...], T | Awaitable[T]]] [source]#
- aiomisc.circuit_breaker.random() x in the interval [0, 1). #
1.4. aiomisc.compat module
- class aiomisc.compat.EntrypointProtocol(*args, **kwargs)[source]#
Bases:
Protocol
- property name: str#
- class aiomisc.compat.ParamSpec#
Bases:
object
Parameter specification variable.
The preferred way to construct a parameter specification is via the dedicated syntax for generic functions, classes, and type aliases, where the use of ‘**’ creates a parameter specification:
type IntFunc[**P] = Callable[P, int]
For compatibility with Python 3.11 and earlier, ParamSpec objects can also be created as follows:
P = ParamSpec('P')
Parameter specification variables exist primarily for the benefit of static type checkers. They are used to forward the parameter types of one callable to another callable, a pattern commonly found in higher-order functions and decorators. They are only valid when used in Concatenate, or as the first argument to Callable, or as parameters for user-defined Generics. See class Generic for more information on generic types.
An example for annotating a decorator:
def add_logging[**P, T](f: Callable[P, T]) -> Callable[P, T]:
    '''A type-safe decorator to add logging to a function.'''
    def inner(*args: P.args, **kwargs: P.kwargs) -> T:
        logging.info(f'{f.__name__} was called')
        return f(*args, **kwargs)
    return inner

@add_logging
def add_two(x: float, y: float) -> float:
    '''Add two numbers together.'''
    return x + y
Parameter specification variables can be introspected. e.g.:
>>> P = ParamSpec("P")
>>> P.__name__
'P'
Note that only parameter specification variables defined in the global scope can be pickled.
- args#
Represents positional arguments.
- kwargs#
Represents keyword arguments.
- class aiomisc.compat.Protocol[source]#
Bases:
Generic
Base class for protocol classes.
Protocol classes are defined as:
class Proto(Protocol):
    def meth(self) -> int:
        ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example:
class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as:
class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
- aiomisc.compat.entry_pont_iterator(entry_point: str) Iterator[EntrypointProtocol] [source]#
- aiomisc.compat.final(f)[source]#
Decorator to indicate final methods and final classes.
Use this decorator to indicate to type checkers that the decorated method cannot be overridden, and decorated class cannot be subclassed.
For example:
class Base:
    @final
    def done(self) -> None:
        ...

class Sub(Base):
    def done(self) -> None:  # Error reported by type checker
        ...

@final
class Leaf:
    ...

class Other(Leaf):  # Error reported by type checker
    ...
There is no runtime checking of these properties. The decorator attempts to set the __final__ attribute to True on the decorated object to allow runtime introspection.
- aiomisc.compat.get_current_loop() CT #
- aiomisc.compat.set_current_loop(value: CT) None #
- aiomisc.compat.time_ns() int #
Return the current time in nanoseconds since the Epoch.
1.5. aiomisc.context module
1.6. aiomisc.counters module
- class aiomisc.counters.MetaStatistic(name: str, bases: Tuple[type, ...], dct: Dict[str, Any])[source]#
Bases:
type
- class aiomisc.counters.Metric(name: str, counter: MutableMapping[str, float | int], default: float | int = 0)[source]#
Bases:
object
- counter#
- name: str#
- class aiomisc.counters.Statistic(name: str | None = None)[source]#
Bases:
AbstractStatistic
- name: str | None#
- class aiomisc.counters.StatisticResult(kind: Type[aiomisc.counters.AbstractStatistic], name: str | None, metric: str, value: int | float)[source]#
Bases:
object
- kind: Type[AbstractStatistic]#
- metric: str#
- name: str | None#
- value: int | float#
1.7. aiomisc.cron module
- class aiomisc.cron.CronCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#
Bases:
EventLoopMixin
Note
When the cron function runs longer than the execution interval, the next call is skipped and a warning is logged.
- static get_next(cron: croniter, _: RecurringCallback) float [source]#
1.8. aiomisc.entrypoint module
- class aiomisc.entrypoint.Entrypoint(*services: ~aiomisc.service.base.Service, loop: ~asyncio.events.AbstractEventLoop | None = None, pool_size: int | None = None, log_level: int | str = 'info', log_format: str | ~aiomisc_log.enum.LogFormat = 'plain', log_buffering: bool = True, log_buffer_size: int = 1024, log_date_format: str | None = None, log_flush_interval: float = 0.2, log_config: bool = True, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False, catch_signals: ~typing.Tuple[int, ...] | None = None, shutdown_timeout: int | float = 60.0)[source]#
Bases:
object
Creates a new Entrypoint
- Parameters:
debug – enable debug mode on the event loop
loop – loop
services – Service instances which will be started.
pool_size – thread pool size
log_level – Logging level which will be configured
log_format – Logging format which will be configured
log_buffer_size – Buffer size for logging
log_flush_interval – interval in seconds for flushing logs
log_config – if False do not configure logging
catch_signals – Perform shutdown when these signals are received
shutdown_timeout – Timeout in seconds for graceful shutdown
- AIOMISC_SHUTDOWN_TIMEOUT: float = 60.0#
- DEFAULT_AIOMISC_BUFFERING: bool = True#
- DEFAULT_AIOMISC_BUFFER_SIZE: int = 1024#
- DEFAULT_AIOMISC_DEBUG: bool = False#
- DEFAULT_AIOMISC_LOG_CONFIG: bool = True#
- DEFAULT_AIOMISC_LOG_FLUSH: float = 0.2#
- DEFAULT_AIOMISC_POOL_SIZE: int | None = None#
- DEFAULT_LOG_DATE_FORMAT: str | None = None#
- DEFAULT_LOG_FORMAT: str = 'plain'#
- DEFAULT_LOG_LEVEL: str = 'info'#
- POST_START = <aiomisc.signal.Signal object>#
- POST_STOP = <aiomisc.signal.Signal object>#
- PRE_START = <aiomisc.signal.Signal object>#
- PRE_STOP = <aiomisc.signal.Signal object>#
- classmethod get_current() Entrypoint [source]#
- property loop: AbstractEventLoop#
- property services: Tuple[Service, ...]#
- aiomisc.entrypoint.entrypoint#
alias of Entrypoint
1.9. aiomisc.io module
- class aiomisc.io.AsyncBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncFileIO[bytes]
- executor#
- class aiomisc.io.AsyncBz2BinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncBinaryIO
- executor#
- class aiomisc.io.AsyncBz2TextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncTextIO
- executor#
- class aiomisc.io.AsyncFileIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
EventLoopMixin, Generic
- executor#
- property fp: IO#
- property mode: str#
- property name: str#
- classmethod open_fp(fp: IO, executor: Executor | None = None, loop: AbstractEventLoop | None = None) AsyncFileIO [source]#
- class aiomisc.io.AsyncGzipBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncBinaryIO
- executor#
- class aiomisc.io.AsyncGzipTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncTextIO
- executor#
- class aiomisc.io.AsyncLzmaBinaryIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncBinaryIO
- executor#
- class aiomisc.io.AsyncLzmaTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncTextIO
- executor#
- class aiomisc.io.AsyncTextIO(fname: str | Path, mode: str = 'r', executor: Executor | None = None, *args: Any, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]#
Bases:
AsyncFileIO[str]
- buffer() AsyncBinaryIO [source]#
- property encoding: str#
- property errors: str | None#
- executor#
- property fp: TextIO#
- property line_buffering: int#
- property newlines: Any#
- class aiomisc.io.Compression(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Enum
- BZ2 = (<class 'aiomisc.io.AsyncBz2BinaryIO'>, <class 'aiomisc.io.AsyncBz2TextIO'>)#
- GZIP = (<class 'aiomisc.io.AsyncGzipBinaryIO'>, <class 'aiomisc.io.AsyncGzipTextIO'>)#
- LZMA = (<class 'aiomisc.io.AsyncLzmaBinaryIO'>, <class 'aiomisc.io.AsyncLzmaTextIO'>)#
- NONE = (<class 'aiomisc.io.AsyncBinaryIO'>, <class 'aiomisc.io.AsyncTextIO'>)#
- aiomisc.io.async_open(fname: FilePath, mode: BinaryModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncBinaryIO [source]#
- aiomisc.io.async_open(fname: FilePath, mode: TextModes, compression: Compression = Compression.NONE, encoding: str = sys.getdefaultencoding(), *args: Any, **kwargs: Any) AsyncTextIO
1.10. aiomisc.iterator_wrapper module
- class aiomisc.iterator_wrapper.DequeWrapper[source]#
Bases:
QueueWrapperBase
- queue: Deque[Any]#
- class aiomisc.iterator_wrapper.FromThreadChannel(maxsize: int = 0)[source]#
Bases:
object
- SLEEP_DIFFERENCE_DIVIDER = 10#
- SLEEP_LOW_THRESHOLD = 0.0001#
- property is_closed: bool#
- queue: QueueWrapperBase#
- class aiomisc.iterator_wrapper.IteratorProxy(iterator: AsyncIterator, finalizer: Callable[[], Any])[source]#
Bases:
AsyncIterator
- class aiomisc.iterator_wrapper.IteratorWrapper(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#
Bases:
AsyncIterator, EventLoopMixin
- property closed: bool#
- executor#
- class aiomisc.iterator_wrapper.IteratorWrapperStatistic(name: str | None = None)[source]#
Bases:
Statistic
- enqueued: int#
- name: str | None#
- queue_length: int#
- queue_size: int#
- started: int#
- yielded: int#
- class aiomisc.iterator_wrapper.QueueWrapper(max_size: int)[source]#
Bases:
QueueWrapperBase
- queue: Queue#
- aiomisc.iterator_wrapper.make_queue(max_size: int = 0) QueueWrapperBase [source]#
1.11. aiomisc.log module
- class aiomisc.log.LogFormat(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
IntEnum
- color = 1#
- disabled = -1#
- journald = 5#
- json = 2#
- plain = 4#
- rich = 6#
- rich_tb = 7#
- stream = 0#
- syslog = 3#
1.12. aiomisc.periodic module
- class aiomisc.periodic.PeriodicCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#
Bases:
EventLoopMixin
Note
When the periodic function runs longer than the execution interval, the next call is skipped and a warning is logged.
1.13. aiomisc.plugins module
1.14. aiomisc.pool module
- class aiomisc.pool.ContextManager(aenter: Callable[[...], Awaitable[T]], aexit: Callable[[...], Awaitable[T]])[source]#
Bases:
AsyncContextManager
- sentinel = <object object>#
- class aiomisc.pool.PoolBase(maxsize: int = 10, recycle: int | None = None)[source]#
Bases:
ABC, EventLoopMixin, Generic[T]
- aiomisc.pool.random() x in the interval [0, 1). #
1.15. aiomisc.process_pool module
- class aiomisc.process_pool.ProcessPoolExecutor(max_workers: int = 4, **kwargs: Any)[source]#
Bases:
ProcessPoolExecutor, EventLoopMixin
Initializes a new ProcessPoolExecutor instance.
- Args:
max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers created using the multiprocessing.get_context(‘start method’) API. This object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
max_tasks_per_child: The maximum number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process. The default of None means worker process will live as long as the executor. Requires a non-‘fork’ mp_context start method. When given, we default to using ‘spawn’ if no mp_context is supplied.
- DEFAULT_MAX_WORKERS = 4#
1.16. aiomisc.recurring module
- class aiomisc.recurring.RecurringCallback(coroutine_func: Callable[[...], Awaitable[Any] | Any], *args: Any, **kwargs: Any)[source]#
Bases:
object
- args: Tuple[Any, ...]#
- func: Callable[[...], Awaitable[Any]]#
- kwargs: Mapping[str, Any]#
- name: str#
- start(strategy: Callable[[RecurringCallback], int | float | Awaitable[int] | Awaitable[float]], loop: AbstractEventLoop | None = None, *, shield: bool = False, suppress_exceptions: Tuple[Type[Exception], ...] = ()) Task [source]#
- class aiomisc.recurring.RecurringCallbackStatistic(name: str | None = None)[source]#
Bases:
Statistic
- call_count: int#
- done: int#
- fail: int#
- name: str | None#
- sum_time: float#
- exception aiomisc.recurring.StrategySkip(next_attempt_delay: int | float)[source]#
Bases:
StrategyException
A strategy function may raise this exception as a way to skip the current call
- exception aiomisc.recurring.StrategyStop[source]#
Bases:
StrategyException
A strategy function may raise this exception as a way to stop recurring
1.17. aiomisc.signal module
1.18. aiomisc.thread_pool module
- class aiomisc.thread_pool.CoroutineWaiter(coroutine: Coroutine[Any, Any, T], loop: AbstractEventLoop | None = None)[source]#
Bases:
object
- class aiomisc.thread_pool.IteratorWrapperSeparate(gen_func: Callable[[], Generator[T, R, None]], loop: AbstractEventLoop | None = None, max_size: int = 0, executor: Executor | None = None, statistic_name: str | None = None)[source]#
Bases:
IteratorWrapper
- executor#
- class aiomisc.thread_pool.TaskChannel[source]#
Bases:
SimpleQueue
- closed_event: Event#
- get(*args: Any, **kwargs: Any) WorkItem [source]#
Remove and return an item from the queue.
If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).
- class aiomisc.thread_pool.ThreadPoolExecutor(max_workers: int = 4, statistic_name: str | None = None)[source]#
Bases:
ThreadPoolExecutor
Initializes a new ThreadPoolExecutor instance.
- Args:
max_workers: The maximum number of threads that can be used to execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
- DEFAULT_POOL_SIZE = 4#
- SHUTDOWN_TIMEOUT = 10#
- shutdown(wait: bool = True) None [source]#
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Args:
wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
- class aiomisc.thread_pool.ThreadPoolStatistic(name: str | None = None)[source]#
Bases:
Statistic
- done: int#
- error: int#
- name: str | None#
- submitted: int#
- success: int#
- sum_time: float#
- threads: int#
- class aiomisc.thread_pool.WorkItem(func: ~typing.Callable[[...], ~typing.Any], statistic: ~aiomisc.thread_pool.ThreadPoolStatistic, future: ~_asyncio.Future, loop: ~asyncio.events.AbstractEventLoop, args: ~typing.Tuple[~typing.Any, ...] = <factory>, kwargs: ~typing.Dict[str, ~typing.Any] = <factory>, context: ~_contextvars.Context = <factory>)[source]#
Bases:
WorkItemBase
- class aiomisc.thread_pool.WorkItemBase(func: Callable[..., Any], statistic: aiomisc.thread_pool.ThreadPoolStatistic, future: _asyncio.Future, loop: asyncio.events.AbstractEventLoop, args: Tuple[Any, ...] = <factory>, kwargs: Dict[str, Any] = <factory>, context: _contextvars.Context = <factory>)[source]#
Bases:
object
- args: Tuple[Any, ...]#
- context: Context#
- func: Callable[[...], Any]#
- future: Future#
- kwargs: Dict[str, Any]#
- loop: AbstractEventLoop#
- statistic: ThreadPoolStatistic#
- aiomisc.thread_pool.run_in_executor(func: Callable[[...], T], executor: ThreadPoolExecutor | None = None, args: Any = (), kwargs: Any = mappingproxy({})) Awaitable[T] [source]#
- aiomisc.thread_pool.run_in_new_thread(func: F, args: Any = (), kwargs: Any = mappingproxy({}), detach: bool = True, no_return: bool = False, statistic_name: str | None = None) Future [source]#
- aiomisc.thread_pool.sync_await(func: Callable[[...], Awaitable[T]], *args: Any, **kwargs: Any) T [source]#
- aiomisc.thread_pool.sync_wait_coroutine(loop: AbstractEventLoop | None, coro_func: Callable[[...], Coroutine[Any, Any, T]], *args: Any, **kwargs: Any) T [source]#
- aiomisc.thread_pool.thread_pool_thread_loop(tasks: TaskChannel, statistic: ThreadPoolStatistic, stop_event: Event, pool_shutdown_event: Event) None [source]#
- aiomisc.thread_pool.threaded_iterable_separate(func: F | None = None, max_size: int = 0) Any [source]#
1.19. aiomisc.timeout module
1.20. aiomisc.utils module
- class aiomisc.utils.SelectAwaitable(*awaitables: Awaitable[T], return_exceptions: bool = False, cancel: bool = True, timeout: int | float | None = None, wait: bool = True, loop: AbstractEventLoop | None = None)[source]#
Bases:
object
Select one of passed awaitables
- Parameters:
awaitables – awaitable objects
return_exceptions – if True exception will not be raised just returned as result
cancel – cancel unfinished coroutines (default True)
timeout – execution timeout
wait – when False and cancel=True, unfinished coroutines will be cancelled in the background.
loop – event loop
- property loop: AbstractEventLoop#
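The "select one of the passed awaitables" behaviour, with cancellation of the unfinished ones, can be approximated with asyncio.wait. This is a stdlib sketch, not the SelectAwaitable implementation: first_done is a hypothetical helper, and returning a list with None in the slots of unfinished awaitables is an assumption made for this example.

```python
import asyncio
from typing import Any, Awaitable, List, Optional


async def first_done(
    *awaitables: Awaitable[Any], timeout: Optional[float] = None
) -> List[Any]:
    tasks = [asyncio.ensure_future(aw) for aw in awaitables]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel whatever has not finished and wait until cancellation completes.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if not done:
        raise asyncio.TimeoutError
    # Keep the positions of the inputs; unfinished slots stay None.
    return [task.result() if task in done else None for task in tasks]


async def demo_select() -> List[Any]:
    return await first_done(
        asyncio.sleep(0.01, "fast"),
        asyncio.sleep(1, "slow"),
    )


selected = asyncio.run(demo_select())
```

The slow awaitable is cancelled as soon as the fast one completes, so the example finishes in roughly ten milliseconds instead of one second.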
- class aiomisc.utils.SelectResult(length: int)[source]#
Bases:
Collection
- is_exception: bool | None#
- length#
- result_idx: int | None#
- value: Any#
- aiomisc.utils.awaitable(func: Callable[[...], AT | Awaitable[AT]]) Callable[[...], Awaitable[AT]] [source]#
Decorator that wraps a function and returns a function which returns an awaitable object. If the function returns a future, the original future is returned. If the function returns a coroutine, the original coroutine is returned. If the function returns a non-awaitable object, it is wrapped in a new coroutine which simply returns that object. This is useful when you don’t want to check a function’s result before using it in an await expression.
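The rule described above (pass awaitables through untouched, wrap everything else in a coroutine) fits in a few lines. The following is a sketch of that rule using only the standard library; as_awaitable, plain and native are hypothetical names, and the real decorator may differ in details.

```python
import asyncio
import functools
import inspect
from typing import Any, Awaitable, Callable, Tuple


def as_awaitable(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Awaitable[Any]:
        result = func(*args, **kwargs)
        if inspect.isawaitable(result):
            return result  # futures and coroutines are returned as is

        async def just_return() -> Any:
            return result  # plain values are wrapped in a new coroutine

        return just_return()

    return wrapper


@as_awaitable
def plain(x: int) -> int:
    return x + 1


@as_awaitable
async def native(x: int) -> int:
    return x * 2


async def demo_awaitable() -> Tuple[int, int]:
    # Both calls can be awaited without checking what the function returns.
    return await plain(1), await native(2)


pair = asyncio.run(demo_awaitable())
```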
- aiomisc.utils.bind_socket(*args: Any, address: str, port: int = 0, options: Iterable[Tuple[int, int, int]] = (), reuse_addr: bool = True, reuse_port: bool = True, proto_name: str | None = None) socket [source]#
Bind a socket and call setblocking(False) on the newly created socket. The address format is detected and the socket family is selected automatically.
- Parameters:
args – which will be passed to stdlib’s socket constructor (optional)
address – bind address
port – bind port
options – Tuple of pairs which contain socket option to set and the option value.
reuse_addr – set socket.SO_REUSEADDR
reuse_port – set socket.SO_REUSEPORT
proto_name – protocol name which will be logged after binding
- Returns:
socket.socket
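Detecting the socket family from the address and returning a bound non-blocking socket can be sketched with the standard library alone. bind_nonblocking is a hypothetical helper written for this illustration; it only understands literal IPv4/IPv6 addresses and ignores the options, reuse_port and proto_name parameters listed above.

```python
import ipaddress
import socket


def bind_nonblocking(
    address: str, port: int = 0, reuse_addr: bool = True
) -> socket.socket:
    # Choose the family from the textual form of the address.
    version = ipaddress.ip_address(address).version
    family = socket.AF_INET6 if version == 6 else socket.AF_INET

    sock = socket.socket(family, socket.SOCK_STREAM)
    try:
        sock.setblocking(False)
        if reuse_addr:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((address, port))
    except Exception:
        sock.close()  # do not leak the descriptor on failure
        raise
    return sock


sock = bind_nonblocking("127.0.0.1")  # port 0 lets the OS pick a free port
family = sock.family
blocking = sock.getblocking()
bound_port = sock.getsockname()[1]
sock.close()
```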
- aiomisc.utils.cancel_tasks(tasks: Iterable[Future]) Future [source]#
All passed tasks will be cancelled and a new task will be returned.
- Parameters:
tasks – tasks which will be cancelled
- aiomisc.utils.chunk_list(iterable: Iterable[T], size: int) Iterable[List[T]] [source]#
Split list or generator by chunks with fixed maximum size.
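Splitting an iterable into chunks of a fixed maximum size can be sketched with itertools.islice. chunked is a hypothetical stand-in used only to illustrate the behaviour; it is not the library function.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    iterator = iter(iterable)
    while True:
        # Take up to `size` items; the last chunk may be shorter.
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


chunks = list(chunked(range(7), 3))
```

Because the input is consumed lazily, the same approach works for lists and for generators.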
- aiomisc.utils.create_default_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>, debug: bool = False) Tuple[AbstractEventLoop, ThreadPoolExecutor] [source]#
Creates an event loop and thread pool executor
- Parameters:
pool_size – thread pool maximal size
policy – event loop policy
debug – set loop.set_debug(True) if True
- aiomisc.utils.getrandbits(k) x. Generates an int with k random bits. #
- aiomisc.utils.new_event_loop(pool_size: int | None = None, policy: ~asyncio.events.AbstractEventLoopPolicy = <uvloop.EventLoopPolicy object>) AbstractEventLoop [source]#
- aiomisc.utils.select#
alias of SelectAwaitable
1.21. aiomisc.worker_pool module
- class aiomisc.worker_pool.WorkerPool(workers: int, max_overflow: int = 0, *, initializer: Callable[[...], Any] | None = None, initializer_args: Tuple[Any, ...] = (), initializer_kwargs: Mapping[str, Any] = mappingproxy({}), statistic_name: str | None = None)[source]#
Bases:
object
- SERVER_CLOSE_TIMEOUT = 1#
- address: str | Tuple[str, int]#
- initializer: Callable[[], Any] | None#
- initializer_args: Tuple[Any, ...]#
- initializer_kwargs: Mapping[str, Any]#
- property loop: AbstractEventLoop#
- pids: Set[int]#
- server: AbstractServer#
- tasks: Queue#
- worker_ids: Tuple[bytes, ...]#