This is the written version of my HKPyUG talk. My code is on GitHub.

Python supports multithreading, and it is easy to use. The following example runs AES encryption in multiple threads (using the pyaes module):

import Queue
import argparse
import os
import threading

import pyaes

# Key material for the example, hard-coded so the listing is self-contained.
# NOTE(review): fine for a benchmark; real code should not embed keys in source.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32-byte key (256 bits), passed to the CBC cipher
IV_128 = "SeaWide&SkyEmpty" # 16-byte IV (128 bits), passed as iv= to the CBC cipher

class WorkerThread(threading.Thread):
    """
    A worker thread that takes file paths from a queue, AES-encrypts each
    file (discarding the output to /dev/null) and reports the result.

    Each finished file is reported on result_q as a
    (thread name, file path) tuple.

    Ask the thread to stop by calling its join() method.
    """
    def __init__(self, file_q, result_q):
        super(WorkerThread, self).__init__()
        self.file_q = file_q
        self.result_q = result_q
        # Set by join(); run() checks it between jobs.
        self.stoprequest = threading.Event()

    def run(self):
        # As long as we weren't asked to stop, try to take new tasks from the
        # queue. The tasks are taken with a blocking 'get', so no CPU
        # cycles are wasted while waiting.
        # Also, 'get' is given a timeout, so stoprequest is always checked,
        # even if there's nothing in the queue.
        while not self.stoprequest.is_set():
            # Only the 'get' can raise Queue.Empty, so only it is guarded.
            try:
                filepath = self.file_q.get(True, 0.05)
            except Queue.Empty:
                continue
            aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
            # 'with' closes both files after every job, even when the
            # encryption raises; otherwise each job leaks two handles.
            with open(filepath, 'rb') as infile:
                with open('/dev/null', 'wb') as outfile:
                    pyaes.encrypt_stream(aes, infile, outfile)
            self.result_q.put((self.name, filepath))

    def join(self, timeout=None):
        """Request the thread to stop, then wait for it (up to timeout)."""
        self.stoprequest.set()
        super(WorkerThread, self).join(timeout)

def main(indir):
    """Encrypt every file found under indir using four worker threads."""
    # All threads share one queue of pending paths and one queue of results.
    pending = Queue.Queue()
    finished = Queue.Queue()

    # Walk the directory tree and enqueue every file path up front.
    work_count = 0
    for root, _subdirs, names in os.walk(indir):
        for name in names:
            pending.put(os.path.join(root, name))
            work_count += 1

    # The "thread pool": four workers, all created before any is started.
    workers = [WorkerThread(file_q=pending, result_q=finished)
               for _ in range(4)]
    for worker in workers:
        worker.start()

    # print with one parenthesised argument emits the same text under the
    # Python 2 print statement and the print() function.
    print('Assigned %s jobs to workers' % work_count)

    # Collect exactly one result per enqueued job; get() blocks until a
    # worker has delivered one.
    for _ in range(work_count):
        who, filepath = finished.get()
        print('From thread %s: AES(%s)' % (who, filepath))

    # join() both asks each worker to stop and waits for it to finish.
    for worker in workers:
        worker.join()

if __name__ == '__main__':
    # Command line: -i/--indir is mandatory and names the directory to scan.
    cli = argparse.ArgumentParser(description='Slow encrypter in multithread')
    cli.add_argument("-i", "--indir", required=True,
                     help="Directory of input files")
    main(cli.parse_args().indir)

This code has horrible performance because the AES is implemented in pure Python and not much I/O is performed. Every instruction is handled by the Python interpreter, so the GIL (global interpreter lock) becomes the bottleneck. In fact, this code runs slower with multithreading than it does single-threaded with the jobs run in sequence.

If we insist on running these in parallel and getting a speed-up, one way to do it is to leverage the multiprocessing module in Python:

import Queue
import argparse
import os
import sys
import time
import multiprocessing

import pyaes

# Key material for the example, hard-coded so the listing is self-contained.
# NOTE(review): fine for a benchmark; real code should not embed keys in source.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32-byte key (256 bits), passed to the CBC cipher
IV_128 = "SeaWide&SkyEmpty" # 16-byte IV (128 bits), passed as iv= to the CBC cipher

