This is the written version of my HKPyUG talk. My code is on GitHub.
Python can do multithreading, and it is easy to do. The following is an example
of running AES encryption in multiple threads (using the pyaes
module):
import Queue
import argparse
import os
import threading
import pyaes
# Fixed AES key and CBC IV used for every file.
# NOTE(review): hard-coded key/IV are only acceptable for a benchmark like
# this one; never reuse them to protect real data.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32 bytes key
IV_128 = "SeaWide&SkyEmpty" # 16 bytes IV
class WorkerThread(threading.Thread):
    """
    A worker thread that takes file paths from file_q, AES-encrypts each file
    (discarding the ciphertext) and puts (thread name, file path) on result_q.

    Ask the thread to stop by calling its join() method.
    """
    def __init__(self, file_q, result_q):
        super(WorkerThread, self).__init__()
        self.file_q = file_q
        self.result_q = result_q
        self.stoprequest = threading.Event()

    def run(self):
        # As long as we weren't asked to stop, try to take new tasks from the
        # queue. The blocking 'get' wastes no CPU cycles while waiting, and its
        # timeout makes sure stoprequest is re-checked even when the queue is
        # empty.
        while not self.stoprequest.is_set():
            try:
                filepath = self.file_q.get(True, 0.05)
            except Queue.Empty:
                continue
            aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
            # Read the input as bytes and close both files as soon as the file
            # is done, instead of leaving two descriptors open per job.
            with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
                pyaes.encrypt_stream(aes, infile, outfile)
            self.result_q.put((self.name, filepath))

    def join(self, timeout=None):
        """Signal the thread to stop, then wait for it like Thread.join()."""
        self.stoprequest.set()
        super(WorkerThread, self).join(timeout)
def main(indir):
    """
    Encrypt every file under indir with a pool of 4 WorkerThreads and print
    which thread handled each file.
    """
    file_q = Queue.Queue()    # work items: file paths, shared by all threads
    result_q = Queue.Queue()  # results: (thread name, file path)

    # Walk the tree and enqueue every file found.
    work_count = 0
    for dirpath, _subdirs, names in os.walk(indir):
        for name in names:
            file_q.put(os.path.join(dirpath, name))
            work_count += 1

    pool = [WorkerThread(file_q=file_q, result_q=result_q) for _ in range(4)]
    for worker in pool:
        worker.start()
    print('Assigned %s jobs to workers' % work_count)

    # Every file produces exactly one result; block until all have arrived.
    for _ in range(work_count):
        thread_name, filepath = result_q.get()
        print('From thread %s: AES(%s)' % (thread_name, filepath))

    # WorkerThread.join() signals the thread to stop before waiting for it.
    for worker in pool:
        worker.join()
if __name__ == '__main__':
    # Command-line entry point: encrypt every file under --indir using threads.
    cli = argparse.ArgumentParser(description='Slow encrypter in multithread')
    cli.add_argument("-i", "--indir", required=True, help="Directory of input files")
    main(cli.parse_args().indir)
This code has horrible performance because the AES is implemented in pure Python and very little I/O is performed. Every instruction is executed by the Python interpreter, so the GIL (global interpreter lock) becomes the bottleneck. In fact, this code runs slower with multiple threads than it does single-threaded, running the jobs in sequence.
If we insist on running these in parallel to get a speed-up, one way to do it is to
use the multiprocessing
module in Python:
import Queue
import argparse
import os
import sys
import time
import multiprocessing
import pyaes
# Fixed AES key and CBC IV used for every file.
# NOTE(review): hard-coded key/IV are only acceptable for a benchmark like
# this one; never reuse them to protect real data.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32 bytes key
IV_128 = "SeaWide&SkyEmpty" # 16 bytes IV
def aes_files(my_id, file_q, result_q):
    '''
    Worker process body: AES-encrypt files taken from file_q (discarding the
    ciphertext) until it runs dry, putting (my_id, filepath) on result_q
    after each file.
    '''
    while True:
        try:
            # A multiprocessing.Queue is filled by a background feeder thread,
            # so even a pre-populated queue can look empty for a moment.
            # get_nowait() could then make this worker quit early, leaving
            # files unprocessed while main() waits for their results; a short
            # blocking get avoids that.
            filepath = file_q.get(True, 0.1)
        except Queue.Empty:
            return  # nothing arrived within the timeout: all work is taken
        # A fresh cipher object per file: CBC chains each block to the
        # previous one, so a shared object would start every file after the
        # first from the previous file's last block instead of IV_128.
        aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
        with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
            pyaes.encrypt_stream(aes, infile, outfile)
        result_q.put((my_id, filepath))
