Show More
@@ -0,0 +1,172 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """A parallel tasking tool that uses asynchronous programming. This uses | |||
|
3 | blocking client to get taskid, but returns a Deferred as the result of | |||
|
4 | run(). Users should attach their callbacks on these Deferreds. | |||
|
5 | ||||
|
6 | Only returning of results is asynchronous. Submitting tasks and getting task | |||
|
7 | ids are done synchronously. | |||
|
8 | ||||
|
9 | Yichun Wei 03/2008 | |||
|
10 | """ | |||
|
11 | ||||
|
12 | import inspect | |||
|
13 | import itertools | |||
|
14 | import numpy as N | |||
|
15 | ||||
|
16 | from twisted.python import log | |||
|
17 | from ipython1.kernel import client | |||
|
18 | from ipython1.kernel.client import Task | |||
|
19 | ||||
|
20 | """ After http://trac.pocoo.org/repos/pocoo/trunk/pocoo/utils/decorators.py | |||
|
21 | """ | |||
|
22 | class submit_job(object): | |||
|
23 | """ a decorator factory: takes a MultiEngineClient a TaskClient, returns a | |||
|
24 | decorator, that makes a call to the decorated func as a task in ipython1 | |||
|
25 | and submit it to IPython1 controller: | |||
|
26 | """ | |||
|
27 | def __init__(self, rc, tc): | |||
|
28 | self.rc = rc | |||
|
29 | self.tc = tc | |||
|
30 | ||||
|
31 | def __call__(self, func): | |||
|
32 | return self._decorate(func) | |||
|
33 | ||||
|
34 | def _getinfo(self, func): | |||
|
35 | assert inspect.ismethod(func) or inspect.isfunction(func) | |||
|
36 | regargs, varargs, varkwargs, defaults = inspect.getargspec(func) | |||
|
37 | argnames = list(regargs) | |||
|
38 | if varargs: | |||
|
39 | argnames.append(varargs) | |||
|
40 | if varkwargs: | |||
|
41 | argnames.append(varkwargs) | |||
|
42 | counter = itertools.count() | |||
|
43 | fullsign = inspect.formatargspec( | |||
|
44 | regargs, varargs, varkwargs, defaults, | |||
|
45 | formatvalue=lambda value: '=defarg[%i]' % counter.next())[1:-1] | |||
|
46 | shortsign = inspect.formatargspec( | |||
|
47 | regargs, varargs, varkwargs, defaults, | |||
|
48 | formatvalue=lambda value: '')[1:-1] | |||
|
49 | dic = dict(('arg%s' % n, name) for n, name in enumerate(argnames)) | |||
|
50 | dic.update(name=func.__name__, argnames=argnames, shortsign=shortsign, | |||
|
51 | fullsign = fullsign, defarg = func.func_defaults or ()) | |||
|
52 | return dic | |||
|
53 | ||||
|
54 | def _decorate(self, func): | |||
|
55 | """ | |||
|
56 | Takes a function and a remote controller and returns a function | |||
|
57 | decorated that is going to submit the job with the controller. | |||
|
58 | The decorated function is obtained by evaluating a lambda | |||
|
59 | function with the correct signature. | |||
|
60 | ||||
|
61 | the TaskController setupNS doesn't cope with functions, but we | |||
|
62 | can use RemoteController to push functions/modules into engines. | |||
|
63 | ||||
|
64 | Changes: | |||
|
65 | 200803. In new ipython1, we use push_function for functions. | |||
|
66 | """ | |||
|
67 | rc, tc = self.rc, self.tc | |||
|
68 | infodict = self._getinfo(func) | |||
|
69 | if 'rc' in infodict['argnames']: | |||
|
70 | raise NameError, "You cannot use rc as argument names!" | |||
|
71 | ||||
|
72 | # we assume the engines' namepace has been prepared. | |||
|
73 | # ns[func.__name__] is already the decorated closure function. | |||
|
74 | # we need to change it back to the original function: | |||
|
75 | ns = {} | |||
|
76 | ns[func.__name__] = func | |||
|
77 | ||||
|
78 | # push func and all its environment/prerequesites to engines | |||
|
79 | rc.push_function(ns, block=True) # note it is nonblock by default, not know if it causes problems | |||
|
80 | ||||
|
81 | def do_submit_func(*args, **kwds): | |||
|
82 | jobns = {} | |||
|
83 | ||||
|
84 | # Initialize job namespace with args that have default args | |||
|
85 | # now we support calls that uses default args | |||
|
86 | for n in infodict['fullsign'].split(','): | |||
|
87 | try: | |||
|
88 | vname, var = n.split('=') | |||
|
89 | vname, var = vname.strip(), var.strip() | |||
|
90 | except: # no defarg, one of vname, var is None | |||
|
91 | pass | |||
|
92 | else: | |||
|
93 | jobns.setdefault(vname, eval(var, infodict)) | |||
|
94 | ||||
|
95 | # push args and kwds, overwritting default args if needed. | |||
|
96 | nokwds = dict((n,v) for n,v in zip(infodict['argnames'], args)) # truncated | |||
|
97 | jobns.update(nokwds) | |||
|
98 | jobns.update(kwds) | |||
|
99 | ||||
|
100 | task = Task('a_very_long_and_rare_name = %(name)s(%(shortsign)s)' % infodict, | |||
|
101 | pull=['a_very_long_and_rare_name'], push=jobns,) | |||
|
102 | jobid = tc.run(task) | |||
|
103 | # res is a deferred, one can attach callbacks on it | |||
|
104 | res = tc.task_controller.get_task_result(jobid, block=True) | |||
|
105 | res.addCallback(lambda x: x.ns['a_very_long_and_rare_name']) | |||
|
106 | res.addErrback(log.err) | |||
|
107 | return res | |||
|
108 | ||||
|
109 | do_submit_func.rc = rc | |||
|
110 | do_submit_func.tc = tc | |||
|
111 | return do_submit_func | |||
|
112 | ||||
|
113 | ||||
|
114 | def parallelized(rc, tc, initstrlist=[]): | |||
|
115 | """ rc - remote controller | |||
|
116 | tc - taks controller | |||
|
117 | strlist - a list of str that's being executed on engines. | |||
|
118 | """ | |||
|
119 | for cmd in initstrlist: | |||
|
120 | rc.execute(cmd, block=True) | |||
|
121 | return submit_job(rc, tc) | |||
|
122 | ||||
|
123 | ||||
|
124 | from twisted.internet import defer | |||
|
125 | from numpy import array, nan | |||
|
126 | ||||
|
127 | def pmap(func, parr, **kwds): | |||
|
128 | """Run func on every element of parr (array), using the elements | |||
|
129 | as the only one parameter (so you can usually use a dict that | |||
|
130 | wraps many parameters). -> a result array of Deferreds with the | |||
|
131 | same shape. func.tc will be used as the taskclient. | |||
|
132 | ||||
|
133 | **kwds are passed on to func, not changed. | |||
|
134 | """ | |||
|
135 | assert func.tc | |||
|
136 | tc = func.tc | |||
|
137 | ||||
|
138 | def run(p, **kwds): | |||
|
139 | if p: | |||
|
140 | return func(p, **kwds) | |||
|
141 | else: | |||
|
142 | return defer.succeed(nan) | |||
|
143 | ||||
|
144 | reslist = [run(p, **kwds).addErrback(log.err) for p in parr.flat] | |||
|
145 | resarr = array(reslist) | |||
|
146 | resarr.shape = parr.shape | |||
|
147 | return resarr | |||
|
148 | ||||
|
149 | ||||
|
150 | if __name__=='__main__': | |||
|
151 | ||||
|
152 | rc = client.MultiEngineClient(client.default_address) | |||
|
153 | tc = client.TaskClient(client.default_task_address) | |||
|
154 | ||||
|
155 | # if commenting out the decorator you get a local running version | |||
|
156 | # instantly | |||
|
157 | @parallelized(rc, tc) | |||
|
158 | def f(a, b=1): | |||
|
159 | #from time import sleep | |||
|
160 | #sleep(1) | |||
|
161 | print "a,b=", a,b | |||
|
162 | return a+b | |||
|
163 | ||||
|
164 | def showres(x): | |||
|
165 | print 'ans:',x | |||
|
166 | ||||
|
167 | res = f(11,5) | |||
|
168 | res.addCallback(showres) | |||
|
169 | ||||
|
170 | # this is not necessary in Twisted 8.0 | |||
|
171 | from twisted.internet import reactor | |||
|
172 | reactor.run() |
@@ -0,0 +1,119 b'' | |||||
|
1 | import types | |||
|
2 | ||||
|
3 | class AttributeBase(object): | |||
|
4 | ||||
|
5 | def __get__(self, inst, cls=None): | |||
|
6 | if inst is None: | |||
|
7 | return self | |||
|
8 | try: | |||
|
9 | return inst._attributes[self.name] | |||
|
10 | except KeyError: | |||
|
11 | raise AttributeError("object has no attribute %r" % self.name) | |||
|
12 | ||||
|
13 | def __set__(self, inst, value): | |||
|
14 | actualValue = self.validate(inst, self.name, value) | |||
|
15 | inst._attributes[self.name] = actualValue | |||
|
16 | ||||
|
17 | def validate(self, inst, name, value): | |||
|
18 | raise NotImplementedError("validate must be implemented by a subclass") | |||
|
19 | ||||
|
20 | class NameFinder(type): | |||
|
21 | ||||
|
22 | def __new__(cls, name, bases, classdict): | |||
|
23 | attributeList = [] | |||
|
24 | for k,v in classdict.iteritems(): | |||
|
25 | if isinstance(v, AttributeBase): | |||
|
26 | v.name = k | |||
|
27 | attributeList.append(k) | |||
|
28 | classdict['_attributeList'] = attributeList | |||
|
29 | return type.__new__(cls, name, bases, classdict) | |||
|
30 | ||||
|
31 | class HasAttributes(object): | |||
|
32 | __metaclass__ = NameFinder | |||
|
33 | ||||
|
34 | def __init__(self): | |||
|
35 | self._attributes = {} | |||
|
36 | ||||
|
37 | def getAttributeNames(self): | |||
|
38 | return self._attributeList | |||
|
39 | ||||
|
40 | def getAttributesOfType(self, t, default=None): | |||
|
41 | result = {} | |||
|
42 | for a in self._attributeList: | |||
|
43 | if self.__class__.__dict__[a].__class__ == t: | |||
|
44 | try: | |||
|
45 | value = getattr(self, a) | |||
|
46 | except AttributeError: | |||
|
47 | value = None | |||
|
48 | result[a] = value | |||
|
49 | return result | |||
|
50 | ||||
|
51 | class TypedAttribute(AttributeBase): | |||
|
52 | ||||
|
53 | def validate(self, inst, name, value): | |||
|
54 | if type(value) != self._type: | |||
|
55 | raise TypeError("attribute %s must be of type %s" % (name, self._type)) | |||
|
56 | else: | |||
|
57 | return value | |||
|
58 | ||||
|
59 | # class Option(TypedAttribute): | |||
|
60 | # | |||
|
61 | # _type = types.IntType | |||
|
62 | # | |||
|
63 | # class Param(TypedAttribute): | |||
|
64 | # | |||
|
65 | # _type = types.FloatType | |||
|
66 | # | |||
|
67 | # class String(TypedAttribute): | |||
|
68 | # | |||
|
69 | # _type = types.StringType | |||
|
70 | ||||
|
71 | class TypedSequenceAttribute(AttributeBase): | |||
|
72 | ||||
|
73 | def validate(self, inst, name, value): | |||
|
74 | if type(value) != types.TupleType and type(value) != types.ListType: | |||
|
75 | raise TypeError("attribute %s must be a list or tuple" % (name)) | |||
|
76 | else: | |||
|
77 | for item in value: | |||
|
78 | if type(item) != self._subtype: | |||
|
79 | raise TypeError("attribute %s must be a list or tuple of items with type %s" % (name, self._subtype)) | |||
|
80 | return value | |||
|
81 | ||||
|
82 | # class Instance(AttributeBase): | |||
|
83 | # | |||
|
84 | # def __init__(self, cls): | |||
|
85 | # self.cls = cls | |||
|
86 | # | |||
|
87 | # def validate(self, inst, name, value): | |||
|
88 | # if not isinstance(value, self.cls): | |||
|
89 | # raise TypeError("attribute %s must be an instance of class %s" % (name, self.cls)) | |||
|
90 | # else: | |||
|
91 | # return value | |||
|
92 | ||||
|
93 | ||||
|
94 | # class OptVec(TypedSequenceAttribute): | |||
|
95 | # | |||
|
96 | # _subtype = types.IntType | |||
|
97 | # | |||
|
98 | # class PrmVec(TypedSequenceAttribute): | |||
|
99 | # | |||
|
100 | # _subtype = types.FloatType | |||
|
101 | # | |||
|
102 | # class StrVec(TypedSequenceAttribute): | |||
|
103 | # | |||
|
104 | # _subtype = types.StringType | |||
|
105 | # | |||
|
106 | # | |||
|
107 | # class Bar(HasAttributes): | |||
|
108 | # | |||
|
109 | # a = Option() | |||
|
110 | # | |||
|
111 | # class Foo(HasAttributes): | |||
|
112 | # | |||
|
113 | # a = Option() | |||
|
114 | # b = Param() | |||
|
115 | # c = String() | |||
|
116 | # d = OptVec() | |||
|
117 | # e = PrmVec() | |||
|
118 | # f = StrVec() | |||
|
119 | # h = Instance(Bar) No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now