I previously gave a talk on how to do concurrent processing in Python, with
the focus on how to control the workers while they are running. Since then,
Python 3 has gained concurrent.futures and asyncio. The following shows how
these newer features can help us craft our parallel code.
Old method
Multithreading in Python is limited by the GIL for CPU-bound work, so we
usually use multiprocessing instead. A simple way is to use
multiprocessing.Pool() to create a process pool and delegate jobs to the
processes using map():
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import multiprocessing
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

def aes_a_file(filepath):
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    print("done {}".format(filepath))

def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4; map() blocks until every job is done
    with multiprocessing.Pool(4) as pool:
        pool.map(aes_a_file, filepaths)

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)
This is simple, but map() requires everything to be done before we can
proceed, because the output is returned in the same order as the input. If we
allow output to be returned out of order, we have to switch to something like
pool.imap_unordered(), or create a multiprocessing.Queue() and do the FIFO
handling ourselves, which makes the code much more complicated.
concurrent.futures
This module was introduced in Python 3.2. It simplifies multiprocessing and
multithreading code so that we no longer manage the concurrency in detail.
The key concept here is the Executor object: we submit a job to it, and the
job then runs behind the scenes. There are two types of Executor,
ThreadPoolExecutor and ProcessPoolExecutor, and both support the following
methods:
- submit(fn, *args, **kwargs): execute fn(*args, **kwargs) and return a Future object immediately
- map(fn, *iterables): execute map(fn, *iterables) asynchronously
- shutdown(wait=True): signal the executor to stop (returning immediately, or once all pending futures are done), but never kill any futures
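A minimal sketch of these three methods follows. The square function is a made-up job for illustration, and ThreadPoolExecutor is used only so that the snippet runs quickly; ProcessPoolExecutor accepts the same calls.

```python
import concurrent.futures

def square(x):
    # Hypothetical job used only for illustration
    return x * x

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

# submit() returns a Future immediately; result() blocks until the job is done
future = executor.submit(square, 7)
single = future.result()

# map() yields results in the order of the inputs
mapped = list(executor.map(square, [1, 2, 3]))

# shutdown(wait=True) blocks until all pending jobs have finished
executor.shutdown(wait=True)

print(single, mapped)
```

In practice the executor is usually used as a context manager, which calls shutdown(wait=True) on exit.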
Using map() is the simplest case and works the same as the example above
using multiprocessing.Pool(). Using submit(), however, returns a Future
object immediately while the job is still pending. Future objects have the
following methods that allow us to do job control:
- cancel(): cancel the job, only if it is still pending
- cancelled(): return True if the job was cancelled
- running(): return True if the job is running
- done(): return True if the job was cancelled or completed
- result(timeout=None): return the value returned by the job, waiting up to timeout seconds before raising TimeoutError
- exception(timeout=None): return the exception raised by the job, waiting up to timeout seconds before raising TimeoutError; return None if no exception was raised
- add_done_callback(fn): attach a function fn, which is called as fn(future) with this Future as the only argument once the job is cancelled or finished
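To make the job control methods concrete, here is a small sketch. The divide and on_done functions are hypothetical, and a single-worker ThreadPoolExecutor is assumed so that the second job is guaranteed to still be pending when it is cancelled.

```python
import concurrent.futures
import threading

def divide(a, b):
    # Hypothetical job; raises ZeroDivisionError when b == 0
    return a / b

finished = []  # filled in by the callback

def on_done(future):
    # Called with the Future as its only argument
    finished.append(future)

gate = threading.Event()

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    blocker = executor.submit(gate.wait)    # occupies the only worker
    queued = executor.submit(divide, 1, 2)  # still pending, so cancellable
    was_cancelled = queued.cancel()
    gate.set()                              # let the blocker finish

    ok = executor.submit(divide, 6, 3)
    bad = executor.submit(divide, 1, 0)
    ok.add_done_callback(on_done)
# Leaving the with block waits for all remaining jobs

print(was_cancelled, queued.cancelled())
print(ok.done(), ok.result(), ok.exception())
print(type(bad.exception()).__name__)
```

Note that exception() hands back the error object without raising it, whereas calling bad.result() would re-raise the ZeroDivisionError in the caller.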
The nice thing about using concurrent.futures is that it creates a Future
object for each job. We can then wait for all jobs to complete using
concurrent.futures.wait(fs, timeout=None), which returns a done set and a
not_done set of futures, or process each completed or cancelled job as soon
as it is available using concurrent.futures.as_completed(fs, timeout=None).
The multiprocessing.Pool() example can be rewritten using concurrent.futures
as follows, this time collecting output in the order of completion:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import concurrent.futures
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

def aes_a_file(filepath):
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath

def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(aes_a_file, f) for f in filepaths]
        # Handle each job as soon as it finishes, regardless of input order
        for fut in concurrent.futures.as_completed(futures):
            try:
                filepath = fut.result()
                print("done {}".format(filepath))
            except Exception as exc:
                print("failed one job: {}".format(exc))

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    main(args.indir)
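The program above consumes its futures with as_completed(). For comparison, the sketch below runs both as_completed() and wait() on the same set of futures. The nap function is a made-up job, and ThreadPoolExecutor is assumed only to keep the snippet light.

```python
import concurrent.futures
import time

def nap(seconds):
    # Hypothetical job: sleep, then report how long it slept
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(nap, s) for s in (0.3, 0.1, 0.2)]

    # as_completed() yields each future as soon as it finishes
    completion_order = [f.result() for f in concurrent.futures.as_completed(futures)]

    # wait() blocks until the condition holds and returns two sets
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.ALL_COMPLETED)

print(completion_order)
print(len(done), len(not_done))
```

The completion order normally starts with the shortest nap, although the exact order depends on thread scheduling. wait() also accepts FIRST_COMPLETED and FIRST_EXCEPTION for return_when, in which case not_done may be non-empty.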
asyncio Futures
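asyncio was added to the standard library in Python 3.4. The keywords async
and await followed in Python 3.5, and asyncio.run() and
asyncio.get_running_loop(), both used below, arrived in Python 3.7. It has
the asyncio.Future object, which is almost API-compatible with
concurrent.futures.Future, with the exception that its result() and
exception() methods take no timeout option: they return immediately, or raise
an exception if the future is not done yet.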
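As a small aside, the difference in result() can be seen in the following sketch. The future is created by hand with loop.create_future() purely for illustration; in real code, futures usually come from calls such as loop.run_in_executor().

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # result() takes no timeout; on an unfinished future it raises at once
    try:
        fut.result()
        early = "no error"
    except asyncio.InvalidStateError:
        early = "InvalidStateError"

    # Resolve the future a little later, then await it to get the value
    loop.call_later(0.05, fut.set_result, 42)
    value = await fut
    return early, value, fut.result()

outcome = asyncio.run(main())
print(outcome)
```

Waiting is therefore done with await rather than with a blocking result(timeout=...) call.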
The file encryption example, rewritten with asyncio, looks as follows:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import asyncio
import concurrent.futures
import os

import pyaes

KEY_256 = os.urandom(32)
IV_128 = os.urandom(16)

def aes_a_file(filepath):
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    print("encrypting {}".format(filepath))
    # Context managers close both files even if encryption fails
    with open(filepath, 'rb') as infile, open('/dev/null', 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
    return filepath

async def main(indir):
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))

    # Create the "process pool" of 4 and run asyncio
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [loop.run_in_executor(executor, aes_a_file, f) for f in filepaths]
        # Each await resumes as soon as the next job has finished
        for fut in asyncio.as_completed(futures):
            try:
                filepath = await fut
                print("done {}".format(filepath))
            except Exception as exc:
                print("failed one job: {}".format(exc))

if __name__ == '__main__':
    # argparse
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", help="Directory of input files", required=True)
    args = parser.parse_args()
    # invoke
    asyncio.run(main(args.indir))
The example illustrates some characteristics of asyncio. Firstly, every
function that uses await has to be defined as async. A function defined as
async can only be awaited from another function that is also defined as
async, or started using asyncio.run(). The latter is usually the entry point
of a program using asyncio.
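A short sketch of this rule, using two made-up coroutines named add and caller:

```python
import asyncio
import inspect

async def add(a, b):
    return a + b

async def caller():
    # Inside an async function, await runs the coroutine and gives its value
    return await add(1, 2)

# Calling an async function only creates a coroutine object; nothing runs yet
coro = add(1, 2)
is_coroutine = inspect.iscoroutine(coro)
coro.close()  # discard it explicitly to avoid a "never awaited" warning

# asyncio.run() is the bridge from synchronous code into async code
total = asyncio.run(caller())
print(is_coroutine, total)
```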
The mechanism of asyncio is built around an event loop. It is a cooperative,
green-thread-like architecture, so we should assume that there is only one
application thread behind the scenes, even though the code seems to run
asynchronously. The main function that uses asyncio typically starts with
asyncio.get_running_loop(), and then we create tasks on the loop. In the
encryption example, we make use of ProcessPoolExecutor to parallelize with
multiprocessing, so we are not limited by the GIL. Besides
loop.run_in_executor() as used there, we can also use loop.create_task() for
coroutines.
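The single-thread assumption has a practical consequence, sketched below: a blocking call inside a coroutine stalls the whole loop, while the same call handed to run_in_executor() does not. The function names are made up, and passing None as the executor is assumed here to select the loop's default thread pool. That is enough for time.sleep(), which releases the GIL; CPU-bound work would still call for a ProcessPoolExecutor.

```python
import asyncio
import time

async def blocking_job():
    # time.sleep() never yields to the event loop, so nothing else can run
    time.sleep(0.2)

async def offloaded_job():
    # Passing None uses the loop's default thread pool executor
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 0.2)

async def timed(job):
    # Run two copies of the job "concurrently" and measure the wall time
    start = time.perf_counter()
    await asyncio.gather(job(), job())
    return time.perf_counter() - start

blocking_elapsed = asyncio.run(timed(blocking_job))
offloaded_elapsed = asyncio.run(timed(offloaded_job))
print("blocking: {:.2f}s, offloaded: {:.2f}s".format(
    blocking_elapsed, offloaded_elapsed))
```

The two blocking jobs run back to back, so their time adds up, while the offloaded jobs overlap.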
There are several ways of using asyncio. The simplest case is await fn(),
which calls an async function and waits for it to complete. This gives the
caller no concurrency at all, because it is suspended while the callee is
working. It is, however, syntactically similar to calling a function that is
not defined as async. The await takes an awaitable and evaluates to its
result. It returns immediately if the awaitable has already completed; in the
example code above, each await inside the as_completed() loop returns as soon
as the next job has finished.
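A minimal sketch of this sequential behaviour, assuming a made-up coroutine fetch that stands in for real work:

```python
import asyncio
import time

async def fetch(n):
    # Hypothetical coroutine standing in for real work
    await asyncio.sleep(0.1)
    return n * 10

async def main():
    start = time.perf_counter()
    # Each await finishes before the next line starts: no overlap
    first = await fetch(1)
    second = await fetch(2)
    elapsed = time.perf_counter() - start
    return first, second, elapsed

first, second, elapsed = asyncio.run(main())
print(first, second, "{:.2f}s".format(elapsed))
```

The elapsed time is roughly the sum of the two sleeps, just as it would be with two ordinary function calls.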
The other way is to create a task, task = asyncio.create_task(fn()), in
which the async function is called and wrapped as a task. The task is
scheduled to run on the asyncio loop right away, and we can check the task
instance from time to time for its status (e.g., pending or finished).
Creating a task also allows us to add a callback to it, like the example
below:
import asyncio

async def foo(n):
    await asyncio.sleep(10)
    return "foo {} done".format(n)

def handler(future):
    # Called by the loop with the finished task as its only argument
    print(future.result())

async def main():
    task = asyncio.create_task(foo(1))
    task.add_done_callback(handler)
    print(task)  # still pending
    await asyncio.sleep(15)
    print(task)  # finished, with its result set

asyncio.run(main())
If we need more sophisticated control, we have to use the functions provided by asyncio:
- values = await asyncio.gather(*awaitables): takes a sequence of awaitables, returns an aggregate list of successfully awaited values
- done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED): wait for a collection of tasks until the given condition (e.g., the first one completed) is met, then return the done and pending sets
- value = await asyncio.wait_for(awaitable, timeout=N): wait for a single awaitable, or until the timeout expires
- for coro in asyncio.as_completed(awaitables): iterate over awaitables that produce results in the order they become ready
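The four functions can be sketched side by side as follows. The work coroutine and all delays are made up for illustration; the tasks passed to asyncio.wait() are created explicitly, since recent Python versions no longer accept bare coroutines there.

```python
import asyncio

async def work(name, delay):
    # Hypothetical coroutine: sleep, then return its own name
    await asyncio.sleep(delay)
    return name

async def main():
    # gather(): results come back in argument order, not completion order
    gathered = await asyncio.gather(work("a", 0.2), work("b", 0.1))

    # wait(): takes tasks and returns (done, pending) sets
    tasks = [asyncio.create_task(work("fast", 0.05)),
             asyncio.create_task(work("slow", 5))]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    first_done = [t.result() for t in done]
    for t in pending:
        t.cancel()  # do not leave the slow task running

    # wait_for(): raises asyncio.TimeoutError once the timeout expires
    try:
        await asyncio.wait_for(work("late", 5), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    # as_completed(): awaiting each item gives the next finished result
    ordered = [await item
               for item in asyncio.as_completed([work("x", 0.2), work("y", 0.1)])]
    return gathered, first_done, timed_out, ordered

gathered, first_done, timed_out, ordered = asyncio.run(main())
print(gathered, first_done, timed_out, ordered)
```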
In the encryption example, we use as_completed in the same way as with
concurrent.futures. Examples of the usage of the other asyncio functions can
be found in the Python documentation.
Finally, we are not limited to the default event loop provided by asyncio. An
alternative is the uvloop package, which is implemented on top of the libuv
library. It can be a drop-in replacement:
import asyncio
import uvloop

async def foo():
    pass

async def bar():
    await foo()

# Make asyncio use uvloop's event loop from here on
uvloop.install()
asyncio.run(bar())