2.17. WorkerPool
Python has the multiprocessing module with a Pool class, which implements a similar worker pool. The IPC in that case uses a completely synchronous communication method. This module reimplements the process-based worker pool, but the IPC is completely asynchronous on the caller side, while the workers in separate processes are not asynchronous.
2.17.1. Example
This is useful when you want to process data in a separate process and the input and output data are not very large. Larger payloads will work too, but you will spend time transmitting the data over IPC.
A good example is parallel image processing. You can transfer the bytes of the images through the IPC of the worker pool, but in the general case it is better to pass the file name to the worker. The exception is when the image payload is very small, for example significantly smaller than 1 KB.
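To make the size trade-off concrete, the sketch below compares how many bytes would cross the IPC channel when a task receives a file name versus the raw image payload. It assumes the arguments are serialized with pickle, which is an assumption about the transport rather than something stated here. The path and the 2 MiB payload are made-up placeholders.

```python
import pickle

# Hypothetical inputs: a file name and a fake 2 MiB "image" payload.
image_path = "/data/photos/IMG_0001.jpg"
image_bytes = bytes(2 * 1024 * 1024)


def ipc_size(*args) -> int:
    """Return the size of the pickled argument tuple in bytes."""
    return len(pickle.dumps(args))


path_cost = ipc_size(image_path)
payload_cost = ipc_size(image_bytes)

print(f"file name: {path_cost} bytes")
print(f"raw image: {payload_cost} bytes")
```

The file name costs a few dozen bytes no matter how large the image is, while the raw payload costs at least the size of the image for every task.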
Let’s write a program that accepts images in JPEG format and creates thumbnails for them. In this case, you have a file with the original image, and you need to generate the output path for the ‘thumbnail’ function.
Note
You have to install the Pillow image processing library to run this example.
Installing Pillow with pip:
pip install Pillow
import asyncio
import sys
from multiprocessing import cpu_count
from typing import Tuple
from pathlib import Path

from PIL import Image
from aiomisc import entrypoint, WorkerPool


def thumbnail(src_path: str, dest_path: str, box: Tuple[int, int]):
    img = Image.open(src_path)
    img.thumbnail(box)
    img.save(
        dest_path, "JPEG", quality=65,
        optimize=True,
        icc_profile=img.info.get('icc_profile'),
        exif=img.info.get('exif'),
    )
    return img.size


sizes = [
    (1024, 1024),
    (512, 512),
    (256, 256),
    (128, 128),
    (64, 64),
    (32, 32),
]


async def amain(path: Path):
    # Create directories
    for size in sizes:
        size_dir = "x".join(map(str, size))
        size_path = (path / 'thumbnails' / size_dir)
        size_path.mkdir(parents=True, exist_ok=True)

    # Create and run WorkerPool
    async with WorkerPool(cpu_count()) as pool:
        tasks = []
        for image in path.iterdir():
            if not image.name.endswith(".jpg"):
                continue

            if image.is_relative_to(path / 'thumbnails'):
                continue

            for size in sizes:
                rel_path = image.relative_to(path).parent
                size_dir = "x".join(map(str, size))
                dest_path = (
                    path / rel_path /
                    'thumbnails' / size_dir /
                    image.name
                )

                tasks.append(
                    pool.create_task(
                        thumbnail,
                        str(image),
                        str(dest_path),
                        size
                    )
                )

        await asyncio.gather(*tasks)


if __name__ == '__main__':
    with entrypoint() as loop:
        image_dir = Path(sys.argv[1])
        loop.run_until_complete(amain(image_dir))
This example takes the image directory as the first command-line argument and creates directories for the thumbnails. After that, a WorkerPool is started with as many processes as the processor has cores.
The main process creates tasks for the workers. Each task is a conversion of one file to one size, and all tasks are submitted to the WorkerPool instance.
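As a rough sketch of how such a job list can be built, every image is paired with every size, and the destination path is derived from both. The helper names `size_dir_name`, `thumbnail_path`, and `build_jobs` are hypothetical and exist only for this illustration. Nothing is dispatched to a pool here, and only images located directly in the root directory are considered.

```python
from itertools import product
from pathlib import Path
from typing import Iterable, List, Tuple

Size = Tuple[int, int]
Job = Tuple[str, str, Size]


def size_dir_name(size: Size) -> str:
    """Turn (256, 256) into the directory name '256x256'."""
    return "x".join(map(str, size))


def thumbnail_path(root: Path, image: Path, size: Size) -> Path:
    """Destination of one thumbnail: <root>/thumbnails/<WxH>/<file name>."""
    return root / "thumbnails" / size_dir_name(size) / image.name


def build_jobs(
    root: Path, images: Iterable[Path], sizes: Iterable[Size],
) -> List[Job]:
    """One job per (image, size) pair, as plain picklable arguments."""
    return [
        (str(image), str(thumbnail_path(root, image, size)), size)
        for image, size in product(images, sizes)
    ]


root = Path("photos")  # hypothetical image directory
jobs = build_jobs(
    root,
    [root / "a.jpg", root / "b.jpg"],
    [(256, 256), (64, 64)],
)
print(len(jobs))  # 2 images x 2 sizes = 4 jobs
```

Each tuple holds only strings and integers, so the arguments stay small when they are sent to a worker.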
The WorkerPool processes the tasks concurrently, but each worker handles only one job at a time.
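The scheduling rule can be illustrated with an in-process analogy: a fixed number of asyncio coroutines stand in for the worker processes, and each one takes the next job only after finishing the previous one. This is a simplified model written for illustration, not the library's implementation; `run_pool` and `fake_job` are hypothetical names.

```python
import asyncio


async def run_pool(n_workers: int, n_jobs: int) -> int:
    """Run n_jobs on n_workers and return the peak number of jobs in flight."""
    queue: asyncio.Queue = asyncio.Queue()
    for job_id in range(n_jobs):
        queue.put_nowait(job_id)

    running = 0
    peak = 0

    async def fake_job(job_id: int) -> None:
        # Stand-in for real work done in a worker process.
        await asyncio.sleep(0.01)

    async def worker() -> None:
        nonlocal running, peak
        while not queue.empty():
            job_id = queue.get_nowait()
            running += 1
            peak = max(peak, running)
            await fake_job(job_id)  # this worker is busy until the job ends
            running -= 1

    await asyncio.gather(*(worker() for _ in range(n_workers)))
    return peak


peak = asyncio.run(run_pool(n_workers=4, n_jobs=20))
print(peak)  # never exceeds the number of workers
```

In this model, the number of jobs in flight is bounded by the number of workers, so queuing more tasks than workers only makes the remaining tasks wait.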