"""Contains executor implementation for aplex."""
import asyncio
import atexit
import concurrent.futures
import enum
import inspect
import itertools
import os
import queue
import signal
import threading
import time
import warnings
import weakref
from typing import (Any, AsyncIterator, Callable, Dict, Generator,
Iterable, List, Optional, Set, Tuple, Union)
from . import compat
from .futures import ConcurrentFuture, AsyncioFuture
from .load_balancers import LoadBalancer, RoundRobin
if compat.PY36:
from typing import AsyncGenerator
DEFAULT_POOL_SIZE = os.cpu_count() or 2 # os.cpu_count() may return None.
# Upper bound of works one worker runs concurrently (see BaseWorkerManager).
DEFAULT_MAX_WORKS_PER_WORKER = 300
# XXX(Lun): Let users choose whether to shut down if broken or exception
# happens in worker? Below are both true for now.
# SHUTDOWN_IF_BROKEN = False
# SHUTDOWN_IF_EXCEPTION = False
# Module-level registry of live executors so the atexit/SIGINT handlers at
# the bottom of this file can shut them down gracefully. A WeakSet is used
# so that tracking alone never keeps an executor alive.
_executor_track_set_lock = threading.Lock()
_executor_track_set = weakref.WeakSet() # type: Set[BaseAsyncPoolExecutor]
class AplexWorkerError(OSError):
    """Raised when a worker is broken (terminated abruptly) or a
    BaseException escaped a worker's event loop.
    """
class HandlerCommands(enum.Enum):
    """Commands sent from handler to workers."""
    CANCEL = 'Cancel'  # Cancel the running work named by command_meta.
    CLOSE = 'Close'  # Ask the worker to stop its loop and exit.
class WorkStates(enum.Enum):
    """States of work in workers."""
    SUCCESS = 'Success'  # Work returned normally; meta carries the result.
    CANCELLED = 'Cancelled'  # Work task was cancelled.
    EXCEPTION = 'Exception'  # Work raised; meta carries the exception.
class WorkerStates(enum.Enum):
    """States of workers."""
    BROKEN = 'Broken' # For worker closed due to unknown reason.
    ERROR = 'Error' # For BaseException raised in worker.
    CLOSING = 'Closing'  # Worker shutting down after a CLOSE command.
class SubmitForms(enum.Enum):
    """Forms of submit items."""
    WORK = 'Work'  # Item carries a callable to run.
    COMMAND = 'Command'  # Item carries a HandlerCommands value.
class ResultForms(enum.Enum):
    """Forms of result items."""
    WORK_STATE = 'Work state'  # Outcome of one submitted work.
    WORKER_STATE = 'Worker state'  # State change of a worker itself.
class SubmitItem(object):
    """Message passed from the executor to workers.
    An item is either a piece of work to run or a control command;
    ``form`` tells which of the two field groups below is populated.
    """
    __slots__ = ['form',
                 # Work-form fields.
                 'work', 'args', 'kwargs', 'work_id', 'load_balancing_meta',
                 # Command-form fields.
                 'command', 'command_meta']
    @classmethod
    def work_form(cls,
                  work: Callable,
                  args: Tuple[Any],
                  kwargs: Dict[str, Any],
                  work_id: int,
                  load_balancing_meta: Optional[Any] = None
                  ) -> 'SubmitItem':
        """Builds an item carrying a callable to execute in a worker."""
        obj = cls.__new__(cls)
        obj.form = SubmitForms.WORK
        # Named "work" to differentiate from asyncio task.
        obj.work = work
        obj.args = args
        obj.kwargs = kwargs
        obj.work_id = work_id
        obj.load_balancing_meta = load_balancing_meta
        return obj
    @classmethod
    def command_form(cls,
                     command: HandlerCommands,
                     command_meta: Optional[Any] = None
                     ) -> 'SubmitItem':
        """Builds an item carrying a control command for a worker."""
        obj = cls.__new__(cls)
        obj.form = SubmitForms.COMMAND
        obj.command = command
        obj.command_meta = command_meta
        return obj
