##// END OF EJS Templates
Merging latest trunk....
Merging latest trunk. I got a criss-cross merge in this and did merge --weave. Hope this is OK.

File last commit:

r1741:079041c4
r1890:c3323314 merge
Show More
asynparallel.py
172 lines | 5.8 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""A parallel tasking tool that uses asynchronous programming. This uses
a blocking client to get the task id, but returns a Deferred as the result of
run(). Users should attach their callbacks to these Deferreds.
Only the returning of results is asynchronous. Submitting tasks and getting
task ids are done synchronously.
Yichun Wei 03/2008
"""
import inspect
import itertools
import numpy as N
from twisted.python import log
from ipython1.kernel import client
from ipython1.kernel.client import Task
""" After http://trac.pocoo.org/repos/pocoo/trunk/pocoo/utils/decorators.py
"""
class submit_job(object):
    """Decorator factory that runs calls of a function as IPython1 tasks.

    An instance is built from a MultiEngineClient (``rc``) and a TaskClient
    (``tc``).  Applying the instance to a function pushes that function to
    the engines through ``rc`` and returns a wrapper.  Every call of the
    wrapper submits one task through ``tc`` and returns the object obtained
    from ``tc.task_controller.get_task_result`` (a Deferred), to which the
    caller can attach callbacks.
    """

    def __init__(self, rc, tc):
        """Remember the MultiEngineClient ``rc`` and the TaskClient ``tc``."""
        self.rc = rc
        self.tc = tc

    def __call__(self, func):
        """Decorate ``func``; see ``_decorate``."""
        return self._decorate(func)

    def _getinfo(self, func):
        """Collect the signature details of ``func`` in a dict.

        Returned keys:
          name       -- ``func.__name__``
          argnames   -- regular argument names followed by the ``*`` and
                        ``**`` names, when declared
          arg0..argN -- the entries of ``argnames`` by position
          shortsign  -- call-style signature, e.g. ``'a, b, *args'``
          fullsign   -- same, with defaults spelled ``name=defarg[i]``
          defarg     -- tuple of default values (empty when there are none)
          regargs    -- list of the regular argument names only
          varargs    -- name of the ``*`` argument, or None
          varkwargs  -- name of the ``**`` argument, or None
          defaults   -- dict mapping each defaulted argument to its value
        """
        assert inspect.ismethod(func) or inspect.isfunction(func)
        # inspect.getargspec()/formatargspec() are gone from current Python,
        # so use getfullargspec() when it exists and format the signature
        # strings by hand.  Both spec functions start with the same four
        # fields: args, varargs, varkw, defaults.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        regargs, varargs, varkwargs, defaults = getspec(func)[:4]
        regargs = list(regargs)
        defarg = tuple(defaults or ())

        # Defaults always belong to the trailing regular arguments.
        first_default = len(regargs) - len(defarg)
        defaulted = regargs[first_default:]

        full_parts = list(regargs[:first_default])
        for index, name in enumerate(defaulted):
            full_parts.append('%s=defarg[%i]' % (name, index))

        star_parts = []
        argnames = list(regargs)
        if varargs:
            star_parts.append('*' + varargs)
            argnames.append(varargs)
        if varkwargs:
            star_parts.append('**' + varkwargs)
            argnames.append(varkwargs)

        dic = dict(('arg%s' % n, name) for n, name in enumerate(argnames))
        dic.update(
            name=func.__name__,
            argnames=argnames,
            shortsign=', '.join(regargs + star_parts),
            fullsign=', '.join(full_parts + star_parts),
            defarg=defarg,
            regargs=regargs,
            varargs=varargs,
            varkwargs=varkwargs,
            defaults=dict(zip(defaulted, defarg)),
        )
        return dic

    def _job_namespace(self, infodict, args, kwds):
        """Build the namespace pushed to the engine for one call.

        infodict -- the dict produced by ``_getinfo`` for the function
        args     -- positional arguments of the call
        kwds     -- keyword arguments of the call

        Defaults are filled in first, then positional values, then keywords,
        so later sources override earlier ones.  When the function declares
        ``*name`` / ``**name`` those names are always bound (to a tuple / a
        dict), because ``shortsign`` expands them on the engine.  Without a
        ``*name`` surplus positional values are dropped, and without a
        ``**name`` every keyword is pushed under its own name.
        """
        regargs = infodict['regargs']
        jobns = dict(infodict['defaults'])
        # zip() stops at the shorter input, so surplus positionals are not
        # bound to any regular argument here.
        jobns.update(zip(regargs, args))

        varargs = infodict['varargs']
        if varargs:
            jobns[varargs] = tuple(args[len(regargs):])

        varkwargs = infodict['varkwargs']
        if varkwargs:
            # Keywords naming a regular argument bind that argument; the
            # rest travel inside the ** dict.
            extra = {}
            for key, value in kwds.items():
                if key in regargs:
                    jobns[key] = value
                else:
                    extra[key] = value
            jobns[varkwargs] = extra
        else:
            jobns.update(kwds)
        return jobns

    def _decorate(self, func):
        """Push ``func`` to the engines and return its task-submitting wrapper.

        The wrapper accepts the same call arguments as ``func``.  Each call
        builds a ``Task`` that executes ``func`` on an engine with those
        arguments, runs it through the task client and returns the result
        of ``get_task_result`` with two handlers attached: a callback that
        extracts the function's return value and an errback that logs
        failures.  The wrapper carries ``rc`` and ``tc`` as attributes.

        Raises NameError when ``func`` has an argument named ``rc``.

        The TaskController setupNS doesn't cope with functions, but the
        remote controller can push functions/modules into engines.
        Changes:
        200803. In new ipython1, we use push_function for functions.
        """
        rc, tc = self.rc, self.tc
        infodict = self._getinfo(func)
        if 'rc' in infodict['argnames']:
            raise NameError("You cannot use rc as argument names!")

        # We assume the engines' namespace has been prepared.  Locally the
        # function's name will refer to the decorated closure, so the
        # original function is pushed explicitly under that name.
        # block=True: the original author notes push_function is
        # non-blocking by default and was unsure whether that is a problem.
        rc.push_function({func.__name__: func}, block=True)

        # Statement executed on the engine; infodict never changes, so it
        # is formatted once.
        task_source = ('a_very_long_and_rare_name = %(name)s(%(shortsign)s)'
                       % infodict)
        build_namespace = self._job_namespace

        def do_submit_func(*args, **kwds):
            jobns = build_namespace(infodict, args, kwds)
            task = Task(task_source,
                        pull=['a_very_long_and_rare_name'], push=jobns)
            jobid = tc.run(task)
            # res is a deferred, one can attach callbacks on it
            res = tc.task_controller.get_task_result(jobid, block=True)
            res.addCallback(lambda x: x.ns['a_very_long_and_rare_name'])
            res.addErrback(log.err)
            return res

        do_submit_func.rc = rc
        do_submit_func.tc = tc
        return do_submit_func
def parallelized(rc, tc, initstrlist=None):
    """Prepare the engines and return a task-submitting decorator.

    rc          -- remote controller; every init command is run with
                   ``rc.execute(cmd, block=True)``
    tc          -- task controller handed on to the decorator
    initstrlist -- optional iterable of command strings executed on the
                   engines, in order, before the decorator is created

    Returns ``submit_job(rc, tc)``.
    """
    # None instead of a shared mutable [] default; both mean "no commands".
    if initstrlist is not None:
        for cmd in initstrlist:
            rc.execute(cmd, block=True)
    return submit_job(rc, tc)
from twisted.internet import defer
from numpy import array, nan
def pmap(func, parr, **kwds):
    """Run func on every element of parr and collect the Deferreds.

    func -- callable carrying a truthy ``tc`` attribute; it is called as
            ``func(p, **kwds)`` with each element as the only positional
            argument (so a dict can be used to wrap many parameters)
    parr -- array of parameters; only its ``flat`` iterator and its
            ``shape`` are used
    kwds -- passed on to every call of func, not changed

    Returns an object array with the same shape as parr.  Each slot holds
    what ``addErrback(log.err)`` returned for that element's Deferred.
    Falsy elements are not passed to func: their slot is filled from
    ``defer.succeed(nan)`` instead.
    """
    assert func.tc

    def run(p, **kwds):
        if p:
            return func(p, **kwds)
        else:
            return defer.succeed(nan)

    reslist = [run(p, **kwds).addErrback(log.err) for p in parr.flat]
    # dtype=object keeps the result an object array even when parr is empty
    # (a bare array([]) would come back as float64).
    return array(reslist, dtype=object).reshape(parr.shape)
if __name__ == '__main__':
    # Demo: submit f(11, 5) as a task and print its result when it arrives.
    rc = client.MultiEngineClient(client.default_address)
    tc = client.TaskClient(client.default_task_address)

    # if commenting out the decorator you get a local running version
    # instantly
    @parallelized(rc, tc)
    def f(a, b=1):
        # A time.sleep(1) can be added here to simulate a slow job.
        # Single-argument print() emits the same text on Python 2 and 3.
        print("a,b= %s %s" % (a, b))
        return a + b

    def showres(x):
        print('ans: %s' % (x,))

    res = f(11, 5)
    res.addCallback(showres)
    # this is not necessary in Twisted 8.0
    from twisted.internet import reactor
    reactor.run()