Source code for aiomisc_worker.forking

import atexit
import gc
import os
import signal
import sys
from contextlib import suppress
from threading import Event, Lock
from typing import Any, MutableMapping, Optional, Tuple

from aiomisc_log import basic_config

from . import INT_SIGNAL, AddressType, log
from .protocol import FileIOProtocol
from .worker import bad_initializer, worker


# Maps the PID of every forked child to the worker id it was started with.
PROCESSES: MutableMapping[int, bytes] = {}
# Set by handle_interrupt(); main() leaves its wait loop once this is set.
STOPPING = Event()
# Held by at_exit() while it snapshots and clears PROCESSES.
EXIT_LOCK = Lock()


def at_exit() -> None:
    """Send ``SIGTERM`` to every child process still tracked in ``PROCESSES``.

    The PID list is snapshotted and the registry cleared while holding
    ``EXIT_LOCK``; the signals are sent afterwards, outside the lock.
    Any exception raised while signalling a PID is suppressed, so one
    failure does not prevent the remaining PIDs from being signalled.
    """
    with EXIT_LOCK:
        # Iterating the mapping yields its keys, i.e. the child PIDs.
        child_pids = tuple(PROCESSES)
        PROCESSES.clear()

    for child_pid in child_pids:
        log.debug("Terminating PID: %d", child_pid)
        # Best effort: errors from os.kill() are deliberately ignored.
        with suppress(Exception):
            os.kill(child_pid, signal.SIGTERM)
def handle_interrupt(*_: Any) -> None:
    """Flag shutdown via ``STOPPING`` and interrupt the running code.

    Any positional arguments passed by the caller are accepted and ignored.

    Raises:
        KeyboardInterrupt: always, after ``STOPPING`` has been set.
    """
    STOPPING.set()
    raise KeyboardInterrupt()
# Handler that was installed for INT_SIGNAL when this module was imported;
# fork() re-installs it in child processes.
DEFAULT_SIGNAL_HANDLER = signal.getsignal(INT_SIGNAL)
def fork(worker_id: bytes, cookie: bytes, address: AddressType) -> None:
    """Fork a child process that runs ``worker`` for ``worker_id``.

    In the parent, the child's PID is recorded in ``PROCESSES`` and the
    function returns. In the child, ``PROCESSES`` is replaced with an empty
    dict, the handler for ``INT_SIGNAL`` is reset to
    ``DEFAULT_SIGNAL_HANDLER``, ``worker(address, cookie, worker_id)`` is
    called and the process then exits.

    Args:
        worker_id: identifier passed to ``worker`` and stored for the PID.
        cookie: passed through to ``worker``.
        address: passed through to ``worker``.

    Raises:
        OSError: if ``os.fork()`` fails (parent only).
        SystemExit: with code 0 in the child once ``worker`` returns.
    """
    global PROCESSES

    # The garbage collector is switched off for the duration of the fork
    # call. Restore the previous state afterwards -- also when os.fork()
    # raises -- instead of unconditionally enabling it.
    gc_was_enabled = gc.isenabled()
    gc.disable()
    try:
        pid = os.fork()
    finally:
        if gc_was_enabled:
            gc.enable()

    if pid != 0:
        # In master process
        PROCESSES[pid] = worker_id
        log.debug("Started child process PID: %d", pid)
        return

    # In child process: it has no children of its own, so start with an
    # empty registry and drop the parent's interrupt handler.
    PROCESSES = {}
    signal.signal(INT_SIGNAL, DEFAULT_SIGNAL_HANDLER)
    worker(address, cookie, worker_id)
    raise SystemExit(0)
def main() -> int:
    """Read the pool configuration from stdin, fork workers and supervise them.

    The following values are received, in order, from ``sys.stdin.buffer``
    through ``FileIOProtocol``: ``(log_level, log_format)``, the address,
    the cookie, the tuple of worker ids and the
    ``(initializer, initializer_args, initializer_kwargs)`` triple.

    The initializer (if any) is run once before any process is forked.
    One child is then forked per worker id, and the function waits for
    children to exit. A child that exits with a non-zero status is forked
    again with the same worker id; a child that exits with status 0 is not.

    Returns:
        ``0`` when the wait loop ends, either because ``STOPPING`` was set
        or because there are no child processes left to wait for.

    Raises:
        SystemExit: with code 0 if the initializer raises (after the error
            has been reported through ``bad_initializer``), or if
            ``KeyboardInterrupt`` is raised while waiting for children.
    """
    proto_stdin = FileIOProtocol(sys.stdin.buffer)

    log_level, log_format = proto_stdin.receive()
    basic_config(level=log_level, log_format=log_format)

    address: AddressType = proto_stdin.receive()
    cookie: bytes = proto_stdin.receive()
    worker_ids: Tuple[bytes, ...] = proto_stdin.receive()
    initializer, initializer_args, initializer_kwargs = proto_stdin.receive()
    worker_id: Optional[bytes]

    def run_initializer() -> None:
        """Call the user initializer; report failure and exit if it raises."""
        if not initializer:
            return

        # noinspection PyBroadException
        try:
            initializer(*initializer_args, **initializer_kwargs)
        except BaseException as e:
            log.exception(
                "WorkerPool initializer %r has been failed", initializer,
            )
            # The failure is reported on behalf of the first worker id.
            bad_initializer(address, cookie, worker_ids[0], e)
            raise SystemExit(0)

    # Everything needed has been read; stdin is not used after this point.
    sys.stdin.close()
    del proto_stdin

    run_initializer()

    signal.signal(INT_SIGNAL, handle_interrupt)
    atexit.register(at_exit)

    log.debug("Forking %d processes", len(worker_ids))

    for worker_id in worker_ids:
        fork(worker_id, cookie, address)

    log.debug("Waiting workers")

    while not STOPPING.is_set():
        try:
            pid, status = os.wait()
        except KeyboardInterrupt:
            # Raise SystemExit directly: the ``exit`` builtin is injected
            # by the ``site`` module and is not guaranteed to exist.
            raise SystemExit(0)
        except ChildProcessError:
            # Every child has already been reaped; nothing to supervise.
            log.debug("No child processes left to wait for")
            break

        log_func = log.debug if status == 0 else log.warning
        log_func("Worker PID: %d exited with status %d", pid, status)

        # Always forget a reaped PID so at_exit() never signals a PID that
        # no longer belongs to one of our children.
        worker_id = PROCESSES.pop(pid, None)

        if status == 0 or worker_id is None:
            continue

        fork(worker_id, cookie, address)

    return 0