def aes_files(my_id, file_q, result_q, poll_timeout=0.05):
    """
    Worker process body: encrypt the files named on file_q until it runs dry.

    my_id        -- identifier echoed back in every result tuple
    file_q       -- queue of file paths, pre-populated by the parent
    result_q     -- queue that receives one (my_id, filepath) tuple per file
    poll_timeout -- seconds to wait for a path before treating the queue as
                    drained and returning
    """
    while True:
        try:
            # A short blocking get rather than get_nowait(): a
            # multiprocessing.Queue is filled by a background feeder thread
            # and read under a lock shared by all workers, so get_nowait()
            # can report Empty while paths are still on their way, which
            # would make a worker quit early.
            filepath = file_q.get(True, poll_timeout)
        except Queue.Empty:
            return # nothing arrived within poll_timeout: we're done
        # A new cipher object per file, as in the other versions of this
        # program: CBC chains every block to the previous one, so a reused
        # object would not start each file from IV_128.
        aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
        # 'with' closes both files after every job instead of leaking them.
        with open(filepath, 'rb') as infile:
            with open('/dev/null', 'wb') as outfile:
                pyaes.encrypt_stream(aes, infile, outfile)
        result_q.put((my_id, filepath))

def main(indir):
    """Encrypt every file found under indir using four worker processes."""
    # All processes share one queue of pending paths and one queue of results.
    pending = multiprocessing.Queue()
    finished = multiprocessing.Queue()

    # Walk the directory tree and enqueue every file path up front.
    work_count = 0
    for root, _subdirs, names in os.walk(indir):
        for name in names:
            pending.put(os.path.join(root, name))
            work_count += 1

    # The "process pool": four workers numbered 1..4, all created before
    # any of them is started.
    workers = [
        multiprocessing.Process(target=aes_files,
                                args=(worker_id, pending, finished))
        for worker_id in range(1, 5)
    ]
    for worker in workers:
        worker.start()

    # print with one parenthesised argument emits the same text under the
    # Python 2 print statement and the print() function.
    print('Assigned %s jobs to workers' % work_count)

    # Collect exactly one result per enqueued job; get() blocks until a
    # worker has delivered one.
    for _ in range(work_count):
        who, filepath = finished.get()
        print('From process %s: AES(%s)' % (who, filepath))

    # Wait for every worker process to exit.
    for worker in workers:
        worker.join()

if __name__ == '__main__':
    # Command line: -i/--indir is mandatory and names the directory to scan.
    cli = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    cli.add_argument("-i", "--indir", required=True,
                     help="Directory of input files")
    main(cli.parse_args().indir)

In fact, the above can be further simplified using multiprocessing.Pool:

import argparse
import os
import multiprocessing

import pyaes

# Key material for the example, hard-coded so the listing is self-contained.
# NOTE(review): fine for a benchmark; real code should not embed keys in source.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32-byte key (256 bits), passed to the CBC cipher
IV_128 = "SeaWide&SkyEmpty" # 16-byte IV (128 bits), passed as iv= to the CBC cipher

def aes_a_file(filepath):
    """
    AES-CBC encrypt the file at filepath, discarding the output to /dev/null.

    Used as the per-item task for multiprocessing.Pool.map(); returns None.
    """
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    # 'with' closes both files when the job ends, even if encryption raises.
    # Pool workers are reused for many files, so unclosed handles would pile up.
    with open(filepath, 'rb') as infile:
        with open('/dev/null', 'wb') as outfile:
            pyaes.encrypt_stream(aes, infile, outfile)

