|
|
#!/usr/bin/env python
|
|
|
"""A parallel tasking tool that uses asynchronous programming.

This uses a blocking client to get the task id, but returns a Deferred as
the result of run(). Users should attach their callbacks to these Deferreds.

Only the returning of results is asynchronous. Submitting tasks and getting
task ids are done synchronously.

Yichun Wei 03/2008
"""
|
|
|
|
|
|
import inspect
|
|
|
import itertools
|
|
|
import numpy as N
|
|
|
|
|
|
from twisted.python import log
|
|
|
from ipython1.kernel import client
|
|
|
from ipython1.kernel.client import Task
|
|
|
|
|
|
""" After http://trac.pocoo.org/repos/pocoo/trunk/pocoo/utils/decorators.py
|
|
|
"""
|
|
|
class submit_job(object):
    """Decorator factory that runs calls to a function as IPython1 tasks.

    An instance is built from a MultiEngineClient (``rc``) and a TaskClient
    (``tc``).  Applying the instance to a function pushes that function to
    the engines through ``rc`` and returns a wrapper.  Calling the wrapper
    submits the call as a ``Task`` through ``tc`` and returns the object
    produced by ``tc.task_controller.get_task_result`` (used here as a
    Deferred), whose result is the remote call's return value.
    """

    def __init__(self, rc, tc):
        """Remember the multi-engine client ``rc`` and task client ``tc``."""
        self.rc = rc
        self.tc = tc

    def __call__(self, func):
        """Decorate ``func``; equivalent to ``self._decorate(func)``."""
        return self._decorate(func)

    def _getinfo(self, func):
        """Describe the call signature of ``func``.

        Returns a dict with these keys:

        name       -- ``func.__name__``
        regargs    -- list of the regular (positional-or-keyword) names
        varargs    -- name of the ``*`` parameter, or None
        varkwargs  -- name of the ``**`` parameter, or None
        argnames   -- regargs followed by the ``*`` and ``**`` names
        shortsign  -- call signature without defaults, e.g. ``a, b, *r``
        fullsign   -- signature where the i-th default is ``=defarg[i]``
        defarg     -- tuple of default values (empty when there are none)
        arg0..argN -- the entries of argnames, keyed by position

        Keyword-only parameters (Python 3) are not represented.
        """
        assert inspect.ismethod(func) or inspect.isfunction(func)

        # inspect.getargspec is missing on recent Python 3 releases;
        # getfullargspec returns the same first four fields.
        getspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
        regargs, varargs, varkwargs, defaults = getspec(func)[:4]
        regargs = list(regargs)
        defaults = tuple(defaults or ())

        argnames = list(regargs)
        star_parts = []
        if varargs:
            argnames.append(varargs)
            star_parts.append('*' + varargs)
        if varkwargs:
            argnames.append(varkwargs)
            star_parts.append('**' + varkwargs)

        # Defaults always belong to the trailing regular parameters.
        first_default = len(regargs) - len(defaults)
        full_parts = []
        for pos, name in enumerate(regargs):
            if pos >= first_default:
                full_parts.append(
                    '%s=defarg[%i]' % (name, pos - first_default))
            else:
                full_parts.append(name)
        fullsign = ', '.join(full_parts + star_parts)
        shortsign = ', '.join(regargs + star_parts)

        dic = dict(('arg%s' % n, name) for n, name in enumerate(argnames))
        dic.update(name=func.__name__, argnames=argnames, regargs=regargs,
                   varargs=varargs, varkwargs=varkwargs,
                   shortsign=shortsign, fullsign=fullsign, defarg=defaults)
        return dic

    def _build_jobns(self, infodict, args, kwds):
        """Bind the arguments of one call to parameter names.

        ``infodict`` is a dict returned by ``_getinfo``; ``args`` and
        ``kwds`` are the positional and keyword arguments of the call.
        Returns the namespace dict to push with the task, so that
        evaluating ``name(shortsign)`` in it reproduces the call.

        Precedence, lowest first: default values, positional arguments,
        keyword arguments.  Surplus positional arguments are collected in
        a tuple under the ``*`` name when there is one (otherwise they are
        dropped).  Keywords that are not regular parameters are collected
        in a dict under the ``**`` name when there is one (otherwise they
        are pushed under their own names).
        """
        regargs = infodict['regargs']
        defarg = infodict['defarg']

        jobns = {}
        if defarg:
            jobns.update(zip(regargs[-len(defarg):], defarg))

        # zip stops at the shorter sequence, so surplus positionals are
        # not bound here.
        jobns.update(zip(regargs, args))

        varargs = infodict['varargs']
        if varargs:
            jobns[varargs] = tuple(args[len(regargs):])

        varkwargs = infodict['varkwargs']
        if varkwargs:
            extra = {}
            for key, value in kwds.items():
                if key in regargs:
                    jobns[key] = value
                else:
                    extra[key] = value
            jobns[varkwargs] = extra
        else:
            jobns.update(kwds)
        return jobns

    def _decorate(self, func):
        """Return a wrapper that submits calls to ``func`` as tasks.

        ``func`` is pushed to the engines under its own name with
        ``rc.push_function`` (the TaskController setupNS doesn't cope with
        functions, so the multi-engine client is used for that).  The
        returned wrapper accepts the same arguments as ``func``; each call
        pushes the bound arguments with a ``Task`` that evaluates
        ``name(shortsign)`` on an engine, and returns the result object of
        ``tc.task_controller.get_task_result`` with a callback extracting
        the call's return value and ``log.err`` as errback.

        The wrapper carries ``rc`` and ``tc`` as attributes.

        Raises NameError when ``func`` has a parameter named ``rc``.

        Changes:
        200803. In new ipython1, we use push_function for functions.
        """
        import functools  # local import: only needed by this method

        rc, tc = self.rc, self.tc
        infodict = self._getinfo(func)
        if 'rc' in infodict['argnames']:
            raise NameError("You cannot use rc as argument names!")

        # The engines must see the original function, not the wrapper
        # returned below, under the function's own name.
        # block=True is passed explicitly; the original author noted that
        # the call is non-blocking by default.
        rc.push_function({func.__name__: func}, block=True)

        build_jobns = self._build_jobns

        @functools.wraps(func)
        def do_submit_func(*args, **kwds):
            jobns = build_jobns(infodict, args, kwds)

            task = Task(
                'a_very_long_and_rare_name = %(name)s(%(shortsign)s)'
                % infodict,
                pull=['a_very_long_and_rare_name'], push=jobns,)
            jobid = tc.run(task)
            # res is a deferred, one can attach callbacks on it
            res = tc.task_controller.get_task_result(jobid, block=True)
            res.addCallback(lambda x: x.ns['a_very_long_and_rare_name'])
            res.addErrback(log.err)
            return res

        do_submit_func.rc = rc
        do_submit_func.tc = tc
        return do_submit_func
