Friday, February 27, 2009

Usage pattern for multiprocessing

I have found myself using multiprocessing more and more these days, especially since it has been backported to Python 2.5.x.
I can no longer remember what was the last computer I owned, which had a single core/processor. Multiprocessing make me much happier as I know I can milk my hardware for all it has to offer. I numpy, scipy and related packages find ways to build parallelization into their libraries as soon as possible.

This goal of this article is to share the simple usage pattern I have adopted to parallelize my code which is both simple and works in both single and multi-core systems. As a disclaimer, much of my day-to-day code, involves repeated calculations of some sort, which are prone to be asynchronously distributed. This explains the pattern I am going to present here.

I start setting up a process pool, as close as possible to the code which will be distributed, preferably in the same local namespace. This way I don't keep processes floating around after my parallel computation is done:


# -*- coding: utf-8 -*-
from numpy import arange,sqrt, random, linalg
from multiprocessing import Pool

global counter
counter = 0
def cb(r):
global counter
print counter, r
counter +=1

def det(M):
return linalg.det(M)

po = Pool()
for i in xrange(1,300):
j = random.normal(1,1,(100,100))
po.apply_async(det,(j,),callback=cb)
po.close()
po.join()
print counter


The call Pool() returns a pool of as many processes as there are cores/cpus available. This makes the code perform optimally on any number of cores, even on single core machines.

The use of a callback function allows for true asynchronicity, since my loop does not have to wait for apply_async to return.

finally, the po.close() and po.join() calls are essential to make sure that all 300 processes which have been fired up finish execution and are terminated. This also eliminates any footprints from your parallel execution, such as zombie processes left behind.

So this is my main pattern! what is yours? please share it and any comments you may have on mine.

8 comments:

Dmitrey said...

Thank you for the information. I have mentioned your example (modified, with time elapsed output) here
http://forum.openopt.org/viewtopic.php?id=51

rgz said...

Excuse me for being pedantic, the first global declaration is unnecessary only the one in the cb is necessary.

second, det could have been defined more easily as
det = linalg.det

third, and this depends entirely on the multiprocessing library, but I'd write a method in Pool that takes an iterable instead of repeatedly calling apply_async, as a bonus, it should return a decorator to be used on cb, it would look like this:

pool = Pool()
args = (random.normal(1,1,(100,100)) for x in xrange(1, 300))
@pool.async_map(det, args)
def callback(r): ...

usagi said...

@rgz: Pool already has a method called map_async, which does what you want. But to use that, the arguments to your function must be available as an iterable structure, which is possible for this simple example, but not in many of my use cases. The idea here was to put out something very general, and let users customize the example as they want, like you did.

Marius Gedminas said...

A good example, thanks for posting it. Bookmarked.

I wonder if using a global counter in this example isn't going to give people wrong ideas? Usually when you get a result for a job you want to know which job it was, not just that job's number in the result queue. Making det return the argument as well as the computed value, and making cb print it instead of a counter might make a slightly better example.

Dan said...

This was a useful start as it is a very common usage pattern for me too - thanks! But I have a question, how can one store the multiple return value of the det function?

For example, say I have a very long calculation wrapped in a function call, long(a, b), where a and b are just floats, long() returns a single float. How can I store the return values of the call to long and correlate them to the input parameters?

In code, how can I parallelise this:
import numpy as np
data = []
for a in np.linspace(0,1):
for b in np.linspace(0,1):
answer = long(a,b)
data.append((a,b,answer))

Best regards,
Dan

phpp.plasma said...

Hi,
I know the post is pretty dated (about an year old... however...), I was surfing the net trying to find some Python command that might be useful to me and I figure out your blog.

I am sorry for bothering, I hope you'll have a chance to give me some help or at least an hint since I am pretty new to python.

I am using a Fortran program, lately I figure out the necessity to run the same program several times (several means something like 10 times, and in a near future hopefully 1000 times). I have a python script that generates folder and in each folder writes the namelist the program needs to run.

Now my question: what I would like to do is to launch a python script that does the following, goes in each sudirectory, get the namelist and start running it. Let's say I tell him to run 4 process on 4 different CPUs at the time: when one process finish the program starts a new process and so on... untill the very end...

Yeah I am really sorry for bothering you but I got a bit lost and I had no idea how to implement something like that.

Cheers!
Alberto

faheem said...

@usagi

You said:

"Pool already has a method called map_async, which does what you want. But to use that, the arguments to your function must be available as an iterable structure, which is possible for this simple example, but not in many of my use cases."

But one can easily back multiple arguments into a single argument, say by using a dict.

jay@thecapacity said...

Don't you need locking for incrementing the global counter?

Technically two threads could try to update it correct?

ccp

Amazon