def main(indir):
    """
    Encrypt every file found under indir using a multiprocessing.Pool of 4.

    indir -- directory that is walked recursively for input files
    """
    # Scan dirs for files and populate a list
    filepaths = []
    for path, dirs, files in os.walk(indir):
        for basename in files:
            filepaths.append(os.path.join(path, basename))
    work_count = len(filepaths)

    # Create the "process pool" of 4
    pool = multiprocessing.Pool(4)
    try:
        # map() blocks until every file has been processed.
        pool.map(aes_a_file, filepaths)
    finally:
        # Shut the pool down explicitly so the worker processes are not left
        # to be cleaned up by garbage collection or interpreter exit.
        pool.close()
        pool.join()

    # print with one parenthesised argument emits the same text under the
    # Python 2 print statement and the print() function.
    print('Completed %s jobs' % work_count)

if __name__ == '__main__':
    # Command line: -i/--indir is mandatory and names the directory to scan.
    cli = argparse.ArgumentParser(description='Slow encrypter in multiprocessing')
    cli.add_argument("-i", "--indir", required=True,
                     help="Directory of input files")
    main(cli.parse_args().indir)

But there is a problem. Threads live in the same process space and share the same memory, so it is easy for a master thread to keep track of and control its child threads. The multithreading example above uses threading.Event() to demonstrate signaling between threads. Multiprocessing, on the other hand, runs multiple processes in separate address spaces, so we need IPC (inter-process communication), which is not as convenient as simply setting a variable.

Let us consider a simple problem: How can we kill a thread/process if it cannot complete within a certain time?

With threads, it can be as easy as having each thread report its start time and signaling the thread to abort once too much time has elapsed. We can also tell whether a thread completed successfully without an exception (e.g., by simply setting a flag when the job is done). With processes, finding the start time is easy, because it is the moment the process was forked, which the master process knows. But how do we signal a process to abort, and how do we learn whether it terminated in success or in failure?

On UNIX systems, we can signal a process to abort via kill. It turns out that we can do the following to tell whether a child process terminated successfully, and to kill it deliberately if it exceeds its time budget:

# Return codes shared by the timed_* helpers and their forked children.
TIMEOUT_RETCODE = 42  # returned by timed_exec() when the child had to be killed for running too long
GOOD_RETCODE = 0      # exit status of a timed_fork() child whose functor returned normally
BAD_RETCODE = 2       # exit status of a timed_fork() child whose functor raised

def status_code(status):
    '''
    Translate a raw wait status into a single integer result.

    status is the exit status indication from os.waitpid(pid,0)[1].

    Returns the negated signal number when the child was terminated by a
    signal, otherwise the value the child passed to exit(). Raises
    RuntimeError for every other kind of status.
    '''
    # Terminated by a signal: negate the signal number so it cannot be
    # confused with a value passed to exit().
    if os.WIFSIGNALED(status):
        return -os.WTERMSIG(status)

    # Normal termination: hand back the argument of the exit() call.
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)

    # Anything else is unexpected here; pick the matching complaint.
    if os.WIFSTOPPED(status) or os.WIFCONTINUED(status):
        problem = "Child stopped or continued?"
    elif os.WCOREDUMP(status):
        problem = "Child core dump!"
    else:
        problem = "Unknown child return status!"
    raise RuntimeError(problem)

def timed_exec(commandline, timeout):
    '''
    Run commandline as a subprocess and kill it if it outlives timeout.

    commandline -- argument list (or string) handed to subprocess.Popen
    timeout     -- time budget in seconds

    Returns the subprocess' return code, or TIMEOUT_RETCODE if the
    subprocess had to be killed.
    '''
    import select
    if hasattr(select, 'poll'):
        # for Linux: Using poll
        # We hold the write end of the child's stdin pipe. poll() on it is
        # expected to wake up with a hang-up/error event once the child has
        # gone away, so no busy-waiting is needed.
        popenobj = subprocess.Popen(commandline, stdin=subprocess.PIPE)
        try:
            pollobj = select.poll()
            pollobj.register(popenobj.stdin, select.POLLHUP)
            # poll() takes milliseconds; an empty event list means timeout.
            timed_out = not pollobj.poll(timeout * 1e3)
            if timed_out:
                popenobj.kill() # Python 2.6+
            # Always reap the child, killed or not, so no zombie is left.
            popenobj.wait()
        finally:
            # Release our end of the pipe; otherwise one fd leaks per call.
            popenobj.stdin.close()
        if timed_out:
            return TIMEOUT_RETCODE
        return popenobj.returncode
    else:
        # Alternative method (e.g. OS X): using thread
        popenobj = subprocess.Popen(commandline)
        timer = threading.Timer(timeout, popenobj.kill) # popenobj.kill needs Python 2.6+
        timer.start()
        popenobj.communicate() # block until subprocess terminated
        if timer.is_alive():
            timer.cancel() # timer didn't fire before subprocess terminate
            return popenobj.poll() # return the subprocess' retcode
        # Pass commandline as a logging argument: "%s" % commandline would
        # raise TypeError when commandline is a tuple.
        logging.info("Killed %s", commandline)
        return TIMEOUT_RETCODE

