API

Executor Objects

class aplex.ProcessAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = <class 'aplex.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Sets up the executor and adds itself to the executor tracking set.

Parameters:
  • pool_size – Number of workers, i.e., the number of threads or processes.
  • max_works_per_worker – The max number of works a worker can run at the same time. This does not limit the number of asyncio tasks within a worker.
  • load_balancer – A subclass of aplex.LoadBalancer that implements the abstract method get_proper_worker and balances submitted works across the workers.
  • awaitable – If set to True, futures returned from the submit method will be awaitable, and map will return an async generator (an async iterator on Python 3.5).
  • future_loop

    The loop instance set on the awaitable futures returned from the submit method.

    If specified, awaitable must be set to True.

    This loop can also be set later with the set_future_loop method.

  • worker_loop_factory – A factory that creates the loop instance each worker uses to run its works.
Raises:

ValueError – future_loop is specified while awaitable is False.
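
A minimal construction sketch based on the signature above; fetch is a hypothetical coroutine function, and the names loop, fetch, and executor are reused in the method examples below:

    import asyncio
    import aplex

    loop = asyncio.get_event_loop()

    async def fetch(url):
        # Hypothetical coroutine work used in the method examples below.
        await asyncio.sleep(0.1)
        return url

    # awaitable=True makes submit() return awaitable futures bound to `loop`
    # and makes map() return an async generator.
    executor = aplex.ProcessAsyncPoolExecutor(pool_size=4,
                                              awaitable=True,
                                              future_loop=loop)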

map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]

Maps your work in the same way as concurrent.futures.

The work submitted will be sent to the specific worker that the load balancer chooses.

Note

The work you submit must be a callable, and a coroutine is not a callable. Submit a coroutine function instead and specify its args and kwargs here.

Parameters:
  • work – The callable that will be run in a worker.
  • *iterables – Positional arguments for work. Each one must be an iterable, and all of them must have the same length.
  • timeout – The time limit for waiting for results.
  • chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
  • load_balancing_meta – Passed to the load balancer to help it choose the proper worker.
Returns:

An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used instead of an async generator.

If an exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.

Raises:
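
A usage sketch, continuing the hypothetical constructor example above (awaitable=True, so map returns an async generator):

    async def consume():
        urls = ['https://a.example', 'https://b.example']
        async for result in executor.map(fetch, urls, timeout=60):
            print(result)

    loop.run_until_complete(consume())
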
set_future_loop(loop: asyncio.events.AbstractEventLoop)

Sets the loop that awaitable futures use to await results.

This loop can also be set at initialization.

Parameters:

loop – The loop needed by awaitable futures.

Raises:
  • RuntimeError – If the executor has been shut down, or the executor was not set to be awaitable.
  • AplexWorkerError – If some workers are broken or raise BaseException.
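
A minimal sketch; this is equivalent to passing future_loop at construction time:

    import asyncio
    import aplex

    executor = aplex.ProcessAsyncPoolExecutor(awaitable=True)
    # Bind the loop that the awaitable futures will use to deliver results.
    executor.set_future_loop(asyncio.get_event_loop())
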
shutdown(wait: bool = True)

Shuts down the executor and frees its resources.

Parameters: wait – Whether to block until the shutdown is finished.
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]

Submits your work in the same way as concurrent.futures.

The work submitted will be sent to the specific worker that the load balancer chooses.

Note

The work you submit must be a callable, and a coroutine is not a callable. Submit a coroutine function instead and specify its args and kwargs here.

Parameters:
  • work – The callable that will be run in a worker.
  • *args – Positional arguments for work.
  • load_balancing_meta – Passed to the load balancer to help it choose the proper worker.
  • **kwargs – Keyword arguments for work.
Returns:

A future.

The future will be awaitable, like an asyncio future, if awaitable was set to True at executor construction; otherwise it will be unawaitable, like a concurrent.futures future.

Raises:
  • RuntimeError – If executor has been shut down.
  • AplexWorkerError – If some workers are broken or raise BaseException.
  • TypeError – If work is not a callable.
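
A submission sketch, continuing the hypothetical constructor example above:

    async def consume_one():
        # Returns an awaitable AsyncioFuture because awaitable=True.
        future = executor.submit(fetch, 'https://a.example')
        return await future

    print(loop.run_until_complete(consume_one()))
    executor.shutdown(wait=True)
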
class aplex.ThreadAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = <class 'aplex.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)[source]

Sets up the executor and adds itself to the executor tracking set.

Parameters:
  • pool_size – Number of workers, i.e., the number of threads or processes.
  • max_works_per_worker – The max number of works a worker can run at the same time. This does not limit the number of asyncio tasks within a worker.
  • load_balancer – A subclass of aplex.LoadBalancer that implements the abstract method get_proper_worker and balances submitted works across the workers.
  • awaitable – If set to True, futures returned from the submit method will be awaitable, and map will return an async generator (an async iterator on Python 3.5).
  • future_loop

    The loop instance set on the awaitable futures returned from the submit method.

    If specified, awaitable must be set to True.

    This loop can also be set later with the set_future_loop method.

  • worker_loop_factory – A factory that creates the loop instance each worker uses to run its works.
Raises:

ValueError – future_loop is specified while awaitable is False.
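
A construction sketch with awaitable left at its default of False; square is a hypothetical coroutine function, and executor and square are reused in the method examples below:

    import asyncio
    import aplex

    async def square(x):
        # Hypothetical coroutine work.
        await asyncio.sleep(0)
        return x * x

    # Non-awaitable mode: submit() returns concurrent.futures-style futures
    # and map() returns a plain generator.
    executor = aplex.ThreadAsyncPoolExecutor(pool_size=4,
                                             max_works_per_worker=300)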

map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]

Maps your work in the same way as concurrent.futures.

The work submitted will be sent to the specific worker that the load balancer chooses.

Note

The work you submit must be a callable, and a coroutine is not a callable. Submit a coroutine function instead and specify its args and kwargs here.

Parameters:
  • work – The callable that will be run in a worker.
  • *iterables – Positional arguments for work. Each one must be an iterable, and all of them must have the same length.
  • timeout – The time limit for waiting for results.
  • chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
  • load_balancing_meta – Passed to the load balancer to help it choose the proper worker.
Returns:

An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used instead of an async generator.

If an exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.

Raises:
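
A usage sketch, continuing the hypothetical constructor example above (awaitable=False, so map returns a plain generator):

    for result in executor.map(square, range(10), chunksize=2):
        print(result)
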
set_future_loop(loop: asyncio.events.AbstractEventLoop)

Sets the loop that awaitable futures use to await results.

This loop can also be set at initialization.

Parameters:

loop – The loop needed by awaitable futures.

Raises:
  • RuntimeError – If the executor has been shut down, or the executor was not set to be awaitable.
  • AplexWorkerError – If some workers are broken or raise BaseException.
shutdown(wait: bool = True)

Shuts down the executor and frees its resources.

Parameters: wait – Whether to block until the shutdown is finished.
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]

Submits your work in the same way as concurrent.futures.

The work submitted will be sent to the specific worker that the load balancer chooses.

Note

The work you submit must be a callable, and a coroutine is not a callable. Submit a coroutine function instead and specify its args and kwargs here.

Parameters:
  • work – The callable that will be run in a worker.
  • *args – Positional arguments for work.
  • load_balancing_meta – Passed to the load balancer to help it choose the proper worker.
  • **kwargs – Keyword arguments for work.
Returns:

A future.

The future will be awaitable, like an asyncio future, if awaitable was set to True at executor construction; otherwise it will be unawaitable, like a concurrent.futures future.

Raises:
  • RuntimeError – If executor has been shut down.
  • AplexWorkerError – If some workers are broken or raise BaseException.
  • TypeError – If work is not a callable.
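
A submission sketch, continuing the hypothetical constructor example above:

    future = executor.submit(square, 7)   # a ConcurrentFuture
    print(future.result())                # blocks like concurrent.futures
    executor.shutdown(wait=True)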

Future Objects

class aplex.futures.ConcurrentFuture(cancel_interface)[source]

A concurrent.futures.Future subclass that cancels like asyncio.Task.

cancel()[source]

Tries to cancel the work submitted to the worker.

Unlike concurrent.futures, the running work is cancellable as long as it’s a coroutine function.

Returns: True if the work could be cancelled, False otherwise.
class aplex.futures.AsyncioFuture(concurrent_future, loop=None)[source]

An asyncio.Future subclass that cancels like asyncio.Task.

cancel()[source]

Tries to cancel the work submitted to the worker.

Unlike concurrent.futures, the running work is cancellable as long as it’s a coroutine function.

Returns: True if the work could be cancelled, False otherwise.
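
A cancellation sketch; long_job is a hypothetical coroutine function that would otherwise run for a long time:

    import asyncio
    import aplex

    async def long_job():
        await asyncio.sleep(60)

    executor = aplex.ThreadAsyncPoolExecutor(pool_size=1)
    future = executor.submit(long_job)
    # Unlike concurrent.futures, the running coroutine work can still be cancelled.
    print(future.cancel())
    executor.shutdown()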

Load Balancer Objects

class aplex.load_balancers.LoadBalancer(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]

The base class of all load balancers.

Users can inherit this to write their own load balancers.

Initialization.

Note: You must call super().__init__(*args, **kwargs) at the beginning of the __init__ block if you override it.

Parameters:
  • workers – An argument for the workers property.
  • workloads – An argument for the workloads property.
  • max_works_per_worker – An argument for the max_works_per_worker property.
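
A subclassing sketch, assuming only the documented interface above; PickFirst is a hypothetical name:

    import aplex

    class PickFirst(aplex.LoadBalancer):
        """Hypothetical balancer that always takes the first available worker."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)   # required by the base class

        def get_proper_worker(self, load_balancing_meta=None):
            # There is always at least one available worker here.
            return next(iter(self.get_available_workers()))

    executor = aplex.ThreadAsyncPoolExecutor(load_balancer=PickFirst)
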
get_available_workers() → Iterator[Worker][source]

Returns the workers that have not reached the max_works_per_worker limit.

Returns: An iterator of the available workers.
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]

The method to be implemented by users. Returns an available worker.

Note

There is always at least one available worker when this method is called.

Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
is_available(worker: Worker) → bool[source]

Checks whether the given worker is still below the max_works_per_worker limit.

Parameters: worker – A worker object.
Returns: True if the worker is available, else False.
max_works_per_worker

Returns the max number of works a worker can run at the same time.

workers

Returns the worker list.

workloads

Returns the worker-to-workload mapping.

class aplex.load_balancers.RoundRobin(*args, **kwargs)[source]

A load balancer based on the round-robin algorithm.

get_available_workers() → Iterator[Worker]

Returns the workers that have not reached the max_works_per_worker limit.

Returns: An iterator of the available workers.
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]

Returns the next available worker.

Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
is_available(worker: Worker) → bool

Checks whether the given worker is still below the max_works_per_worker limit.

Parameters: worker – A worker object.
Returns: True if the worker is available, else False.
max_works_per_worker

Returns the max number of works a worker can run at the same time.

workers

Returns the worker list.

workloads

Returns the worker-to-workload mapping.

class aplex.load_balancers.Random(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]

A load balancer that chooses proper worker randomly.

Initialization.

Note: You must call super().__init__(*args, **kwargs) at the beginning of the __init__ block if you override it.

Parameters:
  • workers – An argument for the workers property.
  • workloads – An argument for the workloads property.
  • max_works_per_worker – An argument for the max_works_per_worker property.
get_available_workers() → Iterator[Worker]

Returns the workers that have not reached the max_works_per_worker limit.

Returns: An iterator of the available workers.
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]

Randomly picks an available worker.

Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
is_available(worker: Worker) → bool

Checks whether the given worker is still below the max_works_per_worker limit.

Parameters: worker – A worker object.
Returns: True if the worker is available, else False.
max_works_per_worker

Returns the max number of works a worker can run at the same time.

workers

Returns the worker list.

workloads

Returns the worker-to-workload mapping.

class aplex.load_balancers.Average(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)[source]

A load balancer that tries to equalize the workloads of all the workers.

In other words, it assigns works to the worker with the minimum workload.

Initialization.

Note: You must call super().__init__(*args, **kwargs) at the beginning of the __init__ block if you override it.

Parameters:
  • workers – An argument for the workers property.
  • workloads – An argument for the workloads property.
  • max_works_per_worker – An argument for the max_works_per_worker property.
get_available_workers() → Iterator[Worker]

Returns the workers that have not reached the max_works_per_worker limit.

Returns: An iterator of the available workers.
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker[source]

Returns the worker with the minimum workload.

Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
is_available(worker: Worker) → bool

Checks whether the given worker is still below the max_works_per_worker limit.

Parameters: worker – A worker object.
Returns: True if the worker is available, else False.
max_works_per_worker

Returns the max number of works a worker can run at the same time.

workers

Returns the worker list.

workloads

Returns the worker-to-workload mapping.