API
Executor Objects
-
class aplex.ProcessAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = <class 'aplex.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)
Sets up the executor and adds it to the set of tracked executors.
Parameters:
- pool_size – Number of workers, i.e., number of threads or processes.
- max_works_per_worker – The maximum number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of aplex.LoadBalancer that implements the abstract method get_proper_worker; it is used to balance the load of submitted items.
- awaitable – If set to True, futures returned from the submit method will be awaitable, and map will return an async generator (an async iterator on Python 3.5).
- future_loop – Loop instance set in awaitable futures returned from the submit method. If specified, awaitable must be set to True. This loop can also be set with the set_future_loop method.
- worker_loop_factory – A factory that generates the loop instance on which workers run their jobs.
Raises: ValueError – If future_loop is specified while awaitable is False.
-
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]
Maps your work in the same way as concurrent.futures.
The submitted work will be sent to the specific worker that the load balancer chooses.
Note
The work you submit should be a callable, and a coroutine is not a callable. Submit a coroutine function instead, and specify its args and kwargs here.
Parameters:
- work – The callable that will be run in a worker.
- *iterables – Positional arguments for work. All of them are iterables of the same length.
- timeout – The time limit for waiting for results.
- chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
- load_balancing_meta – This will be passed to the load balancer for choosing a proper worker.
Returns: An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used in place of the async generator. If an exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.
Raises:
- ValueError – If chunksize is less than 1.
- TypeError – If work is not a callable.
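The chunksize parameter can be pictured with a small standalone sketch. It does not use aplex, and `chunked` is a hypothetical helper: how the library batches works internally is not shown in this reference. The sketch only illustrates zipping the iterables into per-work argument tuples and partitioning them into chunks of the given size, with the same ValueError rule for a chunksize below 1.

```python
from itertools import islice


def chunked(chunksize, *iterables):
    """Yield lists of argument tuples, each list holding up to `chunksize` items."""
    if chunksize < 1:
        raise ValueError('chunksize must be at least 1.')
    args = zip(*iterables)  # one tuple of positional arguments per work
    while True:
        chunk = list(islice(args, chunksize))
        if not chunk:
            return
        yield chunk


# Three works, partitioned into chunks of two.
chunks = list(chunked(2, [1, 2, 3], 'abc'))
print(chunks)
```

A larger chunksize means fewer, larger batches are sent to workers, which is the usual trade-off between messaging overhead and scheduling granularity.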
-
set_future_loop(loop: asyncio.events.AbstractEventLoop)
Sets the loop that awaitable futures use to await results.
This loop can also be set at initialization.
Parameters: loop – The loop needed for awaitable futures.
Raises:
- RuntimeError – If the executor has been shut down, or the executor was not set to be awaitable.
- AplexWorkerError – If some workers are broken or raise BaseException.
-
shutdown(wait: bool = True)
Shuts down the executor and frees its resources.
Parameters: wait – Whether to block until shutdown is finished.
-
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]
Submits your work in the same way as concurrent.futures.
The submitted work will be sent to the specific worker that the load balancer chooses.
Note
The work you submit should be a callable, and a coroutine is not a callable. Submit a coroutine function instead, and specify its args and kwargs here.
Parameters:
- work – The callable that will be run in a worker.
- *args – Positional arguments for work.
- load_balancing_meta – This will be passed to the load balancer for choosing a proper worker.
- **kwargs – Keyword arguments for work.
Returns: A future. The future will be awaitable, like an asyncio future, if awaitable was set to True at executor construction; otherwise it is not awaitable, like a concurrent.futures future.
Raises:
- RuntimeError – If the executor has been shut down.
- AplexWorkerError – If some workers are broken or raise BaseException.
- TypeError – If work is not a callable.
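The distinction drawn in the note above, between a coroutine function and a coroutine, can be checked with the standard library alone. The sketch below does not use aplex; `fetch` is a hypothetical work function. It shows why the function and its arguments are passed separately, as in the submit signature above, rather than passing the result of calling the function.

```python
import asyncio
import inspect


async def fetch(x):
    # A coroutine function: calling it creates a coroutine object.
    await asyncio.sleep(0)
    return x * 2


coro = fetch(21)                                  # a coroutine object
fn_is_callable = callable(fetch)                  # the function is callable
coro_is_callable = callable(coro)                 # the coroutine object is not
fn_is_coro_fn = inspect.iscoroutinefunction(fetch)
result = asyncio.run(coro)                        # run it so it is not left un-awaited
print(fn_is_callable, coro_is_callable, fn_is_coro_fn, result)
```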
-
class aplex.ThreadAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = <class 'aplex.load_balancers.RoundRobin'>, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)
Sets up the executor and adds it to the set of tracked executors.
Parameters:
- pool_size – Number of workers, i.e., number of threads or processes.
- max_works_per_worker – The maximum number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of aplex.LoadBalancer that implements the abstract method get_proper_worker; it is used to balance the load of submitted items.
- awaitable – If set to True, futures returned from the submit method will be awaitable, and map will return an async generator (an async iterator on Python 3.5).
- future_loop – Loop instance set in awaitable futures returned from the submit method. If specified, awaitable must be set to True. This loop can also be set with the set_future_loop method.
- worker_loop_factory – A factory that generates the loop instance on which workers run their jobs.
Raises: ValueError – If future_loop is specified while awaitable is False.
-
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]
Maps your work in the same way as concurrent.futures.
The submitted work will be sent to the specific worker that the load balancer chooses.
Note
The work you submit should be a callable, and a coroutine is not a callable. Submit a coroutine function instead, and specify its args and kwargs here.
Parameters:
- work – The callable that will be run in a worker.
- *iterables – Positional arguments for work. All of them are iterables of the same length.
- timeout – The time limit for waiting for results.
- chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
- load_balancing_meta – This will be passed to the load balancer for choosing a proper worker.
Returns: An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used in place of the async generator. If an exception is raised in a work, it will be re-raised in the generator, and the remaining works will be cancelled.
Raises:
- ValueError – If chunksize is less than 1.
- TypeError – If work is not a callable.
-
set_future_loop(loop: asyncio.events.AbstractEventLoop)
Sets the loop that awaitable futures use to await results.
This loop can also be set at initialization.
Parameters: loop – The loop needed for awaitable futures.
Raises:
- RuntimeError – If the executor has been shut down, or the executor was not set to be awaitable.
- AplexWorkerError – If some workers are broken or raise BaseException.
-
shutdown(wait: bool = True)
Shuts down the executor and frees its resources.
Parameters: wait – Whether to block until shutdown is finished.
-
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]
Submits your work in the same way as concurrent.futures.
The submitted work will be sent to the specific worker that the load balancer chooses.
Note
The work you submit should be a callable, and a coroutine is not a callable. Submit a coroutine function instead, and specify its args and kwargs here.
Parameters:
- work – The callable that will be run in a worker.
- *args – Positional arguments for work.
- load_balancing_meta – This will be passed to the load balancer for choosing a proper worker.
- **kwargs – Keyword arguments for work.
Returns: A future. The future will be awaitable, like an asyncio future, if awaitable was set to True at executor construction; otherwise it is not awaitable, like a concurrent.futures future.
Raises:
- RuntimeError – If the executor has been shut down.
- AplexWorkerError – If some workers are broken or raise BaseException.
- TypeError – If work is not a callable.
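The two future flavours that submit can return correspond to two consumption styles: blocking on result(), or awaiting inside a coroutine. The sketch below is a standard-library analogy only, assuming nothing about how aplex builds its futures; it uses concurrent.futures.ThreadPoolExecutor and asyncio.wrap_future, and `square` is a hypothetical work function.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


async def await_it(pool):
    # asyncio.wrap_future turns a concurrent.futures.Future into an awaitable one.
    return await asyncio.wrap_future(pool.submit(square, 4))


with ThreadPoolExecutor(max_workers=1) as pool:
    blocking_result = pool.submit(square, 3).result()  # concurrent.futures style
    awaited_result = asyncio.run(await_it(pool))        # asyncio style

print(blocking_result, awaited_result)
```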
Future Objects
-
class aplex.futures.ConcurrentFuture(cancel_interface)
A concurrent.futures.Future subclass that cancels like asyncio.Task.
Load Balancer Objects
-
class aplex.load_balancers.LoadBalancer(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)
The base class of all load balancers.
Users can inherit from this class to write their own load balancers.
Initialization.
Note: If you override __init__, you must call super().__init__(*args, **kwargs) at the beginning of the __init__ block.
Parameters:
- workers – The argument for the workers property.
- workloads – The argument for the workloads property.
- max_works_per_worker – The argument for the max_works_per_worker property.
-
get_available_workers() → Iterator[Worker]
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker
The method to be implemented by users. Returns an available worker.
Note
There is always at least one available worker when this method is called.
Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool
Returns whether the given worker is still below the max_works_per_worker limit.
Parameters: worker – A worker object.
Returns: True if available, else False.
-
max_works_per_worker
Returns the maximum number of works a worker can run at the same time.
-
workers
Returns the worker list.
-
workloads
Returns the worker-to-workload mapping.
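The subclassing contract described for LoadBalancer can be sketched without the library. `BaseBalancer` below is a hypothetical stand-in that mirrors only the constructor arguments and helper methods documented here; it is not aplex code, and real code would inherit from aplex.load_balancers.LoadBalancer instead. `PreferByMeta` and its use of load_balancing_meta as a worker index are likewise assumptions made for illustration, with plain strings standing in for worker objects.

```python
class BaseBalancer:
    """Hypothetical stand-in mirroring the documented interface (not aplex code)."""

    def __init__(self, workers, workloads, max_works_per_worker):
        self.workers = workers
        self.workloads = workloads
        self.max_works_per_worker = max_works_per_worker

    def is_available(self, worker):
        return self.workloads[worker] < self.max_works_per_worker

    def get_available_workers(self):
        return (w for w in self.workers if self.is_available(w))


class PreferByMeta(BaseBalancer):
    """Prefers the worker at index `load_balancing_meta`, if it is available."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # call the base initializer first
        self.fallbacks = 0                 # extra state owned by the subclass

    def get_proper_worker(self, load_balancing_meta):
        if load_balancing_meta is not None:
            preferred = self.workers[load_balancing_meta % len(self.workers)]
            if self.is_available(preferred):
                return preferred
        # Preferred worker is full or no hint was given: take any available one.
        self.fallbacks += 1
        return next(self.get_available_workers())


balancer = PreferByMeta(['w0', 'w1'], {'w0': 0, 'w1': 2}, max_works_per_worker=2)
picked_preferred = balancer.get_proper_worker(0)  # 'w0' is below the limit
picked_fallback = balancer.get_proper_worker(1)   # 'w1' is full, so fall back
print(picked_preferred, picked_fallback, balancer.fallbacks)
```

The fallback relies on the documented guarantee that at least one worker is available when get_proper_worker is called.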
-
class aplex.load_balancers.RoundRobin(*args, **kwargs)
A load balancer based on the round-robin algorithm.
-
get_available_workers() → Iterator[Worker]
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker
Returns the next available worker.
Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool
Returns whether the given worker is still below the max_works_per_worker limit.
Parameters: worker – A worker object.
Returns: True if available, else False.
-
max_works_per_worker
Returns the maximum number of works a worker can run at the same time.
-
workers
Returns the worker list.
-
workloads
Returns the worker-to-workload mapping.
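Round-robin selection that skips workers at their limit can be sketched as follows. This is a minimal illustration of the general technique, not the RoundRobin implementation; `make_round_robin` is a hypothetical helper and strings stand in for worker objects.

```python
from itertools import cycle


def make_round_robin(workers, workloads, max_works_per_worker):
    """Return a function that picks the next available worker in rotation."""
    rotation = cycle(workers)

    def next_worker():
        # Assumes at least one worker is available, as the note on
        # get_proper_worker states; otherwise this loop would never end.
        for worker in rotation:
            if workloads[worker] < max_works_per_worker:
                return worker

    return next_worker


workloads = {'a': 0, 'b': 1, 'c': 0}
pick = make_round_robin(['a', 'b', 'c'], workloads, max_works_per_worker=1)
order = [pick() for _ in range(4)]  # 'b' is at its limit and is skipped
print(order)
```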
-
class aplex.load_balancers.Random(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)
A load balancer that chooses a proper worker randomly.
Initialization.
Note: If you override __init__, you must call super().__init__(*args, **kwargs) at the beginning of the __init__ block.
Parameters:
- workers – The argument for the workers property.
- workloads – The argument for the workloads property.
- max_works_per_worker – The argument for the max_works_per_worker property.
-
get_available_workers() → Iterator[Worker]
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker
Randomly picks an available worker.
Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool
Returns whether the given worker is still below the max_works_per_worker limit.
Parameters: worker – A worker object.
Returns: True if available, else False.
-
max_works_per_worker
Returns the maximum number of works a worker can run at the same time.
-
workers
Returns the worker list.
-
workloads
Returns the worker-to-workload mapping.
-
class aplex.load_balancers.Average(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)
A load balancer that tries to equalize the workloads of all the workers.
In other words, it assigns work to the worker with the minimum workload.
Initialization.
Note: If you override __init__, you must call super().__init__(*args, **kwargs) at the beginning of the __init__ block.
Parameters:
- workers – The argument for the workers property.
- workloads – The argument for the workloads property.
- max_works_per_worker – The argument for the max_works_per_worker property.
-
get_available_workers() → Iterator[Worker]
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.
-
get_proper_worker(load_balancing_meta: Optional[Any]) → Worker
Returns the worker with the minimum workload.
Parameters: load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.
-
is_available(worker: Worker) → bool
Returns whether the given worker is still below the max_works_per_worker limit.
Parameters: worker – A worker object.
Returns: True if available, else False.
-
max_works_per_worker
Returns the maximum number of works a worker can run at the same time.
-
workers
Returns the worker list.
-
workloads
Returns the worker-to-workload mapping.
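Choosing the worker with the minimum workload tends to even the workloads out over successive assignments. The sketch below illustrates that idea only; it is not the Average implementation. `pick_least_loaded` is a hypothetical helper, strings stand in for worker objects, and breaking ties by list order is an assumption of this sketch.

```python
def pick_least_loaded(workers, workloads, max_works_per_worker):
    """Return the available worker with the smallest workload."""
    available = [w for w in workers if workloads[w] < max_works_per_worker]
    # min() returns the first minimal element, so ties go to the earlier worker.
    return min(available, key=workloads.get)


workloads = {'a': 3, 'b': 1, 'c': 2}
assigned = []
for _ in range(3):
    worker = pick_least_loaded(['a', 'b', 'c'], workloads, max_works_per_worker=4)
    workloads[worker] += 1  # the caller records the newly assigned work
    assigned.append(worker)

print(assigned, workloads)
```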