def timed_fork(functor, timeout):
    '''
    Run functor() in a forked child process and kill it if it outlives timeout.

    functor -- zero-argument callable executed in the child
    timeout -- time budget in seconds (same unit as timed_exec())

    Returns TIMEOUT_RETCODE if the child had to be killed; otherwise the
    child's status as decoded by status_code(), i.e. GOOD_RETCODE when
    functor returned, BAD_RETCODE when it raised, or a negative signal number.
    '''
    import select
    r, w = os.pipe()
    pid = os.fork()
    if not pid: # child process
        os.close(r)
        try:
            functor()
        except BaseException:
            # Catch everything, SystemExit included: the child must never
            # fall back into the parent's code path.
            os.close(w)
            os._exit(BAD_RETCODE)
        os.close(w)
        os._exit(GOOD_RETCODE)

    # parent process
    timed_out = False
    try:
        # Drop our copy of the write end so the read end sees a hang-up as
        # soon as the child closes its copy or exits.
        os.close(w)
        p = select.poll()
        p.register(r, select.POLLHUP)
        # poll() takes milliseconds; an empty event list means timeout.
        if not p.poll(timeout * 1e3):
            timed_out = True
            os.kill(pid, 9)
    finally:
        # Always reap the child and release the pipe. No 'return' here:
        # a return inside 'finally' would override the timeout result and
        # swallow any exception raised above.
        status = os.waitpid(pid, 0)[1]
        os.close(r)
    if timed_out:
        return TIMEOUT_RETCODE
    return status_code(status)

The function timed_exec() runs a command line and timed_fork() runs a Python function, but their logic is similar. We fork a process to run in a separate process space (hence a different interpreter, no longer bound by the GIL), and maintain a pipe between the parent and child process so that the parent, by polling the pipe, knows immediately when the child ends. If the timeout expires before polling detects that the pipe has been closed, we use a system signal to kill the child process.

It is worth noting that select.poll is available on Linux but not on Mac. Therefore, in the code above, I have an alternative solution that uses threading.Timer. This works, but it does not seem to be as responsive as polling a pipe.

Now we are ready to work around the GIL. Going back to the code at the top, we want a program that runs jobs from a queue in parallel, with a little bit more. The simple multithreaded program does not account for the case where a thread dies without finishing its job. If that happens, we want to retry the job in the next round. While we could still use the Queue from that code, we can also implement our own queue:

class JobQueue(object):
    """
    Job list that also remembers which jobs are currently being worked on.

    A job moves from the waiting list to the working list in get() and
    leaves the working list in completed(). put() ignores a job that is
    already waiting or being worked on.
    """
    def __init__(self):
        self.__lock = threading.Lock()
        self.__queue = []       # jobs waiting to be handed out, oldest first
        self.__working = []     # jobs handed out by get(), not yet completed()
        self.__done = False     # flipped to True by destroy()
        self.lastdone = None    # time.time() of the most recent completed()

    def is_done(self):
        'Tell whether destroy() has been called'
        return self.__done

    def is_empty(self):
        'True when no job is waiting and no job is being worked on'
        return not (self.__queue or self.__working)

    def destroy(self):
        'Mark the queue as done'
        self.__done = True

    def completed(self, job):
        'Drop every entry equal to job from the working set'
        with self.__lock:
            remaining = []
            for active in self.__working:
                if active != job:
                    remaining.append(active)
            self.__working = remaining
            self.lastdone = time.time()

    def get(self):
        'Hand out the oldest waiting job and record it as being worked on'
        with self.__lock:
            if not self.__queue:
                # Nothing waiting: raise only once the queue is marked done
                # and no job is still in flight; otherwise report "no job
                # right now" with None.
                if self.__done and not self.__working:
                    raise Queue.Empty
                return None
            job = self.__queue[0]
            # Record the job as working before removing it from the waiting
            # list, so it is never absent from both lists.
            self.__working.append(job)
            del self.__queue[0]
            return job

    def put(self, job):
        'Append a job to the waiting list unless it is already known'
        with self.__lock:
            if any(waiting == job for waiting in self.__queue):
                return
            if any(active == job for active in self.__working):
                return
            self.__queue.append(job)
