Source code for aplex.process
"""Inherits base classes in executor.py for multiprocessing."""
import multiprocessing
import signal
from multiprocessing.pool import ExceptionWithTraceback
from .executor import (BaseAsyncPoolExecutor, BaseWorkerManager,
BaseWorkerMixin, ResultItem)
__all__ = ['ProcessAsyncPoolExecutor']
class _ProcessWorker(BaseWorkerMixin, multiprocessing.Process):
def __init__(self, *args, **kwargs):
# Not to use pipe for the sake of ForkingPickler in SimpleQueue.
self._submit_queue = multiprocessing.SimpleQueue()
super().__init__(*args, **kwargs)
def run(self):
# Ignore SIGINT as worker processes should be closed by executor.
signal.signal(signal.SIGINT, signal.SIG_IGN)
super().run()
def _make_picklable(self, stuff):
if isinstance(stuff, BaseException) and stuff.__traceback__:
return ExceptionWithTraceback(stuff, stuff.__traceback__)
return stuff
class _ProcessWorkerManager(BaseWorkerManager):
_Worker = _ProcessWorker
def __init__(self, *args, **kwargs):
# All process workers share a common result queue. Not for
# submit queue.
self._result_queue = multiprocessing.SimpleQueue()
super().__init__(*args, **kwargs)
# For listening result queue pipe reader and child process sentinel.
self._listeners = [self._result_queue._reader]
for worker in self._workers:
self._listeners.append(worker.sentinel)
def wait_return_item(self) -> ResultItem:
ready = multiprocessing.connection.wait(self._listeners)
if self._result_queue._reader not in ready:
# Process will put CLOSING to result queue right before
# it closes and thus be removed from listening list. If not,
# process does not closed in the right way.
raise ConnectionAbortedError()
return self._result_queue._reader.recv() # No need the read lock.
def remove_closed_worker(self, worker_id):
target = super().remove_closed_worker(worker_id)
self._listeners.remove(target.sentinel)
[docs]class ProcessAsyncPoolExecutor(BaseAsyncPoolExecutor):
_WorkerManager = _ProcessWorkerManager