Welcome to Aplex!¶
User’s Guide¶
Installation¶
Python Version¶
Aplex supports Python 3.5+.
Install Aplex¶
For General Users¶
Use the package manager pip or pipenv to install aplex.
With pip:
$ pip install aplex
Or with pipenv:
$ pipenv install aplex
For Contributors¶
Install with pipenv (recommended if you want to build the docs):
git clone https://github.com/lunluen/aplex.git
cd aplex
pipenv install --dev
Or with setuptools:
git clone https://github.com/lunluen/aplex.git
cd aplex
python setup.py develop
Aplex Quickstart¶
“Aplex”, short for “asynchronous pool executor”, is a Python library for combining asyncio with multiprocessing and threading.
- Aplex helps you run coroutines and functions in other processes or threads with asyncio, concurrently and in parallel (when using processes).
- Aplex provides a usage like that of the standard library concurrent.futures, which is familiar and intuitive.
- Aplex lets you do load balancing in a simple way if you need it.
Usage¶
Definition to know:
A work is a callable you want to run with asyncio and multiprocessing or threading. It can be a coroutine function or just a function.
In the example below, the work is the coroutine function demo.
Submit¶
You can submit your work like this:

import aiohttp
from aplex import ProcessAsyncPoolExecutor

async def demo(url):
    async with aiohttp.request('GET', url) as response:
        return response.status

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(pool_size=8)
    future = pool.submit(demo, 'http://httpbin.org')
    print('Status: %d.' % future.result())
Note: If you are running Python on Windows, the if __name__ == '__main__': guard is necessary. That is how multiprocessing is designed.
Result:
Status: 200.
Map¶
For multiple works, try map:

iterable = ('http://httpbin.org' for __ in range(10))

for status in pool.map(demo, iterable, timeout=10):
    print('Status: %d.' % status)
Awaiting results¶
Aplex allows you to await results with an event loop that already exists.
It’s quite simple.
Just set the keyword argument awaitable to True!
For example:
pool = ProcessAsyncPoolExecutor(awaitable=True)
Then
future = pool.submit(demo, 'http://httpbin.org')
status = await future
How about map?

async for status in pool.map(demo, iterable, timeout=10):
    print('Status: %d.' % status)
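Putting the pieces together, here is a minimal sketch of a complete program in awaitable mode (it reuses the demo coroutine and assumes aiohttp is installed; depending on your setup, the loop may also need to be handed to the executor via future_loop or set_future_loop):

import asyncio

import aiohttp
from aplex import ProcessAsyncPoolExecutor

async def demo(url):
    async with aiohttp.request('GET', url) as response:
        return response.status

async def main(pool):
    # submit() returns an awaitable future when awaitable=True.
    future = pool.submit(demo, 'http://httpbin.org')
    print('Status: %d.' % await future)

    # map() returns an async generator in the same mode.
    urls = ('http://httpbin.org' for __ in range(10))
    async for status in pool.map(demo, urls, timeout=10):
        print('Status: %d.' % status)

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(awaitable=True)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(pool))
    pool.shutdown()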
Load balancing¶
In aplex, each worker running your works is a process or thread on your computer, so all workers have the same computing capability. Your works, however, might have different workloads, and then you need a load balancer.
Aplex provides some useful load balancers: RoundRobin, Random, and Average. The default is RoundRobin.
Simply set the one you want in the keyword argument at construction:
from aplex import ProcessAsyncPoolExecutor
from aplex.load_balancers import Average
if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(load_balancer=Average)
Done. So easy. :100:
You can also customize one:
from aplex import LoadBalancer
class MyAwesomeLoadBalancer(LoadBalancer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # Don't forget this.
        self.awesome_attribute = 'Hello Aplex!'

    def get_proper_worker(self, load_balancing_meta=None):
        the_poor_guy = self.workers[0]
        return the_poor_guy
See details of how to implement a load balancer at: LoadBalancer | API Reference
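A custom balancer is plugged in the same way as the built-in ones: pass the class itself (not an instance) through the load_balancer keyword. A short sketch, reusing the MyAwesomeLoadBalancer class defined above:

from aplex import ProcessAsyncPoolExecutor

if __name__ == '__main__':
    # The executor instantiates the balancer class internally,
    # handing it the workers and their workload mapping.
    pool = ProcessAsyncPoolExecutor(load_balancer=MyAwesomeLoadBalancer)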
Worker loop factory¶
By the way, if you think the built-in asyncio loop is too slow:
import uvloop
from aplex import ProcessAsyncPoolExecutor
if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(worker_loop_factory=uvloop.Loop)
Graceful Exit¶
Taking Python 3.6 as an example, a graceful exit without aplex would be something like this:
try:
    loop.run_forever()
finally:
    try:
        tasks = asyncio.Task.all_tasks()
        if tasks:
            for task in tasks:
                task.cancel()
            gather = asyncio.gather(*tasks)
            loop.run_until_complete(gather)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()
…It’s definitely a joke.
Here, just treat pool as a context manager:
with ProcessAsyncPoolExecutor() as pool:
    do_something()
or remember to call pool.shutdown().
These help you deal with that joke.
…
What? You forgot to call pool.shutdown()?!
Ok, fine. It will shut down automatically when the program exits or it gets garbage-collected.
Like this?¶
Scroll up and click Watch - Releases only and Star as a thumbs up! :+1:
Any feedback?¶
Feel free to open an issue (just don’t abuse it).
Or contact me: mas581301@gmail.com
:mailbox:
Anything about aplex is welcome, such as bugs, system design, variable naming, even the English grammar of docstrings!
How to contribute¶
Contributions are welcome.
Asking and advising are also kinds of contribution.
Please see CONTRIBUTING.md
API Reference¶
API¶
Executor Objects¶
class aplex.ProcessAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = aplex.load_balancers.RoundRobin, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)¶
Sets up the executor and adds itself to the executor track set.
Parameters:
- pool_size – Number of workers, i.e., the number of threads or processes.
- max_works_per_worker – The maximum number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of aplex.LoadBalancer that has implemented the abstract method get_proper_worker, used for load balancing of submitted items.
- awaitable – If set to True, futures returned from the submit method are awaitable, and map returns an async generator (an async iterator on Python 3.5).
- future_loop – Loop instance set in the awaitable futures returned from the submit method. If specified, awaitable must be set to True. This loop can also be set later with the set_future_loop method.
- worker_loop_factory – A factory that generates the loop instance each worker runs its works on.
Raises:
- ValueError – If future_loop is specified while awaitable is False.
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]¶
Maps your work the way concurrent.futures does.
The work submitted will be sent to the specific worker that the load balancer chooses.
Note: The work you submit should be a callable, and a coroutine is not a callable. You should submit a coroutine function and specify its args and kwargs here instead.
Parameters:
- work – The callable that will be run in a worker.
- *iterables – Positional arguments for the work. All of them are iterable and have the same length.
- timeout – The time limit for waiting for results.
- chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
- load_balancing_meta – This will be passed to the load balancer for the choice of the proper worker.
Returns: An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used in place of an async generator. If an exception is raised in a work, it is re-raised in the generator and the remaining works are cancelled.
Raises:
- ValueError – If chunksize is less than 1.
- TypeError – If work is not a callable.
set_future_loop(loop: asyncio.events.AbstractEventLoop)¶
Sets the loop that awaitable futures use to await results.
This loop can also be set at initialization.
Parameters:
- loop – The loop needed by the awaitable futures.
Raises:
- RuntimeError – If the executor has been shut down, or the executor is set to be unawaitable.
- AplexWorkerError – If some workers are broken or raise BaseException.
shutdown(wait: bool = True)¶
Shuts down the executor and frees its resources.
Parameters:
- wait – Whether to block until the shutdown is finished.
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]¶
Submits your work the way concurrent.futures does.
The work submitted will be sent to the specific worker that the load balancer chooses.
Note: The work you submit should be a callable, and a coroutine is not a callable. You should submit a coroutine function and specify its args and kwargs here instead.
Parameters:
- work – The callable that will be run in a worker.
- *args – Positional arguments for the work.
- load_balancing_meta – This will be passed to the load balancer for the choice of the proper worker.
- **kwargs – Keyword arguments for the work.
Returns: A future. The future is awaitable, like an asyncio future, if awaitable is set to True at executor construction; otherwise it is unawaitable, like a concurrent.futures future.
Raises:
- RuntimeError – If the executor has been shut down.
- AplexWorkerError – If some workers are broken or raise BaseException.
- TypeError – If work is not a callable.
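As an illustrative sketch of the parameters above (not an example from the library itself; the demo coroutine and URLs are placeholders), submit with keyword arguments and map with chunksize could be used like this:

import asyncio

from aplex import ProcessAsyncPoolExecutor

async def demo(url, attempts=1):
    # Placeholder work: pretend to do I/O, then return something about the URL.
    await asyncio.sleep(0)
    return (url, attempts)

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(pool_size=4, max_works_per_worker=100)

    # Positional and keyword arguments of the work go straight to submit().
    future = pool.submit(demo, 'http://httpbin.org', attempts=3)
    print(future.result())

    # map() takes one iterable per positional parameter of the work;
    # chunksize groups works before they are shipped to a worker.
    urls = ['http://httpbin.org'] * 8
    for result in pool.map(demo, urls, chunksize=2, timeout=10):
        print(result)

    pool.shutdown()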
class aplex.ThreadAsyncPoolExecutor(*, pool_size: Optional[int] = 4, max_works_per_worker: Optional[int] = 300, load_balancer: Optional[aplex.load_balancers.LoadBalancer] = aplex.load_balancers.RoundRobin, awaitable: Optional[bool] = False, future_loop: asyncio.events.AbstractEventLoop = None, worker_loop_factory: Optional[asyncio.events.AbstractEventLoop] = None)¶
Sets up the executor and adds itself to the executor track set.
Parameters:
- pool_size – Number of workers, i.e., the number of threads or processes.
- max_works_per_worker – The maximum number of works a worker can run at the same time. This does not limit the number of asyncio tasks of a worker.
- load_balancer – A subclass of aplex.LoadBalancer that has implemented the abstract method get_proper_worker, used for load balancing of submitted items.
- awaitable – If set to True, futures returned from the submit method are awaitable, and map returns an async generator (an async iterator on Python 3.5).
- future_loop – Loop instance set in the awaitable futures returned from the submit method. If specified, awaitable must be set to True. This loop can also be set later with the set_future_loop method.
- worker_loop_factory – A factory that generates the loop instance each worker runs its works on.
Raises:
- ValueError – If future_loop is specified while awaitable is False.
map(work: Callable, *iterables, timeout: Optional[float] = None, chunksize: int = 1, load_balancing_meta: Optional[Any] = None) → Union[AsyncGenerator[T_co, T_contra], Generator[T_co, T_contra, V_co]]¶
Maps your work the way concurrent.futures does.
The work submitted will be sent to the specific worker that the load balancer chooses.
Note: The work you submit should be a callable, and a coroutine is not a callable. You should submit a coroutine function and specify its args and kwargs here instead.
Parameters:
- work – The callable that will be run in a worker.
- *iterables – Positional arguments for the work. All of them are iterable and have the same length.
- timeout – The time limit for waiting for results.
- chunksize – Works are gathered, partitioned into chunks of this size, and then sent to workers.
- load_balancing_meta – This will be passed to the load balancer for the choice of the proper worker.
Returns: An async generator yielding the map results if awaitable is set to True, otherwise a generator. On Python 3.5, an async iterator is used in place of an async generator. If an exception is raised in a work, it is re-raised in the generator and the remaining works are cancelled.
Raises:
- ValueError – If chunksize is less than 1.
- TypeError – If work is not a callable.
set_future_loop(loop: asyncio.events.AbstractEventLoop)¶
Sets the loop that awaitable futures use to await results.
This loop can also be set at initialization.
Parameters:
- loop – The loop needed by the awaitable futures.
Raises:
- RuntimeError – If the executor has been shut down, or the executor is set to be unawaitable.
- AplexWorkerError – If some workers are broken or raise BaseException.
shutdown(wait: bool = True)¶
Shuts down the executor and frees its resources.
Parameters:
- wait – Whether to block until the shutdown is finished.
submit(work: Callable, *args, load_balancing_meta: Optional[Any] = None, **kwargs) → Union[aplex.futures.AsyncioFuture, aplex.futures.ConcurrentFuture]¶
Submits your work the way concurrent.futures does.
The work submitted will be sent to the specific worker that the load balancer chooses.
Note: The work you submit should be a callable, and a coroutine is not a callable. You should submit a coroutine function and specify its args and kwargs here instead.
Parameters:
- work – The callable that will be run in a worker.
- *args – Positional arguments for the work.
- load_balancing_meta – This will be passed to the load balancer for the choice of the proper worker.
- **kwargs – Keyword arguments for the work.
Returns: A future. The future is awaitable, like an asyncio future, if awaitable is set to True at executor construction; otherwise it is unawaitable, like a concurrent.futures future.
Raises:
- RuntimeError – If the executor has been shut down.
- AplexWorkerError – If some workers are broken or raise BaseException.
- TypeError – If work is not a callable.
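For comparison, a minimal sketch with the thread-based executor (placeholder coroutine, no third-party dependencies); since the workers are threads, the work does not need to be picklable:

import asyncio

from aplex import ThreadAsyncPoolExecutor

async def greet(name):
    # Placeholder coroutine work; each worker thread runs its own event loop.
    await asyncio.sleep(0.1)
    return 'Hello, %s!' % name

if __name__ == '__main__':
    pool = ThreadAsyncPoolExecutor(pool_size=2)
    for message in pool.map(greet, ['aplex', 'world']):
        print(message)
    pool.shutdown()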
Future Objects¶
class aplex.futures.ConcurrentFuture(cancel_interface)¶
A concurrent.futures.Future subclass that cancels like asyncio.Task.
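For illustration only: when awaitable is left False, submit returns this kind of future. The sketch below assumes cancellation behaves like concurrent.futures.Future.cancel (returning whether the request succeeded) while also reaching works that have already started, as the docstring suggests:

import asyncio

from aplex import ProcessAsyncPoolExecutor

async def slow_work():
    # Placeholder work that sleeps long enough for the cancel to matter.
    await asyncio.sleep(60)

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor()
    future = pool.submit(slow_work)
    # Because ConcurrentFuture cancels like asyncio.Task, the cancel request
    # is assumed to reach the worker even if the work is already running.
    print('Cancelled: %s' % future.cancel())
    pool.shutdown()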
Load Balancer Objects¶
class aplex.load_balancers.LoadBalancer(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)¶
The base class of all load balancers.
Users can inherit from this class to write their own load balancers.
Note: A subclass that overrides __init__ must call super().__init__(*args, **kwargs) at the beginning of its __init__ block.
Parameters:
- workers – An argument for the workers property.
- workloads – An argument for the workloads property.
- max_works_per_worker – An argument for the max_works_per_worker property.

get_available_workers() → Iterator[Worker]¶
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.

get_proper_worker(load_balancing_meta: Optional[Any]) → Worker¶
The method to be implemented by users. Returns an available worker.
Note: There is always at least one available worker when this method is called.
Parameters:
- load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.

is_available(worker: Worker) → bool¶
Checks whether the given worker has reached the max_works_per_worker limit.
Parameters:
- worker – A worker object.
Returns: True if the worker is available, else False.

max_works_per_worker¶
Returns the maximum number of works a worker can run at the same time.

workers¶
Returns the worker list.

workloads¶
Returns the worker-to-workload mapping.
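As a sketch of how these pieces fit together in a subclass (an illustrative example named LeastBusy, not part of aplex), the balancer below picks the available worker with the fewest running works, much like the built-in Average:

from aplex import LoadBalancer

class LeastBusy(LoadBalancer):
    """Illustrative balancer: choose the least-loaded available worker."""

    def get_proper_worker(self, load_balancing_meta=None):
        # get_available_workers() yields workers still below the
        # max_works_per_worker limit; self.workloads maps each worker
        # to the number of works it is currently running.
        return min(self.get_available_workers(),
                   key=lambda worker: self.workloads[worker])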
class aplex.load_balancers.RoundRobin(*args, **kwargs)¶
A load balancer based on the round-robin algorithm.

get_available_workers() → Iterator[Worker]¶
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.

get_proper_worker(load_balancing_meta: Optional[Any]) → Worker¶
Returns the next available worker.
Parameters:
- load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.

is_available(worker: Worker) → bool¶
Checks whether the given worker has reached the max_works_per_worker limit.
Parameters:
- worker – A worker object.
Returns: True if the worker is available, else False.

max_works_per_worker¶
Returns the maximum number of works a worker can run at the same time.

workers¶
Returns the worker list.

workloads¶
Returns the worker-to-workload mapping.
class aplex.load_balancers.Random(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)¶
A load balancer that chooses a proper worker randomly.
Note: A subclass that overrides __init__ must call super().__init__(*args, **kwargs) at the beginning of its __init__ block.
Parameters:
- workers – An argument for the workers property.
- workloads – An argument for the workloads property.
- max_works_per_worker – An argument for the max_works_per_worker property.

get_available_workers() → Iterator[Worker]¶
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.

get_proper_worker(load_balancing_meta: Optional[Any]) → Worker¶
Randomly picks an available worker.
Parameters:
- load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.

is_available(worker: Worker) → bool¶
Checks whether the given worker has reached the max_works_per_worker limit.
Parameters:
- worker – A worker object.
Returns: True if the worker is available, else False.

max_works_per_worker¶
Returns the maximum number of works a worker can run at the same time.

workers¶
Returns the worker list.

workloads¶
Returns the worker-to-workload mapping.
class aplex.load_balancers.Average(workers: List[Worker], workloads: Dict[Worker, int], max_works_per_worker: int)¶
A load balancer that tries to equalize the workloads of all the workers.
In other words, it assigns work to the worker with the minimum workload.
Note: A subclass that overrides __init__ must call super().__init__(*args, **kwargs) at the beginning of its __init__ block.
Parameters:
- workers – An argument for the workers property.
- workloads – An argument for the workloads property.
- max_works_per_worker – An argument for the max_works_per_worker property.

get_available_workers() → Iterator[Worker]¶
Returns the workers that have not reached the max_works_per_worker limit.
Returns: An iterator of the available workers.

get_proper_worker(load_balancing_meta: Optional[Any]) → Worker¶
Returns the worker with the minimum workload.
Parameters:
- load_balancing_meta – An optional argument, specified in the submit and map methods, that users may need for choosing a proper worker.
Returns: A worker that is available for work assignment.

is_available(worker: Worker) → bool¶
Checks whether the given worker has reached the max_works_per_worker limit.
Parameters:
- worker – A worker object.
Returns: True if the worker is available, else False.

max_works_per_worker¶
Returns the maximum number of works a worker can run at the same time.

workers¶
Returns the worker list.

workloads¶
Returns the worker-to-workload mapping.
Changelog¶
The Contributor Guide¶
Questions¶
Questions are best asked on Stack Overflow, but you are not limited to it. Remember to add the aplex tag.
Bug Reports¶
When reporting a bug, it’s best to include (but not limited to):
- What you expected to happen
- What actually happens (include the complete traceback)
- How to reproduce the issue
- Your Python and aplex versions
Pull requests¶
Keep the code style consistent. This package follows the Google Python Style Guide.
License¶
MIT License¶
MIT License
Copyright (c) 2019 Lun
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.