#!/usr/bin/env python
"""A parallel tasking tool built on asynchronous programming.

A blocking client is used to obtain task ids, but run() returns a Deferred
as its result.  Users should attach their callbacks to these Deferreds.
Only the returning of results is asynchronous; submitting tasks and getting
task ids are done synchronously.

Yichun Wei 03/2008
"""
import inspect
import itertools

import numpy as N
from numpy import array, nan

from twisted.python import log
from twisted.internet import defer

from ipython1.kernel import client
from ipython1.kernel.client import Task


# The decorator machinery below is modeled after
# http://trac.pocoo.org/repos/pocoo/trunk/pocoo/utils/decorators.py
class submit_job(object):
    """A decorator factory: takes a MultiEngineClient and a TaskClient and
    returns a decorator that runs a call to the decorated func as a task in
    ipython1, submitting it to the IPython1 controller.
    """

    def __init__(self, rc, tc):
        self.rc = rc
        self.tc = tc

    def __call__(self, func):
        return self._decorate(func)
    def _getinfo(self, func):
        """Introspect func and return a dict describing its signature:
        name, argnames, shortsign (arguments without defaults), fullsign
        (arguments with defaults rewritten as defarg[i]), and defarg
        (the tuple of default values).
        """
        assert inspect.ismethod(func) or inspect.isfunction(func)
        regargs, varargs, varkwargs, defaults = inspect.getargspec(func)
        argnames = list(regargs)
        if varargs:
            argnames.append(varargs)
        if varkwargs:
            argnames.append(varkwargs)
        counter = itertools.count()
        fullsign = inspect.formatargspec(
            regargs, varargs, varkwargs, defaults,
            formatvalue=lambda value: '=defarg[%i]' % counter.next())[1:-1]
        shortsign = inspect.formatargspec(
            regargs, varargs, varkwargs, defaults,
            formatvalue=lambda value: '')[1:-1]
        dic = dict(('arg%s' % n, name) for n, name in enumerate(argnames))
        dic.update(name=func.__name__, argnames=argnames, shortsign=shortsign,
                   fullsign=fullsign, defarg=func.func_defaults or ())
        return dic
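
    # Illustrative example (not executed): for a hypothetical
    #     def f(a, b=1): ...
    # _getinfo would return roughly
    #     {'name': 'f', 'argnames': ['a', 'b'],
    #      'shortsign': 'a, b', 'fullsign': 'a, b=defarg[0]',
    #      'defarg': (1,), 'arg0': 'a', 'arg1': 'b'}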
    def _decorate(self, func):
        """Take a function and return a decorated version that submits each
        call as a job via the task controller.

        The decorated function is obtained by evaluating a lambda function
        with the correct signature.  The TaskController's setupNS does not
        cope with functions, but we can use the RemoteController to push
        functions/modules into the engines.

        Changes:
        200803. In the new ipython1, we use push_function for functions.
        """
        rc, tc = self.rc, self.tc
        infodict = self._getinfo(func)
        if 'rc' in infodict['argnames']:
            raise NameError("You cannot use 'rc' as an argument name!")
        # We assume the engines' namespace has been prepared.
        # ns[func.__name__] is already the decorated closure function,
        # so we need to change it back to the original function:
        ns = {}
        ns[func.__name__] = func
        # Push func and all its environment/prerequisites to the engines.
        # Note push_function is non-blocking by default; it is unclear
        # whether that would cause problems here.
        rc.push_function(ns, block=True)
        def do_submit_func(*args, **kwds):
            jobns = {}
            # Initialize the job namespace with the arguments that have
            # default values, so calls relying on default args work too.
            for n in infodict['fullsign'].split(','):
                try:
                    vname, var = n.split('=')
                    vname, var = vname.strip(), var.strip()
                except ValueError:  # no default for this argument; skip it
                    pass
                else:
                    jobns.setdefault(vname, eval(var, infodict))
            # Push args and kwds, overwriting default args if needed.
            # zip truncates to the positional args actually passed.
            nokwds = dict((n, v) for n, v in zip(infodict['argnames'], args))
            jobns.update(nokwds)
            jobns.update(kwds)
            task = Task('a_very_long_and_rare_name = %(name)s(%(shortsign)s)' % infodict,
                        pull=['a_very_long_and_rare_name'], push=jobns)
            jobid = tc.run(task)
            # res is a Deferred; callers can attach callbacks to it.
            res = tc.task_controller.get_task_result(jobid, block=True)
            res.addCallback(lambda x: x.ns['a_very_long_and_rare_name'])
            res.addErrback(log.err)
            return res

        do_submit_func.rc = rc
        do_submit_func.tc = tc
        return do_submit_func
def parallelized(rc, tc, initstrlist=[]):
    """rc - remote (multiengine) controller
    tc - task controller
    initstrlist - a list of strings to be executed on the engines first.
    """
    for cmd in initstrlist:
        rc.execute(cmd, block=True)
    return submit_job(rc, tc)
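
# Illustrative sketch (the name `heavy_compute` is an assumption, not part of
# this module): initstrlist can be used to prepare the engines' namespaces,
# e.g. importing the modules the decorated function relies on.
#
#     @parallelized(rc, tc, initstrlist=['import numpy as N'])
#     def heavy_compute(params):
#         return N.sum(N.array(params['data']) ** 2)
#
#     d = heavy_compute({'data': range(10)})   # a Deferred
#     d.addCallback(lambda result: ...)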
def pmap(func, parr, **kwds):
    """Run func on every element of parr (an array), passing each element as
    the single positional parameter (so you typically use a dict that wraps
    many parameters).  Returns an array of Deferreds with the same shape as
    parr.  func.tc is used as the task client.

    **kwds are passed on to func unchanged.
    """
    assert func.tc
    tc = func.tc

    def run(p, **kwds):
        # Falsy elements (e.g. None) are skipped and resolve to nan.
        if p:
            return func(p, **kwds)
        else:
            return defer.succeed(nan)

    reslist = [run(p, **kwds).addErrback(log.err) for p in parr.flat]
    resarr = array(reslist)
    resarr.shape = parr.shape
    return resarr
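
# Illustrative sketch (the parameter dicts are assumptions): pmap maps a
# decorated function over an array of parameter dicts and returns an array
# of Deferreds with the same shape.
#
#     params = array([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}], dtype=object)
#     deferreds = pmap(f, params)          # f must carry f.tc (i.e. be decorated)
#     for d in deferreds.flat:
#         d.addCallback(lambda result: ...)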
if __name__ == '__main__':
    rc = client.MultiEngineClient(client.default_address)
    tc = client.TaskClient(client.default_task_address)

    # If you comment out the decorator, you get a locally-running version
    # instantly.
    @parallelized(rc, tc)
    def f(a, b=1):
        #from time import sleep
        #sleep(1)
        print "a,b=", a, b
        return a + b

    def showres(x):
        print 'ans:', x

    res = f(11, 5)
    res.addCallback(showres)

    # This is not necessary in Twisted 8.0.
    from twisted.internet import reactor
    reactor.run()