class ResultItem(object):
    """Message sent back from a worker to the handler.
    An item either reports the outcome of one work or a change of a
    worker's own state; ``form`` tells which field group is populated.
    """
    __slots__ = ['form',
                 # Work-state fields.
                 'work_id', 'work_state', 'work_state_meta',
                 # Worker-state fields.
                 'worker_state', 'worker_state_meta']
    @classmethod
    def work_state_form(cls,
                        work_id: int,
                        work_state: WorkStates,
                        work_state_meta: Optional[Any] = None,
                        ) -> 'ResultItem':
        """Builds an item reporting the outcome of a single work."""
        obj = cls.__new__(cls)
        obj.form = ResultForms.WORK_STATE
        obj.work_id = work_id
        obj.work_state = work_state
        obj.work_state_meta = work_state_meta
        return obj
    @classmethod
    def worker_state_form(cls,
                          worker_state: WorkerStates,
                          worker_state_meta: Optional[Any] = None
                          ) -> 'ResultItem':
        """Builds an item reporting a worker-state transition."""
        obj = cls.__new__(cls)
        obj.form = ResultForms.WORKER_STATE
        obj.worker_state = worker_state
        obj.worker_state_meta = worker_state_meta
        return obj
class BaseWorkerMixin(object):
    """Implementation of worker.
    This is a mixin for threading.Thread and multiprocessing.Process.
    Each worker runs its own asyncio event loop (see ``run``) plus a
    helper thread that drains ``_submit_queue`` and schedules each item
    onto the loop thread-safely.
    """
    # Overridden by subclasses with a thread-safe / process-safe queue.
    _submit_queue = None
    def __init__(self, result_queue, loop_factory=None):
        # result_queue: shared with the executor's handler; ResultItems
        #     are reported back through it.
        # loop_factory: optional callable returning a new event loop;
        #     when None, asyncio.new_event_loop() is used in run().
        super().__init__()
        self._result_queue = result_queue
        self._loop_factory = loop_factory
    def submit(self, item: SubmitItem):
        """Sends a submit item (work or command form) to this worker."""
        self._submit_queue.put(item)
    def cancel_work_by_id(self, work_id):
        """Asks the worker to cancel the running work with ``work_id``."""
        self._submit_queue.put(
            SubmitItem.command_form(HandlerCommands.CANCEL,
                                    command_meta=work_id))
    def close(self):
        """Asks the worker to stop its event loop and exit gracefully."""
        self._submit_queue.put(
            SubmitItem.command_form(HandlerCommands.CLOSE, None))
    def clear(self):
        # Drops queue references so they can be garbage-collected.
        self._submit_queue = None
        self._result_queue = None
    def run(self):
        """Target for threading.Thread and multiprocessing.Process."""
        # Subtasks generated in coroutines are not included, to prevent
        # cancelling a task twice. Users have to handle CancelledError
        # properly to cancel their subtasks.
        running_tasks = {}
        def cancel(work_id):
            # Although not creating task for callables that are not
            # a coroutine function, the work id here must map to
            # a task, since only this type of work is allowed
            # to call cancel interface.
            try:
                running_tasks[work_id].cancel()
            except KeyError:
                pass  # Task has been done and popped.
        def run_uncancellable(uncancellable, args, kwargs, work_id):
            """Runs uncancellable, i.e., works that are not coroutine
            functions.
            """
            try:
                result = uncancellable(*args, **kwargs)
            except Exception as e:
                item = ResultItem.work_state_form(work_id,
                                                  WorkStates.EXCEPTION,
                                                  self._make_picklable(e))
            else:
                item = ResultItem.work_state_form(work_id,
                                                  WorkStates.SUCCESS,
                                                  result)
            self._send_result_back(work_id, item)
        def task_done_callback(task):
            # Translates a finished asyncio task into a ResultItem and
            # reports it back to the handler.
            work_id = task.work_id
            running_tasks.pop(work_id)
            if task.cancelled():
                item = ResultItem.work_state_form(work_id,
                                                  WorkStates.CANCELLED,
                                                  None)
            else:
                exception = task.exception()
                if exception is not None:
                    exception = self._make_picklable(exception)
                    item = ResultItem.work_state_form(work_id,
                                                      WorkStates.EXCEPTION,
                                                      exception)
                else:
                    item = ResultItem.work_state_form(work_id,
                                                      WorkStates.SUCCESS,
                                                      task.result())
            self._send_result_back(work_id, item)
        def run_coroutine(coro_function, args, kwargs, work_id):
            """Creates a task of the coroutine generated from
            coroutine function work.
            """
            task = loop.create_task(coro_function(*args, **kwargs))
            # Attach the work id so task_done_callback can report it.
            task.work_id = work_id
            running_tasks[work_id] = task
            task.add_done_callback(task_done_callback)
        # TODO(Lun): Wrap this with try-except to catch
        # unexpected exception and then close worker.
        def handle_submit_queue(loop):
            """Target for the thread handling submit queue."""
            while True:
                item = self._submit_queue.get()
                # Prevents from doing anything after a BaseException raises.
                # (None is the sentinel put by the except clause below.)
                if item is None:
                    return
                if item.form == SubmitForms.COMMAND:
                    # XXX(Lun): Use dict as switch in C if too many cases.
                    if item.command == HandlerCommands.CLOSE:
                        loop.call_soon_threadsafe(loop.stop)
                        return
                    elif item.command == HandlerCommands.CANCEL:
                        work_id = item.command_meta
                        loop.call_soon_threadsafe(cancel, work_id)
                        continue
                    assert False, 'Should be unreachable here.'
                if not inspect.iscoroutinefunction(item.work):
                    # Not create future for a callable that is not a
                    # coroutine function since it can not be cancelled.
                    loop.call_soon_threadsafe(
                        run_uncancellable,
                        item.work, item.args, item.kwargs, item.work_id)
                else:
                    loop.call_soon_threadsafe(
                        run_coroutine,
                        item.work, item.args, item.kwargs, item.work_id)
        def cancel_tasks(*tasks):
            # Cancels the given tasks and waits until each reaches a
            # terminal state; exceptions are collected, not raised.
            if not tasks:
                return
            for task in tasks:
                task.cancel()
            loop.run_until_complete(
                asyncio.gather(*tasks, return_exceptions=True))
        try:
            if self._loop_factory is None:
                loop = asyncio.new_event_loop()
            else:
                loop = self._loop_factory()
            asyncio.set_event_loop(loop)
            handle_submit_thread = threading.Thread(
                target=handle_submit_queue, args=(loop,), daemon=True)
            handle_submit_thread.start()
            loop.run_forever()
        except BaseException as e:
            # Close handle submit thread so that any command from
            # handler is not allowed.
            self._submit_queue.put(None)  # Close handle submit thread.
            self._result_queue.put(
                ResultItem.worker_state_form(WorkerStates.ERROR,
                                             self._make_picklable(e)))
        finally:
            # NOTE(review): if loop creation itself raised above, ``loop``
            # is unbound here and this clause raises NameError — confirm.
            try:
                cancel_tasks(*running_tasks.values())
                # Cancels tasks the user forgot to handle when
                # asyncio.CancelledError is raised.
                if compat.PY37:
                    # Must pass loop for get_running_loop().
                    orphan_tasks = asyncio.tasks.all_tasks(loop)
                else:
                    orphan_tasks = asyncio.Task.all_tasks()
                cancel_tasks(*orphan_tasks)
                if compat.PY36:
                    loop.run_until_complete(loop.shutdown_asyncgens())
                # Run scheduled callbacks.
                loop.call_soon(loop.stop)
                loop.run_forever()
                # Should report only when close command is received and
                # cancellation succeeds. See annotation in below except
                # BaseException clause.
                self._result_queue.put(
                    ResultItem.worker_state_form(WorkerStates.CLOSING,
                                                 self.ident))
            except BaseException as e:
                # Cancellation fails with new BaseException when closing
                # or fails due to previous BaseException. See details in
                # asyncio.Task._wakeup. The previous BaseException would
                # be raised again due to ``future.result()``.
                self._result_queue.put(
                    ResultItem.worker_state_form(WorkerStates.ERROR,
                                                 self._make_picklable(e)))
            finally:
                loop.close()
    def _send_result_back(self, work_id, item):
        """Puts ``item`` on the result queue, degrading to an EXCEPTION
        work state when the put itself raises TypeError (the original
        comment attributes this to unpicklable payloads).
        """
        try:
            self._result_queue.put(item)
        except TypeError as e:
            # Item unpicklable.
            item = ResultItem.work_state_form(work_id,
                                              WorkStates.EXCEPTION,
                                              self._make_picklable(e))
            self._result_queue.put(item)
    def _make_picklable(self, stuff):
        """Makes stuff picklable, or keeps it from losing information
        when pickled.
        This should be overwritten by the ProcessWorker subclass.
        """
        return stuff