def main(indir):
    '''
    Encrypt every file under indir with 4 worker processes running
    aes_files() and print which process handled each file.
    '''
    file_q = multiprocessing.Queue()    # work items: file paths
    result_q = multiprocessing.Queue()  # results: (worker id, file path)

    filepaths = [os.path.join(dirpath, name)
                 for dirpath, _subdirs, names in os.walk(indir)
                 for name in names]
    for filepath in filepaths:
        file_q.put(filepath)
    work_count = len(filepaths)

    # Worker ids are 1..4.
    workers = [multiprocessing.Process(target=aes_files, args=(worker_id, file_q, result_q))
               for worker_id in range(1, 5)]
    for proc in workers:
        proc.start()
    print('Assigned %s jobs to workers' % work_count)

    # One result per file; result_q.get() blocks until some worker reports.
    for _ in range(work_count):
        worker_id, filepath = result_q.get()
        print('From process %s: AES(%s)' % (worker_id, filepath))

    # Every result is in, so the worker processes exit once file_q is
    # drained; wait for them.
    for proc in workers:
        proc.join()
if __name__ == '__main__':
    # Command-line entry point: encrypt every file under --indir with worker processes.
    arg_parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    arg_parser.add_argument("-i", "--indir", required=True, help="Directory of input files")
    main(arg_parser.parse_args().indir)
In fact, the above can be simplified further using multiprocessing.Pool:
import argparse
import os
import multiprocessing
import pyaes
# Fixed AES key and CBC IV used for every file.
# NOTE(review): hard-coded key/IV are only acceptable for a benchmark like
# this one; never reuse them to protect real data.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32 bytes key
IV_128 = "SeaWide&SkyEmpty" # 16 bytes IV
def aes_a_file(filepath):
    '''
    Pool task: AES-encrypt the file at filepath, discarding the ciphertext.
    '''
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    # Read the input as bytes and close both files deterministically rather
    # than relying on garbage collection inside a long-lived pool worker.
    with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
def main(indir):
    '''
    Encrypt every file under indir using a pool of 4 worker processes, then
    report how many files were processed.
    '''
    # Scan dirs for files and collect their paths
    filepaths = [os.path.join(dirpath, name)
                 for dirpath, _subdirs, names in os.walk(indir)
                 for name in names]
    work_count = len(filepaths)
    # Create the "process pool" of 4
    pool = multiprocessing.Pool(4)
    try:
        pool.map(aes_a_file, filepaths)  # blocks until every file is done
    finally:
        # Shut the worker processes down explicitly instead of leaving them
        # running until the interpreter exits.
        pool.close()
        pool.join()
    print('Completed %s jobs' % work_count)
if __name__ == '__main__':
    # Command-line entry point: encrypt every file under --indir with a process pool.
    parser = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    parser.add_argument("-i", "--indir", required=True,
                        help="Directory of input files")
    options = parser.parse_args()
    main(options.indir)
But there is a problem: threads live in the same process space and share the
same memory, so a master thread can easily keep track of and control its child
threads. The threading.Event() in the multithreading example above demonstrates
such signaling between threads. Multiprocessing, on the other hand, runs
multiple processes in separate memory spaces, so we need IPC (inter-process
communication), which is not as convenient as simply setting a variable.
Let us consider a simple problem: How can we kill a thread/process if it cannot complete in certain time?
With threads, it could be as easy as asking each thread to report its start time and signaling it to abort if too much time has elapsed. We can also tell whether a thread completed successfully without an exception (e.g., by setting a flag when the job is done). With processes, finding the start time is easy: it is the time the process was forked, which the master process knows. But how do we signal a process to abort, and how do we learn whether it terminated due to success or failure?
On UNIX systems, we can signal a process to abort via kill. It turns out we
can do the following to tell whether a child process terminated successfully,
and to kill it deliberately if it exceeds its time budget:
# Result codes returned by timed_exec() / timed_fork().
TIMEOUT_RETCODE = 42 # returned when the child had to be killed for exceeding its timeout
GOOD_RETCODE = 0 # timed_fork(): child exit code when functor() returned normally
BAD_RETCODE = 2 # timed_fork(): child exit code when functor() raised
def status_code(status):
    '''
    Translate a raw wait status, as found in os.waitpid(pid, 0)[1], into an
    integer code.

    Returns the argument the child passed to exit() for a normal exit, or the
    negated signal number if the child was terminated by a signal (negative so
    it cannot be mistaken for an exit() argument). Raises RuntimeError for a
    stopped/continued child, a core dump or any unrecognised status.
    '''
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSTOPPED(status) or os.WIFCONTINUED(status):
        message = "Child stopped or continued?"
    elif os.WCOREDUMP(status):
        message = "Child core dump!"
    else:
        message = "Unknown child return status!"
    raise RuntimeError(message)