|
|
|
|
|
|
|
|
|
def parallelized(rc, tc, initstrlist=None):
    """Prepare the engines and return a task-submitting decorator.

    rc - remote controller; every init command is run on it with
         ``rc.execute(cmd, block=True)``.
    tc - task controller, handed on to ``submit_job``.
    initstrlist - optional iterable of str, executed on the engines in
         order before the decorator is returned.  Defaults to no commands.

    Returns ``submit_job(rc, tc)``.
    """
    if initstrlist is not None:
        for cmd in initstrlist:
            rc.execute(cmd, block=True)
    return submit_job(rc, tc)
|
|
|
|
|
|
|
|
|
from twisted.internet import defer
|
|
|
from numpy import array, nan
|
|
|
|
|
|
def pmap(func, parr, **kwds):
    """Apply func to every element of the array parr.

    Each element is passed to func as its single positional argument (an
    element can be a dict wrapping many parameters); **kwds are forwarded
    to func unchanged on every call.  func must carry a truthy ``tc``
    attribute.

    Elements that evaluate as false are not passed to func; their slot
    holds ``defer.succeed(nan)`` instead.  ``log.err`` is attached as an
    errback to every entry.

    Returns an array with the same shape as parr holding one entry per
    element.
    """
    assert func.tc

    def dispatch(item, **options):
        # Falsy elements short-circuit to an already-fired nan result.
        if not item:
            return defer.succeed(nan)
        return func(item, **options)

    deferreds = []
    for item in parr.flat:
        pending = dispatch(item, **kwds)
        deferreds.append(pending.addErrback(log.err))

    resarr = array(deferreds)
    resarr.shape = parr.shape
    return resarr
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Demo: run f(11, 5) as a task and print its result when it arrives.

    rc = client.MultiEngineClient(client.default_address)
    tc = client.TaskClient(client.default_task_address)

    # if commenting out the decorator you get a local running version
    # instantly
    @parallelized(rc, tc)
    def f(a, b=1):
        #from time import sleep
        #sleep(1)
        # Single-argument print() emits the same text on Python 2 and 3.
        print("a,b= %s %s" % (a, b))
        return a+b

    def showres(x):
        """Callback: print the value delivered by the task's Deferred."""
        print('ans: %s' % (x,))

    res = f(11, 5)
    res.addCallback(showres)

    # this is not necessary in Twisted 8.0
    from twisted.internet import reactor
    reactor.run()
|
|
|
|