class BaseWorkerManager(object):
    """A manager of workers, or in other words, a broker.
    Routes submitted items to workers through a load balancer and keeps
    per-worker workload bookkeeping consistent with cancellations and
    worker removals.
    """
    # All workers of an executor share a common result queue.
    _result_queue = None
    # Concrete worker class; set by thread/process subclasses.
    _Worker = None
    def __init__(self,
                 pool_size,
                 max_works_per_worker,
                 load_balancer,
                 worker_loop_factory):
        """Starts ``pool_size`` workers and wires up load balancing."""
        self._workers = []
        for __ in range(pool_size):
            worker = self._Worker(self._result_queue, worker_loop_factory)
            worker.start()
            self._workers.append(worker)
        self._work_owner = {}  # To find worker with work_id.
        # Dict for load balancing, shared with self._load_balancer.
        self._workloads = dict.fromkeys(self._workers, 0)
        self._load_balancer = load_balancer(self._workers,
                                            self._workloads,
                                            max_works_per_worker)
    def submit(self, item: SubmitItem, future, load_balancing_meta):
        """Routes a work item to the worker the load balancer picks."""
        work_id = item.work_id
        worker = self._load_balancer.get_proper_worker(load_balancing_meta)
        # Calculates workload here.
        self._work_owner[work_id] = worker
        self._workloads[worker] += 1
        def work_done_callback(done_future):
            # Releases the bookkeeping once the future resolves.
            worker = self._work_owner.pop(work_id)
            self._workloads[worker] -= 1
        future.add_done_callback(work_done_callback)
        worker.submit(item)
    def cancel_work(self, future: ConcurrentFuture):
        """Forwards a cancel request to the worker owning the future."""
        work_id = id(future)
        # NOTE(review): raises KeyError if the work already finished and
        # work_done_callback popped the owner — confirm callers guard this.
        worker = self._work_owner[work_id]
        worker.cancel_work_by_id(work_id)
    def wait_return_item(self) -> ResultItem:
        """Waits for workers until result item is returned
        Returns:
            Result item from some worker.
        Raises:
            ConnectionAbortedError: If some workers are broken.
        """
        raise NotImplementedError()
    def remove_closed_worker(self, worker_id):
        """Drops the worker identified by ``worker_id`` (its ``ident``)
        from all bookkeeping after it has closed.
        """
        for worker in self._workers:
            if worker.ident == worker_id:
                target = worker
                break
        # A closing worker must have no running works left.
        assert self._workloads[target] == 0
        target.clear()
        self._workers.remove(target)
        del self._workloads[target]
        return target  # For subclass' super(). No need to find it again.
    def are_all_workers_closed(self):
        """Returns True once every worker has been removed."""
        return not self._workers
    def wakeup(self):
        """To wake up from the wait_return_item method."""
        # None acts as a sentinel the handler's _run loop skips over.
        self._result_queue.put(None)
    def close(self):
        """Sends a close command to every worker."""
        for worker in self._workers:
            worker.close()
    def join(self):
        # XXX(Lun): Join removed worker?
        for worker in self._workers:
            worker.join()
    def clear(self):
        # Drops references so queues and workers can be collected.
        self._result_queue = None
        for worker in self._workers:
            worker.clear()
        self._workers.clear()
        self._workloads.clear()