def timed_exec(commandline, timeout):
    '''
    Run commandline in a child process, killing it if it runs longer than
    timeout seconds.

    commandline is an argument list for subprocess.Popen.
    Returns the child's return code (negative -N if it was terminated by
    signal N, per subprocess conventions), or TIMEOUT_RETCODE if it had to be
    killed for running too long.
    '''
    import logging
    import select
    import subprocess
    import threading
    if hasattr(select, 'poll'):
        # Linux: hand the child a stdin pipe that is never written to. Once the
        # child exits, no reader is left on the pipe and poll() on our end
        # wakes up. close_fds=True (already the default on Python 3) keeps other
        # children spawned concurrently from inheriting this pipe, which would
        # hold it open and delay the wake-up.
        popenobj = subprocess.Popen(commandline, stdin=subprocess.PIPE, close_fds=True)
        try:
            pollobj = select.poll()
            pollobj.register(popenobj.stdin, select.POLLHUP)
            finished = pollobj.poll(timeout * 1e3)  # timeout in seconds -> milliseconds
        finally:
            popenobj.stdin.close()  # release our end of the pipe
        if not finished:
            popenobj.kill()  # Python 2.6+
            popenobj.wait()  # reap the killed child so it does not linger as a zombie
            logging.info("Killed %s", commandline)
            return TIMEOUT_RETCODE
        return popenobj.wait()
    # Alternative method (e.g. OS X, where select.poll is unavailable): a timer
    # thread kills the child if it is still running when the timeout expires.
    popenobj = subprocess.Popen(commandline)
    timer = threading.Timer(timeout, popenobj.kill)  # popenobj.kill needs Python 2.6+
    timer.start()
    popenobj.communicate()  # block until the child terminates
    if timer.is_alive():
        timer.cancel()  # the child finished before the timer fired
        return popenobj.poll()  # the child's return code
    logging.info("Killed %s", commandline)
    return TIMEOUT_RETCODE
def timed_fork(functor, timeout):
    '''
    Run functor() in a forked child process, killing the child if it runs
    longer than timeout seconds (the same unit timed_exec() takes).

    Returns GOOD_RETCODE if functor() returned normally, BAD_RETCODE if it
    raised, TIMEOUT_RETCODE if the child had to be killed, or whatever
    status_code() reports for a child that ended some other way.
    '''
    import select
    import signal
    import traceback
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process
        os.close(r)
        exit_code = BAD_RETCODE
        try:
            functor()
            exit_code = GOOD_RETCODE
        except BaseException:
            # The exit code is the only result the parent sees; keep the
            # traceback visible on stderr for debugging.
            traceback.print_exc()
        finally:
            # The child must never return into the caller's code: close the
            # write end (waking the parent's poll) and exit immediately.
            os.close(w)
            os._exit(exit_code)
    # parent process
    os.close(w)
    timed_out = False
    try:
        pollobj = select.poll()
        # POLLHUP fires once the child's write end is closed, i.e. when the
        # child finishes.
        pollobj.register(r, select.POLLHUP)
        if not pollobj.poll(timeout * 1e3):  # timeout in seconds -> milliseconds
            os.kill(pid, signal.SIGKILL)
            timed_out = True
    finally:
        os.close(r)
        # Always reap the child (blocking until it exits) so it does not
        # linger as a zombie.
        status = os.waitpid(pid, 0)[1]
    # Decide the result outside `finally`: a `return` inside `finally` would
    # override TIMEOUT_RETCODE (and swallow any exception raised above).
    if timed_out:
        return TIMEOUT_RETCODE
    return status_code(status)
