Show More
@@ -0,0 +1,75 | |||||
|
1 | """PendingResult objects for the client""" | |||
|
2 | #----------------------------------------------------------------------------- | |||
|
3 | # Copyright (C) 2010 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | # Imports | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | import error | |||
|
14 | ||||
|
15 | #----------------------------------------------------------------------------- | |||
|
16 | # Classes | |||
|
17 | #----------------------------------------------------------------------------- | |||
|
18 | ||||
|
19 | class PendingResult(object): | |||
|
20 | """Class for representing results of non-blocking calls.""" | |||
|
21 | def __init__(self, client, msg_ids): | |||
|
22 | self.client = client | |||
|
23 | self.msg_ids = msg_ids | |||
|
24 | self._result = None | |||
|
25 | self.done = False | |||
|
26 | ||||
|
27 | def __repr__(self): | |||
|
28 | if self.done: | |||
|
29 | return "<%s: finished>"%(self.__class__.__name__) | |||
|
30 | else: | |||
|
31 | return "<%s: %r>"%(self.__class__.__name__,self.msg_ids) | |||
|
32 | ||||
|
33 | @property | |||
|
34 | def result(self): | |||
|
35 | if self._result is not None: | |||
|
36 | return self._result | |||
|
37 | if not self.done: | |||
|
38 | self.wait(0) | |||
|
39 | if self.done: | |||
|
40 | results = map(self.client.results.get, self.msg_ids) | |||
|
41 | results = error.collect_exceptions(results, 'get_result') | |||
|
42 | self._result = self.reconstruct_result(results) | |||
|
43 | return self._result | |||
|
44 | else: | |||
|
45 | raise error.ResultNotCompleted | |||
|
46 | ||||
|
47 | def reconstruct_result(self, res): | |||
|
48 | """ | |||
|
49 | Override me in subclasses for turning a list of results | |||
|
50 | into the expected form. | |||
|
51 | """ | |||
|
52 | if len(res) == 1: | |||
|
53 | return res[0] | |||
|
54 | else: | |||
|
55 | return res | |||
|
56 | ||||
|
57 | def wait(self, timout=-1): | |||
|
58 | self.done = self.client.barrier(self.msg_ids) | |||
|
59 | return self.done | |||
|
60 | ||||
|
61 | class PendingMapResult(PendingResult): | |||
|
62 | """Class for representing results of non-blocking gathers. | |||
|
63 | ||||
|
64 | This will properly reconstruct the gather. | |||
|
65 | """ | |||
|
66 | ||||
|
67 | def __init__(self, client, msg_ids, mapObject): | |||
|
68 | self.mapObject = mapObject | |||
|
69 | PendingResult.__init__(self, client, msg_ids) | |||
|
70 | ||||
|
71 | def reconstruct_result(self, res): | |||
|
72 | """Perform the gather on the actual results.""" | |||
|
73 | return self.mapObject.joinPartitions(res) | |||
|
74 | ||||
|
75 |
@@ -0,0 +1,145 | |||||
|
1 | """Remote Functions and decorators for the client.""" | |||
|
2 | #----------------------------------------------------------------------------- | |||
|
3 | # Copyright (C) 2010 The IPython Development Team | |||
|
4 | # | |||
|
5 | # Distributed under the terms of the BSD License. The full license is in | |||
|
6 | # the file COPYING, distributed as part of this software. | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | ||||
|
9 | #----------------------------------------------------------------------------- | |||
|
10 | # Imports | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | import map as Map | |||
|
14 | from pendingresult import PendingMapResult | |||
|
15 | ||||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | # Decorators | |||
|
18 | #----------------------------------------------------------------------------- | |||
|
19 | ||||
|
20 | def remote(client, bound=False, block=None, targets=None): | |||
|
21 | """Turn a function into a remote function. | |||
|
22 | ||||
|
23 | This method can be used for map: | |||
|
24 | ||||
|
25 | >>> @remote(client,block=True) | |||
|
26 | def func(a) | |||
|
27 | """ | |||
|
28 | def remote_function(f): | |||
|
29 | return RemoteFunction(client, f, bound, block, targets) | |||
|
30 | return remote_function | |||
|
31 | ||||
|
32 | def parallel(client, dist='b', bound=False, block=None, targets='all'): | |||
|
33 | """Turn a function into a parallel remote function. | |||
|
34 | ||||
|
35 | This method can be used for map: | |||
|
36 | ||||
|
37 | >>> @parallel(client,block=True) | |||
|
38 | def func(a) | |||
|
39 | """ | |||
|
40 | def parallel_function(f): | |||
|
41 | return ParallelFunction(client, f, dist, bound, block, targets) | |||
|
42 | return parallel_function | |||
|
43 | ||||
|
44 | #-------------------------------------------------------------------------- | |||
|
45 | # Classes | |||
|
46 | #-------------------------------------------------------------------------- | |||
|
47 | ||||
|
48 | class RemoteFunction(object): | |||
|
49 | """Turn an existing function into a remote function. | |||
|
50 | ||||
|
51 | Parameters | |||
|
52 | ---------- | |||
|
53 | ||||
|
54 | client : Client instance | |||
|
55 | The client to be used to connect to engines | |||
|
56 | f : callable | |||
|
57 | The function to be wrapped into a remote function | |||
|
58 | bound : bool [default: False] | |||
|
59 | Whether the affect the remote namespace when called | |||
|
60 | block : bool [default: None] | |||
|
61 | Whether to wait for results or not. The default behavior is | |||
|
62 | to use the current `block` attribute of `client` | |||
|
63 | targets : valid target list [default: all] | |||
|
64 | The targets on which to execute. | |||
|
65 | """ | |||
|
66 | ||||
|
67 | client = None # the remote connection | |||
|
68 | func = None # the wrapped function | |||
|
69 | block = None # whether to block | |||
|
70 | bound = None # whether to affect the namespace | |||
|
71 | targets = None # where to execute | |||
|
72 | ||||
|
73 | def __init__(self, client, f, bound=False, block=None, targets=None): | |||
|
74 | self.client = client | |||
|
75 | self.func = f | |||
|
76 | self.block=block | |||
|
77 | self.bound=bound | |||
|
78 | self.targets=targets | |||
|
79 | ||||
|
80 | def __call__(self, *args, **kwargs): | |||
|
81 | return self.client.apply(self.func, args=args, kwargs=kwargs, | |||
|
82 | block=self.block, targets=self.targets, bound=self.bound) | |||
|
83 | ||||
|
84 | ||||
|
85 | class ParallelFunction(RemoteFunction): | |||
|
86 | """Class for mapping a function to sequences.""" | |||
|
87 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): | |||
|
88 | super(ParallelFunction, self).__init__(client,f,bound,block,targets) | |||
|
89 | mapClass = Map.dists[dist] | |||
|
90 | self.mapObject = mapClass() | |||
|
91 | ||||
|
92 | def __call__(self, *sequences): | |||
|
93 | len_0 = len(sequences[0]) | |||
|
94 | for s in sequences: | |||
|
95 | if len(s)!=len_0: | |||
|
96 | raise ValueError('all sequences must have equal length') | |||
|
97 | ||||
|
98 | if self.targets is None: | |||
|
99 | # load-balanced: | |||
|
100 | engines = [None]*len_0 | |||
|
101 | elif isinstance(self.targets, int): | |||
|
102 | engines = [None]*self.targets | |||
|
103 | else: | |||
|
104 | # multiplexed: | |||
|
105 | engines = self.client._build_targets(self.targets)[-1] | |||
|
106 | ||||
|
107 | nparts = len(engines) | |||
|
108 | msg_ids = [] | |||
|
109 | # my_f = lambda *a: map(self.func, *a) | |||
|
110 | for index, engineid in enumerate(engines): | |||
|
111 | args = [] | |||
|
112 | for seq in sequences: | |||
|
113 | part = self.mapObject.getPartition(seq, index, nparts) | |||
|
114 | if not part: | |||
|
115 | continue | |||
|
116 | else: | |||
|
117 | args.append(part) | |||
|
118 | if not args: | |||
|
119 | continue | |||
|
120 | ||||
|
121 | # print (args) | |||
|
122 | if hasattr(self, '_map'): | |||
|
123 | f = map | |||
|
124 | args = [self.func]+args | |||
|
125 | else: | |||
|
126 | f=self.func | |||
|
127 | mid = self.client.apply(f, args=args, block=False, | |||
|
128 | bound=self.bound, | |||
|
129 | targets=engineid).msg_ids[0] | |||
|
130 | msg_ids.append(mid) | |||
|
131 | ||||
|
132 | r = PendingMapResult(self.client, msg_ids, self.mapObject) | |||
|
133 | if self.block: | |||
|
134 | r.wait() | |||
|
135 | return r.result | |||
|
136 | else: | |||
|
137 | return r | |||
|
138 | ||||
|
139 | def map(self, *sequences): | |||
|
140 | """call a function on each element of a sequence remotely.""" | |||
|
141 | self._map = True | |||
|
142 | ret = self.__call__(*sequences) | |||
|
143 | del self._map | |||
|
144 | return ret | |||
|
145 |
@@ -29,6 +29,8 from view import DirectView, LoadBalancedView | |||||
29 | from dependency import Dependency, depend, require |
|
29 | from dependency import Dependency, depend, require | |
30 | import error |
|
30 | import error | |
31 | import map as Map |
|
31 | import map as Map | |
|
32 | from pendingresult import PendingResult,PendingMapResult | |||
|
33 | from remotefunction import remote,parallel,ParallelFunction,RemoteFunction | |||
32 |
|
34 | |||
33 | #-------------------------------------------------------------------------- |
|
35 | #-------------------------------------------------------------------------- | |
34 | # helpers for implementing old MEC API via client.apply |
|
36 | # helpers for implementing old MEC API via client.apply | |
@@ -81,167 +83,6 def defaultblock(f, self, *args, **kwargs): | |||||
81 | self.block = saveblock |
|
83 | self.block = saveblock | |
82 | return ret |
|
84 | return ret | |
83 |
|
85 | |||
84 | def remote(client, bound=False, block=None, targets=None): |
|
|||
85 | """Turn a function into a remote function. |
|
|||
86 |
|
||||
87 | This method can be used for map: |
|
|||
88 |
|
||||
89 | >>> @remote(client,block=True) |
|
|||
90 | def func(a) |
|
|||
91 | """ |
|
|||
92 | def remote_function(f): |
|
|||
93 | return RemoteFunction(client, f, bound, block, targets) |
|
|||
94 | return remote_function |
|
|||
95 |
|
||||
96 | def parallel(client, dist='b', bound=False, block=None, targets='all'): |
|
|||
97 | """Turn a function into a parallel remote function. |
|
|||
98 |
|
||||
99 | This method can be used for map: |
|
|||
100 |
|
||||
101 | >>> @parallel(client,block=True) |
|
|||
102 | def func(a) |
|
|||
103 | """ |
|
|||
104 | def parallel_function(f): |
|
|||
105 | return ParallelFunction(client, f, dist, bound, block, targets) |
|
|||
106 | return parallel_function |
|
|||
107 |
|
||||
108 | #-------------------------------------------------------------------------- |
|
|||
109 | # Classes |
|
|||
110 | #-------------------------------------------------------------------------- |
|
|||
111 |
|
||||
112 | class RemoteFunction(object): |
|
|||
113 | """Turn an existing function into a remote function. |
|
|||
114 |
|
||||
115 | Parameters |
|
|||
116 | ---------- |
|
|||
117 |
|
||||
118 | client : Client instance |
|
|||
119 | The client to be used to connect to engines |
|
|||
120 | f : callable |
|
|||
121 | The function to be wrapped into a remote function |
|
|||
122 | bound : bool [default: False] |
|
|||
123 | Whether the affect the remote namespace when called |
|
|||
124 | block : bool [default: None] |
|
|||
125 | Whether to wait for results or not. The default behavior is |
|
|||
126 | to use the current `block` attribute of `client` |
|
|||
127 | targets : valid target list [default: all] |
|
|||
128 | The targets on which to execute. |
|
|||
129 | """ |
|
|||
130 |
|
||||
131 | client = None # the remote connection |
|
|||
132 | func = None # the wrapped function |
|
|||
133 | block = None # whether to block |
|
|||
134 | bound = None # whether to affect the namespace |
|
|||
135 | targets = None # where to execute |
|
|||
136 |
|
||||
137 | def __init__(self, client, f, bound=False, block=None, targets=None): |
|
|||
138 | self.client = client |
|
|||
139 | self.func = f |
|
|||
140 | self.block=block |
|
|||
141 | self.bound=bound |
|
|||
142 | self.targets=targets |
|
|||
143 |
|
||||
144 | def __call__(self, *args, **kwargs): |
|
|||
145 | return self.client.apply(self.func, args=args, kwargs=kwargs, |
|
|||
146 | block=self.block, targets=self.targets, bound=self.bound) |
|
|||
147 |
|
||||
148 |
|
||||
149 | class ParallelFunction(RemoteFunction): |
|
|||
150 | """Class for mapping a function to sequences.""" |
|
|||
151 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): |
|
|||
152 | super(ParallelFunction, self).__init__(client,f,bound,block,targets) |
|
|||
153 | mapClass = Map.dists[dist] |
|
|||
154 | self.mapObject = mapClass() |
|
|||
155 |
|
||||
156 | def __call__(self, *sequences): |
|
|||
157 | len_0 = len(sequences[0]) |
|
|||
158 | for s in sequences: |
|
|||
159 | if len(s)!=len_0: |
|
|||
160 | raise ValueError('all sequences must have equal length') |
|
|||
161 |
|
||||
162 | if self.targets is None: |
|
|||
163 | # load-balanced: |
|
|||
164 | engines = [None]*len_0 |
|
|||
165 | else: |
|
|||
166 | # multiplexed: |
|
|||
167 | engines = self.client._build_targets(self.targets)[-1] |
|
|||
168 |
|
||||
169 | nparts = len(engines) |
|
|||
170 | msg_ids = [] |
|
|||
171 | for index, engineid in enumerate(engines): |
|
|||
172 | args = [] |
|
|||
173 | for seq in sequences: |
|
|||
174 | args.append(self.mapObject.getPartition(seq, index, nparts)) |
|
|||
175 | mid = self.client.apply(self.func, args=args, block=False, |
|
|||
176 | bound=self.bound, |
|
|||
177 | targets=engineid) |
|
|||
178 | msg_ids.append(mid) |
|
|||
179 |
|
||||
180 | if self.block: |
|
|||
181 | dg = PendingMapResult(self.client, msg_ids, self.mapObject) |
|
|||
182 | dg.wait() |
|
|||
183 | return dg.result |
|
|||
184 | else: |
|
|||
185 | return dg |
|
|||
186 |
|
||||
187 |
|
||||
188 | class PendingResult(object): |
|
|||
189 | """Class for representing results of non-blocking calls.""" |
|
|||
190 | def __init__(self, client, msg_ids): |
|
|||
191 | self.client = client |
|
|||
192 | self.msg_ids = msg_ids |
|
|||
193 | self._result = None |
|
|||
194 | self.done = False |
|
|||
195 |
|
||||
196 | def __repr__(self): |
|
|||
197 | if self.done: |
|
|||
198 | return "<%s: finished>"%(self.__class__.__name__) |
|
|||
199 | else: |
|
|||
200 | return "<%s: %r>"%(self.__class__.__name__,self.msg_ids) |
|
|||
201 |
|
||||
202 | @property |
|
|||
203 | def result(self): |
|
|||
204 | if self._result is not None: |
|
|||
205 | return self._result |
|
|||
206 | if not self.done: |
|
|||
207 | self.wait(0) |
|
|||
208 | if self.done: |
|
|||
209 | results = map(self.client.results.get, self.msg_ids) |
|
|||
210 | results = error.collect_exceptions(results, 'get_result') |
|
|||
211 | self._result = self.reconstruct_result(results) |
|
|||
212 | return self._result |
|
|||
213 | else: |
|
|||
214 | raise error.ResultNotCompleted |
|
|||
215 |
|
||||
216 | def reconstruct_result(self, res): |
|
|||
217 | """ |
|
|||
218 | Override me in subclasses for turning a list of results |
|
|||
219 | into the expected form. |
|
|||
220 | """ |
|
|||
221 | if len(res) == 1: |
|
|||
222 | return res[0] |
|
|||
223 | else: |
|
|||
224 | return res |
|
|||
225 |
|
||||
226 | def wait(self, timout=-1): |
|
|||
227 | self.done = self.client.barrier(self.msg_ids) |
|
|||
228 | return self.done |
|
|||
229 |
|
||||
230 | class PendingMapResult(PendingResult): |
|
|||
231 | """Class for representing results of non-blocking gathers. |
|
|||
232 |
|
||||
233 | This will properly reconstruct the gather. |
|
|||
234 | """ |
|
|||
235 |
|
||||
236 | def __init__(self, client, msg_ids, mapObject): |
|
|||
237 | self.mapObject = mapObject |
|
|||
238 | PendingResult.__init__(self, client, msg_ids) |
|
|||
239 |
|
||||
240 | def reconstruct_result(self, res): |
|
|||
241 | """Perform the gather on the actual results.""" |
|
|||
242 | return self.mapObject.joinPartitions(res) |
|
|||
243 |
|
||||
244 |
|
||||
245 |
|
86 | |||
246 | class AbortedTask(object): |
|
87 | class AbortedTask(object): | |
247 | """A basic wrapper object describing an aborted task.""" |
|
88 | """A basic wrapper object describing an aborted task.""" | |
@@ -944,10 +785,11 class Client(object): | |||||
944 | result[target] = self.results[mid] |
|
785 | result[target] = self.results[mid] | |
945 | return error.collect_exceptions(result, f.__name__) |
|
786 | return error.collect_exceptions(result, f.__name__) | |
946 |
|
787 | |||
947 | @defaultblock |
|
788 | def map(self, f, *sequences): | |
948 | def map(self, f, sequences, targets=None, block=None, bound=False): |
|
789 | """Parallel version of builtin `map`, using all our engines.""" | |
949 |
pf = ParallelFunction(self,f,block=block, |
|
790 | pf = ParallelFunction(self, f, block=self.block, | |
950 | return pf(*sequences) |
|
791 | bound=True, targets='all') | |
|
792 | return pf.map(*sequences) | |||
951 |
|
793 | |||
952 | #-------------------------------------------------------------------------- |
|
794 | #-------------------------------------------------------------------------- | |
953 | # Data movement |
|
795 | # Data movement |
@@ -277,7 +277,12 class Kernel(HasTraits): | |||||
277 | suffix = prefix = "_" # prevent keyword collisions with lambda |
|
277 | suffix = prefix = "_" # prevent keyword collisions with lambda | |
278 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) |
|
278 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |
279 | # if f.fun |
|
279 | # if f.fun | |
280 | fname = prefix+f.func_name.strip('<>')+suffix |
|
280 | if hasattr(f, 'func_name'): | |
|
281 | fname = f.func_name | |||
|
282 | else: | |||
|
283 | fname = f.__name__ | |||
|
284 | ||||
|
285 | fname = prefix+fname.strip('<>')+suffix | |||
281 | argname = prefix+"args"+suffix |
|
286 | argname = prefix+"args"+suffix | |
282 | kwargname = prefix+"kwargs"+suffix |
|
287 | kwargname = prefix+"kwargs"+suffix | |
283 | resultname = prefix+"result"+suffix |
|
288 | resultname = prefix+"result"+suffix |
@@ -11,6 +11,7 | |||||
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
13 | from IPython.external.decorator import decorator |
|
13 | from IPython.external.decorator import decorator | |
|
14 | from IPython.zmq.parallel.remotefunction import ParallelFunction | |||
14 |
|
15 | |||
15 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
16 | # Decorators |
|
17 | # Decorators | |
@@ -28,8 +29,10 def myblock(f, self, *args, **kwargs): | |||||
28 | @decorator |
|
29 | @decorator | |
29 | def save_ids(f, self, *args, **kwargs): |
|
30 | def save_ids(f, self, *args, **kwargs): | |
30 | """Keep our history and outstanding attributes up to date after a method call.""" |
|
31 | """Keep our history and outstanding attributes up to date after a method call.""" | |
|
32 | n_previous = len(self.client.history) | |||
31 | ret = f(self, *args, **kwargs) |
|
33 | ret = f(self, *args, **kwargs) | |
32 |
msg |
|
34 | nmsgs = len(self.client.history) - n_previous | |
|
35 | msg_ids = self.client.history[-nmsgs:] | |||
33 | self.history.extend(msg_ids) |
|
36 | self.history.extend(msg_ids) | |
34 | map(self.outstanding.add, msg_ids) |
|
37 | map(self.outstanding.add, msg_ids) | |
35 | return ret |
|
38 | return ret | |
@@ -172,6 +175,16 class View(object): | |||||
172 | """ |
|
175 | """ | |
173 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) |
|
176 | return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) | |
174 |
|
177 | |||
|
178 | @spin_after | |||
|
179 | @save_ids | |||
|
180 | def map(self, f, *sequences): | |||
|
181 | """Parallel version of builtin `map`, using this view's engines.""" | |||
|
182 | if isinstance(self.targets, int): | |||
|
183 | targets = [self.targets] | |||
|
184 | pf = ParallelFunction(self.client, f, block=self.block, | |||
|
185 | bound=True, targets=targets) | |||
|
186 | return pf.map(*sequences) | |||
|
187 | ||||
175 | def abort(self, msg_ids=None, block=None): |
|
188 | def abort(self, msg_ids=None, block=None): | |
176 | """Abort jobs on my engines. |
|
189 | """Abort jobs on my engines. | |
177 |
|
190 |
General Comments 0
You need to be logged in to leave comments.
Login now