class _HandlerThread(threading.Thread):
    """Handler of executor.
    Pumps submitted work to the worker manager and dispatches worker
    results back onto the matching futures.
    """
    def __init__(self, executor, pool_size, max_works_per_worker,
                 load_balancer, worker_loop_factory,
                 WorkerManager):
        super().__init__()
        self._queue = queue.Queue()
        # Weak reference: the handler must not keep the executor alive.
        self._executor = weakref.ref(executor)
        self._running_futures = {}
        self._max_runnings = pool_size * max_works_per_worker
        self._worker_manager = WorkerManager(pool_size,
                                             max_works_per_worker,
                                             load_balancer,
                                             worker_loop_factory)
        self._executor_shutdown_called = False
        self._close_lock = threading.Lock()
    def submit(self, item: Tuple[SubmitItem, ConcurrentFuture, Any]):
        """Queues a (submit_item, future, load_balancing_meta) triple."""
        self._queue.put(item)
        # To wake up from wait_return_item in self.run().
        self._worker_manager.wakeup()
    def cancel_work(self, future: ConcurrentFuture):
        """An interface set in future to cancel work."""
        self._worker_manager.cancel_work(future)
    def close(self):
        """Marks the executor as shut down and closes all workers."""
        with self._close_lock:
            self._executor_shutdown_called = True
            self._worker_manager.close()
    def run(self):
        """Thread target: runs the dispatch loop until workers close."""
        try:
            self._run()
        except AplexWorkerError:
            pass  # TODO(Lun): logger.error
        except BaseException:
            # Close workers if some bug happens.
            self._worker_manager.close()
            raise
        else:
            self._worker_manager.join()
        finally:
            self._clear()
    def _run(self):
        while True:
            with self._close_lock:
                if not self._executor_shutdown_called:
                    self._transfer_to_worker_manager()
            try:
                # Blocks here until some result is returned or
                # some worker is terminated.
                item = self._worker_manager.wait_return_item()
            except ConnectionAbortedError as e:
                item = ResultItem.worker_state_form(WorkerStates.BROKEN, e)
            if item is None:
                # Just a wakeup sentinel from wait_return_item to take a
                # new work submitted by the user.
                continue
            # Handles worker state information.
            if item.form == ResultForms.WORKER_STATE:
                self._handle_worker_state(item.worker_state,
                                          item.worker_state_meta)
                if self._worker_manager.are_all_workers_closed():
                    break
                else:
                    continue
            # Handles received result from finished or cancelled work.
            future = self._running_futures.pop(item.work_id)
            state = item.work_state
            # A condition to avoid cancelling by the user.
            with future._condition:
                if state == WorkStates.CANCELLED:
                    future._cancel_running()
                elif future._cancel_interface_called:
                    exc = concurrent.futures.CancelledError(
                        'Work was done before cancellation function '
                        'is scheduled.')
                    future.set_exception(exc)
                elif state == WorkStates.SUCCESS:
                    future.set_result(item.work_state_meta)
                elif state == WorkStates.EXCEPTION:
                    # concurrent.futures.CancelledError and
                    # asyncio.CancelledError are included here.
                    future.set_exception(item.work_state_meta)
                else:
                    assert False, 'Should be unreachable here.'
    def _transfer_to_worker_manager(self):
        """Sends received work to worker manager, i.e., broker."""
        while True:
            # All workers reach the limit of max works per worker.
            if len(self._running_futures) >= self._max_runnings:
                return
            try:
                item = self._queue.get(block=False)
            except queue.Empty:
                return
            else:
                submit_item, future, load_balancing_meta = item
                # Skips futures the user already cancelled while pending.
                if future.set_running_or_notify_cancel():
                    self._running_futures[submit_item.work_id] = future
                    self._worker_manager.submit(
                        submit_item, future, load_balancing_meta)
    def _handle_worker_state(self,
                             state: WorkerStates,
                             state_meta: Optional[BaseException]):
        """Reacts to a worker-state ResultItem.
        Raises:
            AplexWorkerError: If a worker broke or raised BaseException;
                this forces the handler thread to terminate.
        """
        if state == WorkerStates.CLOSING:
            # This will happen only if executor shutdown is called by users.
            assert self._executor_shutdown_called
            # Removes this worker from listening list and clear pending
            # works. All the remaining running works will be cancelled by
            # the other alive workers.
            # Make sure removal is after closing.
            with self._close_lock:
                # This will not cause bugs since submit are not
                # allowed after closing.
                self._worker_manager.remove_closed_worker(
                    worker_id=state_meta)
            # Putting to queue is not allowed after executor.shutdown.
            # Thus no thread-safety issue. Just use deque.
            for __, pending_future, __ in self._queue.queue:
                pending_future.cancel()
                pending_future.set_running_or_notify_cancel()
            self._queue.queue.clear()
        elif state in (WorkerStates.BROKEN, WorkerStates.ERROR):
            # Worker not closed correctly. Work raised
            # BaseException or some worker was terminated abruptly.
            # TODO(Lun): Add worker type information in the reasons.
            # Sets exception reason to executor.
            if state == WorkerStates.BROKEN:
                worker_error = AplexWorkerError(
                    'Aplex worker was terminated abruptly.')
            elif state == WorkerStates.ERROR:
                worker_error = AplexWorkerError(
                    'Aplex worker was terminated since the event '
                    'loop has crashed due to raised BaseException.')
            worker_error.__cause__ = state_meta
            executor = self._executor()
            if executor is not None:
                with executor._lock:
                    executor._exception = worker_error
                executor = None
            # Sets exception reason to undone futures.
            future_error = concurrent.futures.CancelledError()
            future_error.__cause__ = worker_error
            # BUGFIX: queue items are 3-tuples (item, future, meta); the
            # previous 2-name unpacking raised ValueError here.
            for __, pending_future, __ in self._queue.queue:
                pending_future.set_exception(future_error)
            for running_future in self._running_futures.values():
                running_future.set_exception(future_error)
            self._worker_manager.close()  # Closes all related workers.
            raise worker_error  # Forces handler thread to terminate.
    def _clear(self):
        # Frees references so futures and the manager can be collected.
        self._queue.queue.clear()  # I.e., deque.clear().
        self._running_futures.clear()
        self._worker_manager.join()
        self._worker_manager.clear()
