# Source code for aplex.futures
"""Defines the futures returned to users."""
import asyncio
import concurrent.futures
from concurrent.futures._base import PENDING
class ConcurrentFuture(concurrent.futures.Future):
    """A concurrent.futures.Future subclass that cancels like asyncio.Task.

    A pending future is cancelled the usual way.  A running future can
    additionally *request* cancellation through the ``cancel_interface``
    callable given at construction time; the executor's handler then
    finalizes the cancellation by calling ``_cancel_running``.

    Args:
        cancel_interface: A callable invoked as ``cancel_interface(future)``
            to request cancellation of the running work, or ``None`` when
            the running work cannot be cancelled.
    """

    def __init__(self, cancel_interface):
        super().__init__()
        self._cancel_interface = cancel_interface

        # Guards against invoking the cancel interface twice.  It is also
        # checked by ``_cancel_running`` to make sure a cancellation was
        # actually requested before the state is forced to cancelled.
        self._cancel_interface_called = False

    def cancel(self):
        """Tries to cancel the work submitted to worker.

        *Unlike* ``concurrent.futures``, the *running* work is *cancellable*
        as long as it's a ``coroutine function``.

        Returns:
            True if cancellable, False otherwise.
        """
        with self._condition:
            # Pending (or already cancelled): the base class handles it.
            if super().cancel():
                return True

            # Not running here means the future has already finished.
            if not self.running():
                return False

            # No interface was provided to this future, since the
            # submitted work is not a coroutine function.
            if self._cancel_interface is None:
                return False

            # Request the cancellation only once.
            if not self._cancel_interface_called:
                self._cancel_interface_called = True
                self._cancel_interface(self)

            # Returns True as asyncio.Task.cancel() does, even when the
            # cancellation had already been requested.
            return True

    def _cancel_running(self):
        """Forces a running future into the cancelled state.

        Called by the executor's handler after ``cancel`` has requested the
        cancellation of running work through the cancel interface.

        Returns:
            Always True.
        """
        with self._condition:
            assert self.running() and self._cancel_interface_called

            # The base class refuses to cancel a RUNNING future, so the
            # state is temporarily reset to PENDING first.
            self._state = PENDING
            super().cancel()

            # Moves the state on to "cancelled and notified" so that
            # waiters learn about the cancellation.
            self.set_running_or_notify_cancel()
            return True
class AsyncioFuture(asyncio.Future):
    """An asyncio.Future subclass that cancels like asyncio.Task.

    The instance is chained to a concurrent future, and ``cancel`` is
    delegated to that concurrent future instead of cancelling this
    future directly.

    Args:
        concurrent_future: The concurrent future this future is chained to.
        loop: The event loop passed on to ``asyncio.Future``.
    """

    def __init__(self, concurrent_future, loop=None):
        super().__init__(loop=loop)
        self._future = concurrent_future

        # TODO(Lun): Not to use _chain_future, since it's private.
        asyncio.futures._chain_future(self._future, self)

    def cancel(self, msg=None):
        """Tries to cancel the work submitted to worker.

        *Unlike* ``concurrent.futures``, the *running* work is *cancellable*
        as long as it's a ``coroutine function``.

        Args:
            msg: Optional cancellation message.  Accepted so that callers
                using the Python 3.9+ signature (``asyncio.Task.cancel``
                passes ``msg`` to the future it waits on) do not fail.
                It is only forwarded when this future itself is cancelled
                via ``asyncio.Future.cancel``.

        Returns:
            True if cancellable, False otherwise.
        """
        # This method is called by users and called again by the callback
        # scheduled in the chained concurrent future.
        #
        # The chained future is cancelled if and only if the work is done
        # by cancelling.  In that case just run ``asyncio.Future.cancel``.
        if self._future.cancelled():
            # Only forward ``msg`` when given, so interpreters whose
            # ``asyncio.Future.cancel`` takes no argument keep working.
            if msg is None:
                return super().cancel()
            return super().cancel(msg=msg)

        # The below block is executed when it's called by the user: the
        # request is handed over to the chained concurrent future.
        self._log_traceback = False
        return self._future.cancel()