And we derive a class for job threads, which encapsulates all interactions with the job queue:

class JobThread(threading.Thread):
    """
    Thread that repeatedly pulls a job from a job queue and runs it.

    jobqueue -- object providing get(), completed(), put(), is_done() and
                destroy(), as used below
    jobfunc  -- callable taking one job; a false return value makes the
                job go back onto the queue
    """
    def __init__(self, jobqueue, jobfunc):
        super(JobThread, self).__init__()
        self.__queue = jobqueue     # where jobs come from and are reported to
        self.__done = False         # run() leaves its loop once this is True
        self.__worker = jobfunc     # called with each job

    def done(self):
        'Ask this thread to finish and mark the job queue as done'
        self.__done = True
        self.__queue.destroy()

    def run(self):
        pause = 0.25    # seconds to back off when there is nothing to do
        while not self.__done:
            try:
                logging.info("Try getting job from queue")
                job = self.__queue.get()
            except Queue.Empty:
                if self.__queue.is_done():
                    break
                time.sleep(pause)   # avoid spinning on an empty queue
                continue

            if job is None:
                # Nothing waiting right now; try again after a short pause.
                time.sleep(pause)
                continue

            succeeded = self.__worker(job)
            # Take the job off the working set first, then re-queue it if it
            # did not succeed.
            self.__queue.completed(job)
            if not succeeded:
                self.__queue.put(job)
And now we can run the following program:

import argparse
import os
import logging
import time
import traceback

from fork import timed_exec, JobQueue, JobThread
import pyaes

# Key material for the example, hard-coded so the listing is self-contained.
# NOTE(review): fine for a benchmark; real code should not embed keys in source.
KEY_256 = "TodayMeColdNightSeeSnowFlyAcross" # 32-byte key (256 bits), passed to the CBC cipher
IV_128 = "SeaWide&SkyEmpty" # 16-byte IV (128 bits), passed as iv= to the CBC cipher

PROCESS_COUNT = 4   # number of JobThread objects master_main() creates
JOB_TIMEOUT = 30    # timeout handed to timed_exec() per job; also the stall threshold in master_main()

def worker_main(filepath):
    """
    Real worker: doing all CPU intensive thing.

    AES-CBC encrypts the file at filepath and discards the output to
    /dev/null.
    """
    aes = pyaes.AESModeOfOperationCBC(KEY_256, iv=IV_128)
    # 'with' closes both files deterministically, even if encryption raises.
    with open(filepath, 'rb') as infile:
        with open('/dev/null', 'wb') as outfile:
            pyaes.encrypt_stream(aes, infile, outfile)

def foreman(filename):
    '''
    Worker function. It calls timed_exec() to spawn a new process instead of running as a thread.

    filename -- path of the file the child process should encrypt

    Returns True when timed_exec() reports a non-negative return code.
    '''
    import sys
    # Launch this very script through the current interpreter, so it works
    # even when the file has no shebang line or executable bit.
    cmdline = [sys.executable, os.path.realpath(__file__), '-f', filename] # command line to call
    logging.info("Running %s", cmdline)
    # NOTE(review): ">= 0" also accepts non-zero exit codes and
    # timed_exec()'s timeout code, so only a negative code leads to a
    # retry. Confirm that this is the intended success criterion.
    return timed_exec(cmdline, JOB_TIMEOUT) >= 0

