I gave a talk some time ago on how to do concurrent processing in
Python, with a focus on how to control the workers while they are running.
Since then, Python 3 has gained concurrent.futures and asyncio. The
following shows how these newer features can help us craft our parallel code.
Old method
Multithreading in Python is limited by the GIL. So usually we use
multiprocessing instead. A simple way is to use multiprocessing.Pool() to
create a process pool and delegate jobs to each process using map():
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import multiprocessing
import os
import pyaes
KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)
def aes_a_file(filepath):
    # CPU-bound job: CBC-encrypt one file and throw the output away
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        print("encrypting {}".format(filepath))
        pyaes.encrypt_stream(aes, infile, outfile)
    print("done {}".format(filepath))
def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))
    # Create the "process pool" of 4
    pool = multiprocessing.Pool(4)
    pool.map(aes_a_file, filepaths)
if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)
This is simple, but map() blocks until every job is done, because the output
must correspond to the same order as the input. If we want to collect output
out of order, as each job completes, we would have to create a
multiprocessing.Queue() and do the FIFO plumbing ourselves, which makes the code much more complicated.
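For illustration, here is a minimal sketch of that Queue approach (the worker and run helpers are hypothetical, and it spawns one process per job instead of a fixed pool):
import multiprocessing
def worker(job, queue):
    # ... do the real work on `job` here ...
    queue.put(job)  # report completion, in whatever order jobs finish
def run(jobs):
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(job, queue))
             for job in jobs]
    for p in procs:
        p.start()
    for _ in jobs:
        # results arrive in completion order, not submission order
        print("done {}".format(queue.get()))
    for p in procs:
        p.join()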
concurrent.futures
This module, introduced in Python 3.2, simplifies multiprocessing and
multithreading code in such a way that we no longer manage the concurrency in
detail. The key concept here is an Executor object: we submit a job to it and
the job then runs behind the scenes.
There are two types of Executor, the ThreadPoolExecutor and the
ProcessPoolExecutor, and both support the following methods:
- submit(fn, *args, **kwargs): execute fn(*args, **kwargs) and return a Future object immediately
- map(fn, *iterables): like the built-in map(fn, *iterables), but executed asynchronously
- shutdown(wait=True): signal the executor to stop accepting new jobs (returning immediately or after all pending futures are done), but never kill any running futures
Using map() is the simplest case, and it is the same as the example above using
multiprocessing.Pool(). Using submit(), however, returns a Future
object immediately while the job is pending to run. Future objects have the
following methods that allow us to do job control (a short sketch follows the list):
- cancel(): cancel the job, only if it is still pending to run
- cancelled(): return True if the job was cancelled
- running(): return True if the job is running
- done(): return True if the job was cancelled or completed
- result(timeout=None): return the value returned by the job, waiting up to timeout seconds before raising TimeoutError
- exception(timeout=None): return the exception raised by the job, waiting up to timeout seconds before raising TimeoutError; return None if no exception was raised
- add_done_callback(fn): attach a function fn, to be called as fn(future) when the job is cancelled or finished, with this Future as the only argument
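As a minimal sketch of this job control, using a trivial, hypothetical square job on a single-worker thread pool:
import concurrent.futures
import time
def square(n):
    time.sleep(1)
    return n * n
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(square, 2)
    second = executor.submit(square, 3)
    time.sleep(0.1)            # give the single worker time to pick up `first`
    print(first.running())     # True: already running, too late to cancel
    print(second.cancel())     # True: still queued, so cancellation succeeds
    print(first.result())      # 4, blocks until the job completes
    print(second.cancelled())  # True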
The nice thing about concurrent.futures is that it creates a Future
object for each job submitted, and then we can either wait for all jobs to
complete using concurrent.futures.wait(futures, timeout=None) (which returns a
done set and a not_done set of futures), or process each completed or cancelled
job as soon as it is available using concurrent.futures.as_completed(fs, timeout=None).
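For example, a minimal sketch of wait() with a hypothetical slow job:
import concurrent.futures
import time
def slow(n):
    time.sleep(n)
    return n
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(slow, n) for n in (1, 2, 3)]
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED)
    print(len(done), len(not_done))  # 1 2 -- only slow(1) has finished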
The multiprocessing example above can be rewritten using concurrent.futures as
follows, collecting output in the order of completion:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import concurrent.futures
import os
import pyaes
KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)
def aes_a_file(filepath):
    # CPU-bound job: CBC-encrypt one file and throw the output away
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        print("encrypting {}".format(filepath))
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath
def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))
    # Create the "process pool" of 4
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(aes_a_file, f) for f in filepaths]
        for fut in concurrent.futures.as_completed(futures):
            try:
                filepath = fut.result()
				print("done {}".format(filepath))
            except Exception as exc:
                print("failed one job")
if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in concurrent.futures')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)
asyncio Futures
asyncio was introduced in Python 3.4, and Python 3.5 added the new keywords
async and await to the language. It has an asyncio.Future object which is
almost syntax-compatible with concurrent.futures.Future, with the exception
that its result() and exception() methods return immediately, without the
timeout option.
Rewriting the above example in asyncio as follows:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import asyncio
import concurrent.futures
import os
import pyaes
KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)
def aes_a_file(filepath):
    # CPU-bound job: CBC-encrypt one file and throw the output away
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        print("encrypting {}".format(filepath))
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath
async def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))
    # Create the "process pool" of 4 and run asyncio
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [loop.run_in_executor(executor, aes_a_file, f) for f in filepaths]
        for fut in asyncio.as_completed(futures):
            try:
                filepath = await fut
				print("done {}".format(filepath))
            except Exception as exc:
                print("failed one job")
if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in asyncio')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    asyncio.run(main(args.indir))
The above illustrates some characteristics of asyncio. Firstly, all functions
that use await have to be defined with async def. And a function defined with
async def can only be called by a function that is also defined with async def,
or invoked using asyncio.run(). The latter is usually the entry point of a
program using asyncio.
The mechanism of asyncio revolves around an event loop. It is a green-thread
architecture, so we should assume there is only one application thread behind
the scenes even though the code seems to run asynchronously. A main function
that uses asyncio will start with asyncio.get_running_loop() and then create
tasks on the loop. In the above, we make use of ProcessPoolExecutor to
parallelize with multiprocessing, so we are not limited by the GIL. Besides
loop.run_in_executor() as used above, we can also use loop.create_task().
There are several ways of using asyncio. The simplest case is to await
fn(), which calls an async function and waits for it to complete. This gives no
concurrency at all, because the caller is blocked while the callee is working,
but it is syntactically similar to calling the function as if it were not
defined async. The await retrieves the return value of the function (as an
awaitable), and it returns immediately if the awaitable has already completed,
which is the case in the example code above.
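A minimal sketch of this sequential form, with a hypothetical work coroutine, shows that nothing overlaps:
import asyncio
import time
async def work(n):
    await asyncio.sleep(1)
    return n
async def main():
    start = time.monotonic()
    a = await work(1)  # blocks the caller until done
    b = await work(2)  # only starts after the first await returns
    print(a, b, round(time.monotonic() - start))  # 1 2 2 -- fully sequential
asyncio.run(main())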
The other way is to create a task, task = asyncio.create_task(fn()), in which
the async function is called and wrapped as a task. It runs immediately under
the asyncio loop, and we can check the task instance from time to time for its
status (e.g., pending or completed). Creating a task also allows us to add a
callback to it, like the example below:
import asyncio
async def foo(n):
    await asyncio.sleep(10)
    return "foo {} done".format(n)
def handler(future):
    print(future.result())
async def main():
    task = asyncio.create_task(foo(1))
    task.add_done_callback(handler)
    print(task)
    await asyncio.sleep(15)
    print(task)
asyncio.run(main())
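Printing the task before the sleep shows it as pending; once foo finishes at around the ten-second mark, the handler callback prints its result, and the final print shows the task as finished.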
If we need more sophisticated control, we have to use the functions provided by asyncio:
- values = await asyncio.gather(*awaitables): takes a sequence of awaitables and returns an aggregate list of successfully awaited values
- done, pending = await asyncio.wait(awaitables, return_when=asyncio.FIRST_COMPLETED): wait on a sequence of awaitables until the given condition (e.g., the first one completed) is met, returning the done and pending sets
- value = await asyncio.wait_for(awaitable, timeout=N): wait for a single awaitable, or until the timeout expires
- for coro in asyncio.as_completed(awaitables): yields awaitables whose results are populated as they become ready
In the example above, we used as_completed in much the same way as in the
concurrent.futures case. Examples of the usage of the different asyncio
functions can be found in the Python docs.
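For instance, a minimal gather() sketch (with a hypothetical work coroutine) that runs three awaitables concurrently and collects their values in input order:
import asyncio
async def work(n):
    await asyncio.sleep(n)
    return n * n
async def main():
    values = await asyncio.gather(work(1), work(2), work(3))
    print(values)  # [1, 4, 9], after roughly 3 seconds in total
asyncio.run(main())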
Finally, we are not limited to the default event loop provided by
asyncio. An alternative is the uvloop package, which is implemented using
the libuv library. It can be a drop-in replacement:
import asyncio
import uvloop
async def foo():
    pass
async def bar():
    await foo()
uvloop.install()
asyncio.run(bar())