##// END OF EJS Templates
Adding things to the sandbox that belong there.
Brian Granger -
Show More
@@ -0,0 +1,172 b''
1 #!/usr/bin/env python
2 """A parallel tasking tool that uses asynchronous programming. This uses
3 blocking client to get taskid, but returns a Deferred as the result of
4 run(). Users should attach their callbacks on these Deferreds.
5
6 Only returning of results is asynchronous. Submitting tasks and getting task
7 ids are done synchronously.
8
9 Yichun Wei 03/2008
10 """
11
12 import inspect
13 import itertools
14 import numpy as N
15
16 from twisted.python import log
17 from ipython1.kernel import client
18 from ipython1.kernel.client import Task
19
20 """ After http://trac.pocoo.org/repos/pocoo/trunk/pocoo/utils/decorators.py
21 """
22 class submit_job(object):
23 """ a decorator factory: takes a MultiEngineClient a TaskClient, returns a
24 decorator, that makes a call to the decorated func as a task in ipython1
25 and submit it to IPython1 controller:
26 """
27 def __init__(self, rc, tc):
28 self.rc = rc
29 self.tc = tc
30
31 def __call__(self, func):
32 return self._decorate(func)
33
34 def _getinfo(self, func):
35 assert inspect.ismethod(func) or inspect.isfunction(func)
36 regargs, varargs, varkwargs, defaults = inspect.getargspec(func)
37 argnames = list(regargs)
38 if varargs:
39 argnames.append(varargs)
40 if varkwargs:
41 argnames.append(varkwargs)
42 counter = itertools.count()
43 fullsign = inspect.formatargspec(
44 regargs, varargs, varkwargs, defaults,
45 formatvalue=lambda value: '=defarg[%i]' % counter.next())[1:-1]
46 shortsign = inspect.formatargspec(
47 regargs, varargs, varkwargs, defaults,
48 formatvalue=lambda value: '')[1:-1]
49 dic = dict(('arg%s' % n, name) for n, name in enumerate(argnames))
50 dic.update(name=func.__name__, argnames=argnames, shortsign=shortsign,
51 fullsign = fullsign, defarg = func.func_defaults or ())
52 return dic
53
54 def _decorate(self, func):
55 """
56 Takes a function and a remote controller and returns a function
57 decorated that is going to submit the job with the controller.
58 The decorated function is obtained by evaluating a lambda
59 function with the correct signature.
60
61 the TaskController setupNS doesn't cope with functions, but we
62 can use RemoteController to push functions/modules into engines.
63
64 Changes:
65 200803. In new ipython1, we use push_function for functions.
66 """
67 rc, tc = self.rc, self.tc
68 infodict = self._getinfo(func)
69 if 'rc' in infodict['argnames']:
70 raise NameError, "You cannot use rc as argument names!"
71
72 # we assume the engines' namepace has been prepared.
73 # ns[func.__name__] is already the decorated closure function.
74 # we need to change it back to the original function:
75 ns = {}
76 ns[func.__name__] = func
77
78 # push func and all its environment/prerequesites to engines
79 rc.push_function(ns, block=True) # note it is nonblock by default, not know if it causes problems
80
81 def do_submit_func(*args, **kwds):
82 jobns = {}
83
84 # Initialize job namespace with args that have default args
85 # now we support calls that uses default args
86 for n in infodict['fullsign'].split(','):
87 try:
88 vname, var = n.split('=')
89 vname, var = vname.strip(), var.strip()
90 except: # no defarg, one of vname, var is None
91 pass
92 else:
93 jobns.setdefault(vname, eval(var, infodict))
94
95 # push args and kwds, overwritting default args if needed.
96 nokwds = dict((n,v) for n,v in zip(infodict['argnames'], args)) # truncated
97 jobns.update(nokwds)
98 jobns.update(kwds)
99
100 task = Task('a_very_long_and_rare_name = %(name)s(%(shortsign)s)' % infodict,
101 pull=['a_very_long_and_rare_name'], push=jobns,)
102 jobid = tc.run(task)
103 # res is a deferred, one can attach callbacks on it
104 res = tc.task_controller.get_task_result(jobid, block=True)
105 res.addCallback(lambda x: x.ns['a_very_long_and_rare_name'])
106 res.addErrback(log.err)
107 return res
108
109 do_submit_func.rc = rc
110 do_submit_func.tc = tc
111 return do_submit_func
112
113
114 def parallelized(rc, tc, initstrlist=[]):
115 """ rc - remote controller
116 tc - taks controller
117 strlist - a list of str that's being executed on engines.
118 """
119 for cmd in initstrlist:
120 rc.execute(cmd, block=True)
121 return submit_job(rc, tc)
122
123
124 from twisted.internet import defer
125 from numpy import array, nan
126
127 def pmap(func, parr, **kwds):
128 """Run func on every element of parr (array), using the elements
129 as the only one parameter (so you can usually use a dict that
130 wraps many parameters). -> a result array of Deferreds with the
131 same shape. func.tc will be used as the taskclient.
132
133 **kwds are passed on to func, not changed.
134 """
135 assert func.tc
136 tc = func.tc
137
138 def run(p, **kwds):
139 if p:
140 return func(p, **kwds)
141 else:
142 return defer.succeed(nan)
143
144 reslist = [run(p, **kwds).addErrback(log.err) for p in parr.flat]
145 resarr = array(reslist)
146 resarr.shape = parr.shape
147 return resarr
148
149
150 if __name__=='__main__':
151
152 rc = client.MultiEngineClient(client.default_address)
153 tc = client.TaskClient(client.default_task_address)
154
155 # if commenting out the decorator you get a local running version
156 # instantly
157 @parallelized(rc, tc)
158 def f(a, b=1):
159 #from time import sleep
160 #sleep(1)
161 print "a,b=", a,b
162 return a+b
163
164 def showres(x):
165 print 'ans:',x
166
167 res = f(11,5)
168 res.addCallback(showres)
169
170 # this is not necessary in Twisted 8.0
171 from twisted.internet import reactor
172 reactor.run()
@@ -0,0 +1,119 b''
1 import types
2
3 class AttributeBase(object):
4
5 def __get__(self, inst, cls=None):
6 if inst is None:
7 return self
8 try:
9 return inst._attributes[self.name]
10 except KeyError:
11 raise AttributeError("object has no attribute %r" % self.name)
12
13 def __set__(self, inst, value):
14 actualValue = self.validate(inst, self.name, value)
15 inst._attributes[self.name] = actualValue
16
17 def validate(self, inst, name, value):
18 raise NotImplementedError("validate must be implemented by a subclass")
19
20 class NameFinder(type):
21
22 def __new__(cls, name, bases, classdict):
23 attributeList = []
24 for k,v in classdict.iteritems():
25 if isinstance(v, AttributeBase):
26 v.name = k
27 attributeList.append(k)
28 classdict['_attributeList'] = attributeList
29 return type.__new__(cls, name, bases, classdict)
30
31 class HasAttributes(object):
32 __metaclass__ = NameFinder
33
34 def __init__(self):
35 self._attributes = {}
36
37 def getAttributeNames(self):
38 return self._attributeList
39
40 def getAttributesOfType(self, t, default=None):
41 result = {}
42 for a in self._attributeList:
43 if self.__class__.__dict__[a].__class__ == t:
44 try:
45 value = getattr(self, a)
46 except AttributeError:
47 value = None
48 result[a] = value
49 return result
50
51 class TypedAttribute(AttributeBase):
52
53 def validate(self, inst, name, value):
54 if type(value) != self._type:
55 raise TypeError("attribute %s must be of type %s" % (name, self._type))
56 else:
57 return value
58
59 # class Option(TypedAttribute):
60 #
61 # _type = types.IntType
62 #
63 # class Param(TypedAttribute):
64 #
65 # _type = types.FloatType
66 #
67 # class String(TypedAttribute):
68 #
69 # _type = types.StringType
70
71 class TypedSequenceAttribute(AttributeBase):
72
73 def validate(self, inst, name, value):
74 if type(value) != types.TupleType and type(value) != types.ListType:
75 raise TypeError("attribute %s must be a list or tuple" % (name))
76 else:
77 for item in value:
78 if type(item) != self._subtype:
79 raise TypeError("attribute %s must be a list or tuple of items with type %s" % (name, self._subtype))
80 return value
81
82 # class Instance(AttributeBase):
83 #
84 # def __init__(self, cls):
85 # self.cls = cls
86 #
87 # def validate(self, inst, name, value):
88 # if not isinstance(value, self.cls):
89 # raise TypeError("attribute %s must be an instance of class %s" % (name, self.cls))
90 # else:
91 # return value
92
93
94 # class OptVec(TypedSequenceAttribute):
95 #
96 # _subtype = types.IntType
97 #
98 # class PrmVec(TypedSequenceAttribute):
99 #
100 # _subtype = types.FloatType
101 #
102 # class StrVec(TypedSequenceAttribute):
103 #
104 # _subtype = types.StringType
105 #
106 #
107 # class Bar(HasAttributes):
108 #
109 # a = Option()
110 #
111 # class Foo(HasAttributes):
112 #
113 # a = Option()
114 # b = Param()
115 # c = String()
116 # d = OptVec()
117 # e = PrmVec()
118 # f = StrVec()
119 # h = Instance(Bar) No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now