# These must be defined at module level because a locally defined (nested)
# function is not picklable, which process workers require.
async def _handle_chunk_async(work, chunk: Iterable[Iterable]) -> List:
return await asyncio.gather(*(work(*args) for args in chunk),
return_exceptions=False)
def _handle_chunk(work, chunk: Iterable[Iterable]) -> List:
return [work(*args) for args in chunk]
class BaseAsyncPoolExecutor(object):
    """Executor implementation."""
    # Concrete worker-manager class; set by thread/process subclasses.
    _WorkerManager = None
    def __init__(self, *,
                 pool_size: Optional[int] = DEFAULT_POOL_SIZE,
                 max_works_per_worker:
                     Optional[int] = DEFAULT_MAX_WORKS_PER_WORKER,
                 load_balancer: Optional[LoadBalancer] = RoundRobin,
                 awaitable: Optional[bool] = False,
                 future_loop: asyncio.AbstractEventLoop = None,
                 worker_loop_factory:
                     Optional[asyncio.AbstractEventLoop] = None):
        """Setups executor and adds self to executor track set.
        Args:
            pool_size: Number of workers, i.e., number of threads or processes.
            max_works_per_worker: The max number of works a worker can run at
                the same time. This does not **limit** the number of asyncio
                tasks of a worker.
            load_balancer: A subclass of aplex.LoadBalancer for submitted
                item load balancing that has implemented abstract method
                ``get_proper_worker``.
            awaitable: If it's set to True, futures returned from ``submit``
                method will be awaitable, and ``map`` will return async
                generator(async iterator if python3.5).
            future_loop: Loop instance set in awaitable futures returned from
                ``submit`` method.
                If specified, ``awaitable`` must be set to true.
                This loop can also be set in ``set_future_loop`` method.
            worker_loop_factory: A factory to generate loop instance for
                workers to run their job.
        Raises:
            ValueError:
                ``future_loop`` is specified while ``awaitable`` is False.
        """
        if future_loop and not awaitable:
            raise ValueError('Awaitable should be set to True if '
                             'future_loop is specified.')
        with _executor_track_set_lock:
            _executor_track_set.add(self)
        self._awaitable = awaitable
        self._loop = future_loop
        self._closed = False
        self._exception = None
        self._lock = threading.Lock()
        self._handler_thread = _HandlerThread(self,
                                              pool_size,
                                              max_works_per_worker,
                                              load_balancer,
                                              worker_loop_factory,
                                              self._WorkerManager)
        # Important to set thread daemon to True such that one can
        # trigger functions registered to atexit, closing workers gracefully.
        # Assigning .daemon replaces the deprecated setDaemon() call.
        self._handler_thread.daemon = True
        self._handler_thread.start()
    def set_future_loop(self, loop: asyncio.AbstractEventLoop):
        """Sets loop for awaitable futures to await results.
        This loop can also be set in initialization.
        Args:
            loop: The Loop needed for awaitable futures.
        Raises:
            RuntimeError: If executor has been shut down, or
                executor is set to be unawaitable.
            AplexWorkerError: If some workers are broken or raise
                BaseException.
        """
        with self._lock:
            if self._closed:
                raise RuntimeError('Executor has been shut down.')
            elif self._exception:
                raise self._exception
            if not self._awaitable:
                raise RuntimeError('Assigning event loop to an unawaitable')
            self._loop = loop
    def submit(self,
               work: Callable,
               *args: Tuple,
               load_balancing_meta: Optional[Any] = None,
               **kwargs: Dict
               ) -> Union[AsyncioFuture, ConcurrentFuture]:
        """submits your work like the way in concurrent.futures.
        The work submitted will be sent to the specific worker that
        the load balancer choose.
        Note:
            The work you submit should be a callable, And a ``coroutine``
            is **not** a callable. You should submit a ``coroutine function``
            and specify its args and kwargs here instead.
        Args:
            work: The callable that will be run in a worker.
            *args: Position arguments for work.
            load_balancing_meta: This will be passed to load balancer for
                the choice of proper worker.
            **kwargs: Keyword arguments for work.
        Returns:
            A future.
            The future will be awaitable like that in asyncio if ``awaitable``
            is set to True in executor construction, otherwise, unawaitable
            like that in concurrent.futures.
        Raises:
            RuntimeError: If executor has been shut down.
            AplexWorkerError: If some workers are broken or raise
                BaseException.
            TypeError: If work is not a callable.
        """
        with self._lock:
            if self._closed:
                raise RuntimeError('Executor has been shut down.')
            elif self._exception:
                raise self._exception
            if not callable(work):
                raise TypeError('{} is not a callable to run.'.format(work))
            # For running callables, only coroutine can be cancelled.
            cancel_interface = None
            if inspect.iscoroutinefunction(work):
                cancel_interface = self._handler_thread.cancel_work
            concurrent_future = ConcurrentFuture(cancel_interface)
            if self._awaitable:
                # Chain these two futures before submit for thread-safety.
                asyncio_future = AsyncioFuture(concurrent_future,
                                               self._loop)
            # The id of the concurrent future doubles as the work id.
            item = SubmitItem.work_form(work, args, kwargs,
                                        id(concurrent_future))
            self._handler_thread.submit((item, concurrent_future,
                                         load_balancing_meta))
            return asyncio_future if self._awaitable else concurrent_future
    if compat.PY36:
        _MapReturnAsync = AsyncGenerator
    else:
        _MapReturnAsync = AsyncIterator
    def map(self,
            work: Callable,
            *iterables: Tuple[Iterable],
            timeout: Optional[float] = None,
            chunksize: int = 1,
            load_balancing_meta: Optional[Any] = None
            ) -> Union[_MapReturnAsync, Generator]:
        """map your work like the way in concurrent.futures.
        The work submitted will be sent to the specific worker that
        the load balancer choose.
        Note:
            The work you submit should be a callable, And a ``coroutine``
            is **not** a callable. You should submit a ``coroutine function``
            and specify its args and kwargs here instead.
        Args:
            work: The callable that will be run in a worker.
            *iterables: Position arguments for work. All of them are iterable
                and have same length.
            timeout: The time limit for waiting results.
            chunksize: Works are gathered, partitioned as chunks in this size,
                and then sent to workers.
            load_balancing_meta: This will be passed to load balancer for
                the choice of proper worker.
        Returns:
            An async generator yielding the map results if ``awaitable`` is
            set to True, otherwise a generator. In python3.5, async iterator
            is used to replace async generator.
            If an exception is raised in a work, it will be re-raised in the
            generator, and the remaining works will be cancelled.
        Raises:
            ValueError: If chunksize is less than 1.
            TypeError: If work is not a callable.
        """
        if chunksize < 1:
            raise ValueError('Chunksize should be at least 1.')
        if inspect.iscoroutinefunction(work):
            work_on_chunk = _handle_chunk_async
        elif callable(work):
            work_on_chunk = _handle_chunk
        else:
            raise TypeError('{} is not a callable to run.'.format(work))
        # Include internal process/thread communication time.
        end_time = timeout + time.time() if timeout is not None else None
        futures = []
        total = zip(*iterables)
        chunk = tuple(itertools.islice(total, chunksize))
        while chunk:
            futures.append(
                self.submit(work_on_chunk, work, chunk,
                            load_balancing_meta=load_balancing_meta))
            chunk = tuple(itertools.islice(total, chunksize))
        if not futures:
            # NOTE(review): a plain list is returned even in awaitable
            # mode, so ``async for`` over an empty map fails — confirm
            # whether this is intended.
            return []
        if self._awaitable:
            if not compat.PY36:
                return self._MapAsyncIterator(futures, end_time, self._loop)
            return self._map_async_gen(futures, end_time)
        else:
            return self._map_gen(futures, end_time)
    if compat.PY36:
        # If statement with variable condition can not prevent Python3.5
        # from compiling this function to bytecode, causing syntax error
        # in Python3.5, which does not support async generator.
        from .py35_support import _map_async_gen
    else:
        # According to PEP525: `asynchronous generators are 2x faster
        # than an equivalent implemented as an asynchronous iterator`,
        # async iterator is used only for python3.5, which does not
        # support async generator.
        class _MapAsyncIterator(object):
            """A async iterator to replace async generator in python3.5."""
            def __init__(self,
                         futures: AsyncioFuture,
                         end_time=None,
                         loop=None):
                self._futures = futures
                self._end_time = end_time
                self._loop = loop
                self._results = None
            if compat.PY352:
                def __aiter__(self):
                    return self
            else:
                async def __aiter__(self):
                    return self
            async def __anext__(self):
                if not self._results:
                    if not self._futures:
                        raise StopAsyncIteration()
                    future = self._futures.pop(0)
                    try:
                        if self._end_time is not None:
                            self._results = await asyncio.wait_for(
                                future, self._end_time - time.time(), loop=self._loop)
                        else:
                            self._results = await future
                    except Exception:
                        future.cancel()
                        for future in self._futures:
                            future.cancel()
                        raise
                return self._results.pop(0)
    def _map_gen(self,
                 futures: List[ConcurrentFuture],
                 end_time=None) -> Generator:
        """The generator that ``map`` return when ``awaitable`` is False."""
        # ``map`` guarantees a non-empty futures list here, so ``future``
        # is always bound by the time the finally clause runs.
        try:
            while futures:
                future = futures.pop(0)
                if end_time is not None:
                    yield from future.result(end_time - time.time())
                else:
                    yield from future.result()
        # Finally clause for generator exit and timeout.
        finally:
            future.cancel()  # This future may have been done.
            for future in futures:
                future.cancel()
    def shutdown(self, wait: bool = True):
        """Shuts down the executor and frees the resource.
        Args:
            wait: Whether to block until shutdown is finished.
        """
        with self._lock:
            if self._closed:
                return
            self._closed = True
            # Handler may have already been closed.
            if self._exception is None:
                self._handler_thread.close()
        if wait:
            self._handler_thread.join()
        self._handler_thread = None
        self._loop = None
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
    def __del__(self):
        # XXX
        # getattr guards: if __init__ raised (e.g. the future_loop
        # ValueError) these attributes were never set, and __del__ must
        # not raise AttributeError during garbage collection.
        if (not getattr(self, '_closed', True) and
                getattr(self, '_exception', None) is None):
            warnings.warn('{} got garbage-collected. Started to '
                          'shutdown'.format(self))
            self.shutdown(wait=False)
@atexit.register # This decorator will return _ensure_close back.
def _ensure_close():
    """Closes all workers for graceful exit."""
    # Materialize strong references up front so no executor can be
    # garbage-collected while the weak set is being iterated.
    tracked_executors = tuple(_executor_track_set)
    for tracked in tracked_executors:
        tracked.shutdown(wait=True)
def _register_keyboard_interrupt_handler():
    """Installs a SIGINT handler that shuts executors down first, then
    chains to whatever handler was installed before.
    """
    origin = signal.getsignal(signal.SIGINT)
    def handler(signum, stack):
        _ensure_close()
        # Removed a leftover debug print(signum) here.
        # signal.getsignal may return SIG_DFL/SIG_IGN (ints) or None
        # instead of a callable; only chain when it is actually callable.
        if callable(origin):
            origin(signum, stack)
    signal.signal(signal.SIGINT, handler)
_register_keyboard_interrupt_handler()