The function timed_exec() runs a command line and timed_fork() runs a Python
function, but their logic is similar. We fork a process to run in a separate
process space (hence a separate interpreter, no longer bound by the GIL), and
keep a pipe between the parent and the child so that the parent, by polling the
pipe, knows immediately when the child ends. If the timeout expires before the
poll detects that the pipe was closed, we use a signal to kill the child
process.
It is worth noting that select.poll is available on Linux but not on Mac.
Therefore, the code above has an alternative solution that uses
threading.Timer. This works, but it seems less responsive than
polling a pipe.
Now we are ready to work around the GIL. Going back to the code at the top, we
want a program that runs jobs from a queue in parallel, with a little more. The
simple multithreaded program does not account for the case where a thread may
die without finishing its job. If that happens, we want to retry the job in the
next round. We could still use the Queue module from that code, but we can also
implement our own queue:
class JobQueue(object):
    '''
    Thread-safe job queue that also tracks which jobs are being worked on.

    A job handed out by get() moves from the pending list to the working list
    until completed() is called for it. put() ignores a job that is already
    pending or being worked on, so a failed job can simply be put() back to be
    retried.
    '''
    def __init__(self):
        self.__lock = threading.Lock()
        self.__queue = []     # pending jobs, oldest first
        self.__working = []   # jobs handed out by get() and not yet completed()
        self.__done = False   # set by destroy(); lets get() raise once drained
        self.lastdone = None  # time.time() of the most recent completed() call

    def is_done(self):
        'Return True once destroy() has been called'
        return self.__done

    def is_empty(self):
        'Return True if no job is pending or being worked on'
        with self.__lock:  # read both lists consistently w.r.t. get()/completed()
            return not self.__queue and not self.__working

    def destroy(self):
        'Mark the queue finished: get() raises Queue.Empty once no job is left'
        self.__done = True

    def completed(self, job):
        'Remove a job from the working set and record the completion time'
        with self.__lock:
            self.__working = [w for w in self.__working if w != job]
            self.lastdone = time.time()

    def get(self):
        '''
        Hand out the oldest pending job and mark it as being worked on.

        Returns None if nothing is pending right now. Raises Queue.Empty if
        nothing is pending or being worked on and destroy() has been called.
        '''
        with self.__lock:
            if not self.__queue:
                if not self.__working and self.__done:
                    raise Queue.Empty
                return None
            job = self.__queue.pop(0)  # in place, instead of copying the list
            self.__working.append(job)
            return job

    def put(self, job):
        'Append a job to the pending list unless it is already pending or being worked on'
        with self.__lock:
            if job not in self.__queue and job not in self.__working:
                self.__queue.append(job)
And we derive a class for job threads, which encapsulates all interactions with the job queue:
class JobThread(threading.Thread):
    '''
    Worker thread that repeatedly takes a job from a JobQueue and runs it.

    jobfunc(job) is called for each job and should return a true value on
    success; a job whose jobfunc returns false or raises is put back on the
    queue to be retried. Call done() to ask the thread to stop.
    '''
    def __init__(self, jobqueue, jobfunc):
        threading.Thread.__init__(self)
        self.__queue = jobqueue  # JobQueue object to hold job tuples
        self.__done = False      # If True, this thread terminates
        self.__worker = jobfunc  # func takes job tuple as argument and fork out

    def done(self):
        'Ask this thread to stop and mark the shared queue as finished'
        self.__done = True
        self.__queue.destroy()

    def run(self):
        while not self.__done:
            try:
                logging.info("Try getting job from queue")
                job = self.__queue.get()
            except Queue.Empty:
                if self.__queue.is_done():
                    break  # queue drained and destroyed: nothing more will come
                time.sleep(0.25)  # back off briefly instead of busy-spinning
                continue
            if job is None:
                # Nothing pending, but jobs held by other threads may still
                # fail and be put back, so keep polling.
                time.sleep(0.25)
                continue
            try:
                success = self.__worker(job)
            except Exception:
                # Letting this escape would end the thread and leave the job
                # in the queue's working set forever, never to be retried.
                logging.exception("Job %r raised an exception", job)
                success = False
            self.__queue.completed(job)
            if not success:  # job isn't success, try again
                self.__queue.put(job)
And now we can run the following program:
import argparse
import os
import logging
import time
import traceback
from fork import timed_exec, JobQueue, JobThread
import pyaes
# Fixed AES key and CBC IV used for every file.
# NOTE(review): hard-coded key/IV are only acceptable for a benchmark like
# this one; never reuse them to protect real data.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32 bytes key
IV_128 = "SeaWide&SkyEmpty" # 16 bytes IV
PROCESS_COUNT = 4 # number of JobThreads, i.e. at most this many child processes at once
JOB_TIMEOUT = 30 # seconds: per-job limit for timed_exec(), also the "stalled progress" threshold
def worker_main(filepath):
    """
    Worker mode (-f): AES-encrypt one file, discarding the ciphertext.

    This is the CPU-intensive part; in master mode it runs in a child process
    started by foreman().
    """
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    # Read the input as bytes and close both files when done.
    with open(filepath, 'rb') as infile, open(os.devnull, 'wb') as outfile:
        pyaes.encrypt_stream(aes, infile, outfile)