def master_main(indir):
    '''
    Queue every file under indir and let PROCESS_COUNT job threads process them.

    The calling thread only supervises: it polls until the job queue is
    empty or the wall-clock budget is used up, and aborts early when
    progress stalls or a job thread has died. In every case all job threads
    are asked to stop and are joined before returning.
    '''
    # create thread objects, with function specified as worker
    jobqueue = JobQueue()
    jobthreads = [JobThread(jobqueue, foreman) for _ in range(PROCESS_COUNT)]

    # Scan dirs for files and populate the job queue
    for path, dirs, files in os.walk(indir):
        for basename in files:
            jobqueue.put(os.path.join(path, basename))

    # start all threads
    for t in jobthreads:
        t.start()

    # run for some time, then terminate all threads
    BUDGET = 30             # total wall-clock seconds allowed
    RECHECK_TIMEOUT = 1     # seconds between two progress checks
    try:
        logging.info('Total wall-clock time budget of %d seconds', BUDGET)
        starttime = nowtime = time.time()
        endtime = starttime + BUDGET
        while True: # Make sure there is progress
            # Never sleep past the end of the budget.
            time.sleep(min(endtime-nowtime, RECHECK_TIMEOUT))
            nowtime = time.time()
            if nowtime >= endtime or jobqueue.is_empty():
                break
            elif jobqueue.lastdone and not jobqueue.is_empty() and (nowtime-jobqueue.lastdone) > JOB_TIMEOUT:
                raise RuntimeError('Stalled progress?')
            elif not all(t.is_alive() for t in jobthreads):
                raise RuntimeError('Some thread dead?')
        logging.info('Done')
    except KeyboardInterrupt as e:
        # KeyboardInterrupt is not an Exception subclass, hence its own clause.
        logging.error('Interrupted by user')
        logging.error(e)
        logging.error(traceback.format_exc())
    except Exception as e:
        logging.error('Interrupted by exception')
        logging.error(e)
        logging.error(traceback.format_exc())
    finally:
        logging.info('Terminating all threads...')
        # Flag every thread first, then wait, so they wind down concurrently.
        for t in jobthreads:
            t.done()
        for t in jobthreads:
            t.join()
        logging.info('all done')


if __name__ == '__main__':
    #logging.basicConfig(level=logging.DEBUG)
    # argparse
    # The script is dual-use: -i runs the multi-job master, -f runs a single
    # job (this is how the master invokes its child processes).
    parser = argparse.ArgumentParser(description='Slow encrypter in multithread')
    parser.add_argument("-i", "--indir", help="Directory of input files")
    parser.add_argument("-f", "--filename", help="Path to one input file")
    args = parser.parse_args()
    # invoke
    if args.indir:
        master_main(args.indir)
    elif args.filename:
        worker_main(args.filename)
    else:
        # Without either option there is nothing to do; print a usage error
        # instead of crashing inside worker_main(None).
        parser.error("one of -i/--indir or -f/--filename is required")

When this program is run, the structure is as follows:

main thread [pid X0]
|-- jobthread 1
|     +-timed_exec(command with job 1) [pid X1]
|
|-- jobthread 2
|     +-timed_exec(command with job 2) [pid X2]
|
|-- jobthread 3
|     +-timed_exec(command with job 3) [pid X3]
|
+-- jobthread 4
      +-timed_exec(command with job 4) [pid X4]

The main program runs multithreaded but does no actual work. The actual work is done by child processes forked by each thread, and the job queue provides job management. One nice trick here is that we use timed_exec(), which calls a command line, instead of timed_fork(), which invokes a Python function. I made this design choice merely to ensure that each job takes nothing more than what is passed in as command-line arguments, which guarantees that no data is shared and makes debugging easier. Speaking of ease of debugging, what could be better than making the program dual-use, so that it can run both a single job and multiple jobs? Thus the code path is controlled by the arguments.

All of this code is on GitHub.