I previously gave a talk on concurrent processing in Python, focusing on how to control workers while they are running. Since then, Python 3 has gained concurrent.futures and asyncio. The following shows how these newer features can help us craft parallel code.

Old method

Multithreading in Python is limited by the GIL, which lets only one thread execute Python bytecode at a time, so for CPU-bound work we usually use multiprocessing instead. A simple way is to create a process pool with multiprocessing.Pool() and delegate jobs to the processes using map():

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import multiprocessing
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

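def aes_a_file(filepath):
    """Encrypt one file with AES-CBC and discard the output (a CPU-bound dummy job)."""
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    print("done {}".format(filepath))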

def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4; map() blocks until every file is done
    with multiprocessing.Pool(4) as pool:
        pool.map(aes_a_file, filepaths)

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)

This is simple, but map() blocks until every job is done before we can proceed, because the output is returned in the same order as the input. If we want to handle results out of order, as each job finishes, we would have to switch to imap_unordered() or, for finer control, pass results through a multiprocessing.Queue() in FIFO fashion, which makes the code much more complicated.
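As a rough illustration of out-of-order results with the pool interface, here is a minimal sketch using imap_unordered(). It assumes multiprocessing.pool.ThreadPool instead of a process pool (it exposes the same methods and keeps the sketch self-contained), and slow_square() is a hypothetical stand-in for a real job such as the encryption above.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(n):
    """Hypothetical stand-in for a real job: larger inputs finish sooner."""
    time.sleep((4 - n) * 0.2)
    return n * n


def unordered_results(numbers):
    # imap_unordered() yields each result as soon as its job finishes,
    # so the output order follows completion time, not input order
    with ThreadPool(len(numbers)) as pool:
        return list(pool.imap_unordered(slow_square, numbers))


if __name__ == "__main__":
    print(unordered_results([1, 2, 3]))
```

For CPU-bound jobs, multiprocessing.Pool should be usable in place of ThreadPool with the same calls.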

concurrent.futures

This module was introduced in Python 3.2. It simplifies multiprocessing and multithreading code so that we no longer manage the concurrency in detail. The key concept is the Executor object: we submit a job to it, and the job then runs behind the scenes.

There are two types of Executor, ThreadPoolExecutor and ProcessPoolExecutor. Both support the following methods:

  • submit(fn, *args, **kwargs): execute fn(*args, **kwargs) and return a Future object immediately
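  • map(fn, *iterables): apply fn to the items of the iterables concurrently and return an iterator that yields the results in input order
  • shutdown(wait=True): signal the executor to stop accepting new jobs, either blocking until all pending futures are done (wait=True) or returning immediately; it never kills a running job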
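The three methods can be tried out in a few lines. This is only a sketch: double() is a hypothetical job, and ThreadPoolExecutor is used for brevity, although ProcessPoolExecutor accepts the same calls.

```python
import concurrent.futures


def double(n):
    """Hypothetical job used only for illustration."""
    return 2 * n


def run_jobs():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    # submit() returns a Future immediately; result() blocks until the job is done
    future = executor.submit(double, 21)
    # map() returns an iterator that yields results in input order
    mapped = list(executor.map(double, [1, 2, 3]))
    # shutdown(wait=True) blocks until every pending job has finished
    executor.shutdown(wait=True)
    return future.result(), mapped
```

In practice the executor is usually used as a context manager, which calls shutdown(wait=True) on exit.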

Using map() is the simplest case, and it is the same as the multiprocessing.Pool() example above. submit(), however, returns a Future object immediately while the job is still pending. Future objects have the following methods, which allow us to do job control:

  • cancel(): cancel the job, only if it is pending to run
  • cancelled(): return True if job was cancelled
  • running(): return True if job is running
  • done(): return True if job was cancelled or completed
  • result(timeout=None): return the value returned by the job, waiting up to timeout seconds and raising TimeoutError if the job has not finished by then
  • exception(timeout=None): return the exception raised by the job, or None if no exception was raised; it waits and times out in the same way as result()
  • add_done_callback(fn): attach a function fn, which is called as fn(future), with this Future as the only argument, once the job is cancelled or finished
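To make the job-control methods concrete, the sketch below uses a single-worker thread pool so that a second job is guaranteed to stay pending behind the first. The function and variable names are hypothetical, and the two threading.Event objects exist only to make the timing deterministic.

```python
import concurrent.futures
import threading


def future_job_control():
    started = threading.Event()
    release = threading.Event()
    finished = []  # filled in by the done-callback

    def blocking_job():
        started.set()    # tell the caller we are running
        release.wait()   # stay busy until the caller lets us go
        return "first"

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        first = executor.submit(blocking_job)
        first.add_done_callback(lambda fut: finished.append(fut.result()))
        second = executor.submit(lambda: "second")  # queued behind the first job

        started.wait()
        states = {
            "first_running": first.running(),
            "first_done": first.done(),
            # cancel() succeeds only because the second job has not started yet
            "second_cancelled": second.cancel(),
        }
        release.set()
        states["first_result"] = first.result(timeout=5)
    # Leaving the with-block joins the worker, so the callback has run by now
    states["callback_saw"] = finished
    states["second_done"] = second.done()
    return states
```

Note that a cancelled future also counts as done, which is why done() alone cannot tell success from cancellation.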

The nice thing about concurrent.futures is that each submitted job gets a Future object. We can then wait for all jobs to complete using concurrent.futures.wait(fs, timeout=None), which returns a done set and a not_done set of futures, or process each completed or cancelled job as soon as it is available using concurrent.futures.as_completed(fs, timeout=None).
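A small sketch of wait() follows. It assumes the return_when argument, which wait() accepts in addition to timeout, and nap() is a hypothetical job that just sleeps.

```python
import concurrent.futures
import time


def nap(seconds):
    """Hypothetical job: sleep, then report how long it slept."""
    time.sleep(seconds)
    return seconds


def wait_for_first():
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        fast = executor.submit(nap, 0.1)
        slow = executor.submit(nap, 1.0)
        # Return as soon as any job finishes instead of waiting for all of them
        done, not_done = concurrent.futures.wait(
            [fast, slow], return_when=concurrent.futures.FIRST_COMPLETED
        )
        return [f.result() for f in done], len(not_done)
```

With the default return_when=ALL_COMPLETED, the not_done set is empty unless the timeout expires first.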

The multiprocessing.Pool example from the previous section can be rewritten using concurrent.futures as follows, this time collecting output in order of completion:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import concurrent.futures
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

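def aes_a_file(filepath):
    """Encrypt one file with AES-CBC, discard the output, and return the path."""
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath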

def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(aes_a_file, f) for f in filepaths]
        for fut in concurrent.futures.as_completed(futures):
            try:
                filepath = fut.result()
                print("done {}".format(filepath))
            except Exception as exc:
                # result() re-raises whatever exception the job raised
                print("failed one job: {}".format(exc))

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)

asyncio Futures

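asyncio was added to the standard library in Python 3.4, and Python 3.5 brought the new keywords async and await to the language. The code below also relies on asyncio.run() and asyncio.get_running_loop(), which are new in Python 3.7. asyncio has its own asyncio.Future object, which is almost API-compatible with concurrent.futures.Future, with the exception that its result() and exception() methods return immediately and take no timeout argument; they raise InvalidStateError if the future is not done yet.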
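The difference in result() can be seen in a short sketch. The future here is created by hand with loop.create_future() purely for illustration; in real code it would normally come from something like run_in_executor().

```python
import asyncio


async def future_demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # an asyncio.Future that nothing has completed yet

    try:
        fut.result()  # no timeout argument: this never blocks
        early = "got a result"
    except asyncio.InvalidStateError:
        early = "not ready"

    # Complete the future a little later from a loop callback
    loop.call_later(0.05, fut.set_result, "ready")
    value = await fut  # awaiting is the asyncio way to wait for the result
    return early, value, fut.result()
```

Calling asyncio.run(future_demo()) runs the sketch from ordinary code.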

Rewriting the encryption example with asyncio gives the following:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import asyncio
import concurrent.futures
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

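def aes_a_file(filepath):
    """Encrypt one file with AES-CBC, discard the output, and return the path."""
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath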

async def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4 and run asyncio
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [loop.run_in_executor(executor, aes_a_file, f) for f in filepaths]
        for fut in asyncio.as_completed(futures):
            try:
                filepath = await fut
                print("done {}".format(filepath))
            except Exception as exc:
                # awaiting re-raises whatever exception the job raised
                print("failed one job: {}".format(exc))

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    asyncio.run(main(args.indir))

The code above illustrates some characteristics of asyncio. Firstly, every function that uses await has to be defined as async. Secondly, calling an async function only creates a coroutine object; to actually run it, we either await it from another async function or pass it to asyncio.run(). The latter is usually the entry point of a program using asyncio.
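These calling rules can be checked with a tiny sketch. The functions answer(), caller() and demo() are hypothetical names chosen for illustration.

```python
import asyncio
import inspect


async def answer():
    return 42


async def caller():
    # Inside an async function we can await another async function
    return await answer()


def demo():
    coro = answer()  # nothing has run yet: this is only a coroutine object
    is_coroutine = inspect.iscoroutine(coro)
    coro.close()     # discard it so Python does not warn "never awaited"
    # From ordinary code, asyncio.run() is the way in
    return is_coroutine, asyncio.run(caller())
```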

The mechanism of asyncio is built around an event loop. It is a cooperative, green-thread-like architecture, so we should assume that there is only one application thread behind the scenes even though the code seems to run asynchronously. An async function can obtain the loop with asyncio.get_running_loop() and then schedule work on it. In the encryption example, we use ProcessPoolExecutor to parallelize with multiple processes so that we are not limited by the GIL. Besides loop.run_in_executor() as used there, which takes an ordinary function, we can also use loop.create_task() to schedule a coroutine.
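The single-thread assumption can be observed directly. In this sketch (all function names are hypothetical), a coroutine scheduled with loop.create_task() reports the same thread as the loop, while a plain function passed to loop.run_in_executor() with the default executor reports a different one.

```python
import asyncio
import threading


def blocking_work():
    """Ordinary function: runs in a worker thread of the default executor."""
    return threading.get_ident()


async def coroutine_work():
    """Coroutine: runs on the event loop's own thread."""
    await asyncio.sleep(0)
    return threading.get_ident()


async def where_does_it_run():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    # create_task() schedules a coroutine on the loop
    task = loop.create_task(coroutine_work())
    # run_in_executor(None, ...) hands a plain function to the default thread pool
    future = loop.run_in_executor(None, blocking_work)
    task_thread, executor_thread = await asyncio.gather(task, future)
    return task_thread == loop_thread, executor_thread == loop_thread
```

Passing a ProcessPoolExecutor instead of None moves the work to another process rather than another thread.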

There are several ways of using asyncio. The simplest case is await fn(), which calls an async function and waits for it to complete. This gives no concurrency between caller and callee, because the caller is suspended while the callee is working, but syntactically it resembles an ordinary call to a function that is not defined async. The await expression evaluates to the result of the awaitable, and it returns immediately if the awaitable has already completed. In the encryption example, each await fut resumes as soon as the next job finishes.
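The lack of concurrency in plain await calls shows up when logging start and end events. The sketch below uses a hypothetical step() coroutine and, for contrast, runs the same two steps through asyncio.gather().

```python
import asyncio


async def step(name, log):
    log.append("start " + name)
    await asyncio.sleep(0.05)
    log.append("end " + name)


async def sequential():
    log = []
    # Each await finishes before the next call begins
    await step("a", log)
    await step("b", log)
    return log


async def overlapped():
    log = []
    # gather() runs both coroutines on the loop at the same time
    await asyncio.gather(step("a", log), step("b", log))
    return log
```

In the sequential version step b starts only after step a has ended, whereas in the overlapped version both steps start before either one ends.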

The other way is to create a task, task = asyncio.create_task(fn()), in which the coroutine returned by the async function is wrapped as a task. It is scheduled to run on the event loop right away, and we can check the task instance from time to time for its status (e.g., pending or finished). Creating a task also allows us to add a callback to it, like the example below:

import asyncio

async def foo(n):
    await asyncio.sleep(10)
    return "foo {} done".format(n)

def handler(future):
    print(future.result())

async def main():
    task = asyncio.create_task(foo(1))
    task.add_done_callback(handler)
    print(task)
    await asyncio.sleep(15)
    print(task)

asyncio.run(main())

If we need more sophisticated control, we have to use the functions provided by asyncio:

  • values = await asyncio.gather(*awaitables): takes a sequence of awaitables, returns an aggregate list of successfully awaited values in the same order as the awaitables
  • done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED): wait for a collection of tasks or futures until the given condition (e.g. the first one completes) is met, then return the sets of done and pending ones
  • value = await asyncio.wait_for(awaitable, timeout=N): wait for a single awaitable, raising asyncio.TimeoutError if it does not finish within N seconds
  • for coro in asyncio.as_completed(awaitables): iterate over awaitables, each of which yields the next result as soon as it is ready

In the asyncio encryption example, we use as_completed() in much the same way as with concurrent.futures. Examples of the other asyncio functions can be found in the Python documentation.
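As a quick sketch of the remaining functions, the code below exercises gather(), wait() and wait_for() in turn. The delayed() coroutine and the delays are hypothetical values picked so that the outcome is easy to predict.

```python
import asyncio


async def delayed(value, delay):
    """Hypothetical coroutine: return value after delay seconds."""
    await asyncio.sleep(delay)
    return value


async def control_demo():
    # gather(): results come back in the order the awaitables were passed in
    gathered = await asyncio.gather(delayed("slow", 0.2), delayed("fast", 0.05))

    # wait(): takes tasks, returns (done, pending) sets once the condition is met
    tasks = [asyncio.create_task(delayed(n, n * 0.1)) for n in (1, 5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    for t in pending:
        t.cancel()  # we no longer need the slower task

    # wait_for(): raises TimeoutError when the awaitable takes too long
    try:
        await asyncio.wait_for(delayed("late", 1.0), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return gathered, first, len(pending), timed_out
```

Unlike wait_for(), wait() does not cancel anything on its own, so the pending tasks are cancelled explicitly here.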

Finally, we are not limited to the default event loop provided by asyncio. An alternative is the uvloop package, which is implemented on top of the libuv library. It can be used as a drop-in replacement:

import asyncio
import uvloop

async def foo():
    pass
    
async def bar():
    await foo()

uvloop.install()
asyncio.run(bar())
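Because uvloop is a third-party package, a program may want to fall back to the default loop when it is not installed. The following is one possible sketch of that pattern; ping() and run_with_best_loop() are hypothetical names, and uvloop.install() is the only uvloop call assumed.

```python
import asyncio

try:
    import uvloop  # third-party; may not be installed
except ImportError:
    uvloop = None


async def ping():
    await asyncio.sleep(0)
    return "pong"


def run_with_best_loop():
    if uvloop is not None:
        uvloop.install()  # switch asyncio to the libuv-based loop
    return asyncio.run(ping())
```

The coroutine code itself does not change, which is what makes the replacement drop-in.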