def foreman(filename):
    '''
    Job function for JobThread: encrypt one file in a separate process.

    Re-runs this script in worker mode ("-f filename") through timed_exec()
    rather than running the work in this thread, so the job gets its own
    interpreter and is killed after JOB_TIMEOUT seconds.

    Returns True when timed_exec() reports a non-negative code.
    '''
    import sys
    # Launch the script through the current interpreter; executing the file
    # directly would require an executable bit and a correct shebang line.
    cmdline = [sys.executable, os.path.realpath(__file__), '-f', filename]
    logging.info("Running %s", cmdline)
    # NOTE(review): only a negative code (child killed by a signal) counts as
    # failure here. A non-zero exit status (e.g. from an uncaught exception)
    # or a timeout reported by timed_exec() is treated as success and not
    # retried -- confirm this is intended.
    return timed_exec(cmdline, JOB_TIMEOUT) >= 0
def master_main(indir):
    '''
    Master mode (-i): encrypt every file under indir using PROCESS_COUNT
    JobThreads, each running one job at a time in a child process via foreman().

    Runs for at most BUDGET seconds of wall-clock time, stopping early once
    every job has completed. Aborts (logging the reason) if no job has
    completed for more than JOB_TIMEOUT seconds or if a job thread has died.
    All job threads are stopped and joined before returning.
    '''
    BUDGET = 30          # total wall-clock seconds allowed for the whole run
    RECHECK_TIMEOUT = 1  # seconds between progress checks
    # create thread objects, with function specified as worker
    jobqueue = JobQueue()
    jobthreads = [JobThread(jobqueue, foreman) for _ in range(PROCESS_COUNT)]
    # Scan dirs for files and populate the queue
    work_count = 0
    for dirpath, _subdirs, names in os.walk(indir):
        for name in names:
            jobqueue.put(os.path.join(dirpath, name))
            work_count += 1
    logging.info('Queued %d jobs', work_count)
    # start all threads
    for t in jobthreads:
        t.start()
    try:
        logging.info('Total wall-clock time budget of %d seconds', BUDGET)
        starttime = nowtime = time.time()
        endtime = starttime + BUDGET
        while True:  # Make sure there is progress
            time.sleep(min(endtime - nowtime, RECHECK_TIMEOUT))
            nowtime = time.time()
            if jobqueue.is_empty():
                break
            if nowtime >= endtime:
                logging.warning('Time budget exhausted with jobs still pending')
                break
            if jobqueue.lastdone and (nowtime - jobqueue.lastdone) > JOB_TIMEOUT:
                raise RuntimeError('Stalled progress?')
            if not all(t.is_alive() for t in jobthreads):
                raise RuntimeError('Some thread dead?')
        logging.info('Done')
    except KeyboardInterrupt:
        logging.exception('Interrupted by user')
    except Exception:
        logging.exception('Interrupted by exception')
    finally:
        logging.info('Terminating all threads...')
        for t in jobthreads:
            t.done()
        for t in jobthreads:
            t.join()
        logging.info('all done')
if __name__ == '__main__':
    # Uncomment to see the job threads' progress messages:
    # logging.basicConfig(level=logging.DEBUG)
    parser = argparse.ArgumentParser(description='Slow encrypter in multithread')
    # Exactly one mode is required: -i runs the master over a directory, -f
    # runs a single worker job (how foreman() re-invokes this script). Without
    # this, running with neither option would call worker_main(None).
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("-i", "--indir", help="Directory of input files")
    mode.add_argument("-f", "--filename", help="Path to one input file")
    args = parser.parse_args()
    # invoke
    if args.indir is not None:
        master_main(args.indir)
    else:
        worker_main(args.filename)
When this program is run, the structure is as follows:
main thread [pid X0]
|-- jobthread 1
| +-timed_exec(command with job 1) [pid X1]
|
|-- jobthread 2
| +-timed_exec(command with job 2) [pid X2]
|
|-- jobthread 3
| +-timed_exec(command with job 3) [pid X3]
|
+-- jobthread 4
+-timed_exec(command with job 4) [pid X4]
The main program runs multiple threads but does no actual work itself. The
actual work is done by child processes spawned by each thread, and the job
queue provides job management. One nice trick here is that we use
timed_exec(), which runs a command line, instead of timed_fork(), which invokes
a Python function. I made this design choice to make sure each job receives
nothing more than what is passed in as command-line arguments – thus
guaranteeing no sharing of data and making it easier to debug. Speaking of ease
of debugging, what could be better than making the program dual-use, able to run
both a single job and multiple jobs? The code path is therefore controlled by
the arguments.
All of this code is on GitHub.