Source code for aiomisc.process_pool
import asyncio
from concurrent.futures import Future
from concurrent.futures import ProcessPoolExecutor as ProcessPoolExecutorBase
from multiprocessing import cpu_count
from typing import Any
from .compat import EventLoopMixin
from .counters import Statistic
[docs]class ProcessPoolStatistic(Statistic):
processes: int
done: int
error: int
success: int
submitted: int
sum_time: float
[docs]class ProcessPoolExecutor(ProcessPoolExecutorBase, EventLoopMixin):
DEFAULT_MAX_WORKERS = max((cpu_count(), 4))
def __init__(self, max_workers: int = DEFAULT_MAX_WORKERS, **kwargs: Any):
super().__init__(max_workers=max_workers, **kwargs)
self._statistic = ProcessPoolStatistic()
self._statistic.processes = max_workers
def _statistic_callback(
self,
future: Future,
start_time: float,
loop: asyncio.AbstractEventLoop,
) -> None:
if future.exception():
self._statistic.error += 1
else:
self._statistic.success += 1
self._statistic.done += 1
self._statistic.sum_time += loop.time() - start_time
[docs] def submit(self, *args: Any, **kwargs: Any) -> Future:
"""Submit blocking function to the pool"""
loop = asyncio.get_running_loop()
start_time = loop.time()
future = super().submit(*args, **kwargs)
self._statistic.submitted += 1
future.add_done_callback(
lambda f: self._statistic_callback(f, start_time, loop),
)
return future
def __del__(self) -> None:
self.shutdown()