##// END OF EJS Templates
split pendingresult and remotefunction into own files, add view.map.
MinRK -
Show More
@@ -0,0 +1,75 b''
1 """PendingResult objects for the client"""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
13 import error
14
15 #-----------------------------------------------------------------------------
16 # Classes
17 #-----------------------------------------------------------------------------
18
19 class PendingResult(object):
20 """Class for representing results of non-blocking calls."""
21 def __init__(self, client, msg_ids):
22 self.client = client
23 self.msg_ids = msg_ids
24 self._result = None
25 self.done = False
26
27 def __repr__(self):
28 if self.done:
29 return "<%s: finished>"%(self.__class__.__name__)
30 else:
31 return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
32
33 @property
34 def result(self):
35 if self._result is not None:
36 return self._result
37 if not self.done:
38 self.wait(0)
39 if self.done:
40 results = map(self.client.results.get, self.msg_ids)
41 results = error.collect_exceptions(results, 'get_result')
42 self._result = self.reconstruct_result(results)
43 return self._result
44 else:
45 raise error.ResultNotCompleted
46
47 def reconstruct_result(self, res):
48 """
49 Override me in subclasses for turning a list of results
50 into the expected form.
51 """
52 if len(res) == 1:
53 return res[0]
54 else:
55 return res
56
57 def wait(self, timout=-1):
58 self.done = self.client.barrier(self.msg_ids)
59 return self.done
60
61 class PendingMapResult(PendingResult):
62 """Class for representing results of non-blocking gathers.
63
64 This will properly reconstruct the gather.
65 """
66
67 def __init__(self, client, msg_ids, mapObject):
68 self.mapObject = mapObject
69 PendingResult.__init__(self, client, msg_ids)
70
71 def reconstruct_result(self, res):
72 """Perform the gather on the actual results."""
73 return self.mapObject.joinPartitions(res)
74
75
@@ -0,0 +1,145 b''
1 """Remote Functions and decorators for the client."""
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
4 #
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
8
9 #-----------------------------------------------------------------------------
10 # Imports
11 #-----------------------------------------------------------------------------
12
13 import map as Map
14 from pendingresult import PendingMapResult
15
16 #-----------------------------------------------------------------------------
17 # Decorators
18 #-----------------------------------------------------------------------------
19
20 def remote(client, bound=False, block=None, targets=None):
21 """Turn a function into a remote function.
22
23 This method can be used for map:
24
25 >>> @remote(client,block=True)
26 def func(a)
27 """
28 def remote_function(f):
29 return RemoteFunction(client, f, bound, block, targets)
30 return remote_function
31
32 def parallel(client, dist='b', bound=False, block=None, targets='all'):
33 """Turn a function into a parallel remote function.
34
35 This method can be used for map:
36
37 >>> @parallel(client,block=True)
38 def func(a)
39 """
40 def parallel_function(f):
41 return ParallelFunction(client, f, dist, bound, block, targets)
42 return parallel_function
43
44 #--------------------------------------------------------------------------
45 # Classes
46 #--------------------------------------------------------------------------
47
48 class RemoteFunction(object):
49 """Turn an existing function into a remote function.
50
51 Parameters
52 ----------
53
54 client : Client instance
55 The client to be used to connect to engines
56 f : callable
57 The function to be wrapped into a remote function
58 bound : bool [default: False]
59 Whether the affect the remote namespace when called
60 block : bool [default: None]
61 Whether to wait for results or not. The default behavior is
62 to use the current `block` attribute of `client`
63 targets : valid target list [default: all]
64 The targets on which to execute.
65 """
66
67 client = None # the remote connection
68 func = None # the wrapped function
69 block = None # whether to block
70 bound = None # whether to affect the namespace
71 targets = None # where to execute
72
73 def __init__(self, client, f, bound=False, block=None, targets=None):
74 self.client = client
75 self.func = f
76 self.block=block
77 self.bound=bound
78 self.targets=targets
79
80 def __call__(self, *args, **kwargs):
81 return self.client.apply(self.func, args=args, kwargs=kwargs,
82 block=self.block, targets=self.targets, bound=self.bound)
83
84
85 class ParallelFunction(RemoteFunction):
86 """Class for mapping a function to sequences."""
87 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
88 super(ParallelFunction, self).__init__(client,f,bound,block,targets)
89 mapClass = Map.dists[dist]
90 self.mapObject = mapClass()
91
92 def __call__(self, *sequences):
93 len_0 = len(sequences[0])
94 for s in sequences:
95 if len(s)!=len_0:
96 raise ValueError('all sequences must have equal length')
97
98 if self.targets is None:
99 # load-balanced:
100 engines = [None]*len_0
101 elif isinstance(self.targets, int):
102 engines = [None]*self.targets
103 else:
104 # multiplexed:
105 engines = self.client._build_targets(self.targets)[-1]
106
107 nparts = len(engines)
108 msg_ids = []
109 # my_f = lambda *a: map(self.func, *a)
110 for index, engineid in enumerate(engines):
111 args = []
112 for seq in sequences:
113 part = self.mapObject.getPartition(seq, index, nparts)
114 if not part:
115 continue
116 else:
117 args.append(part)
118 if not args:
119 continue
120
121 # print (args)
122 if hasattr(self, '_map'):
123 f = map
124 args = [self.func]+args
125 else:
126 f=self.func
127 mid = self.client.apply(f, args=args, block=False,
128 bound=self.bound,
129 targets=engineid).msg_ids[0]
130 msg_ids.append(mid)
131
132 r = PendingMapResult(self.client, msg_ids, self.mapObject)
133 if self.block:
134 r.wait()
135 return r.result
136 else:
137 return r
138
139 def map(self, *sequences):
140 """call a function on each element of a sequence remotely."""
141 self._map = True
142 ret = self.__call__(*sequences)
143 del self._map
144 return ret
145
@@ -29,6 +29,8 b' from view import DirectView, LoadBalancedView'
29 from dependency import Dependency, depend, require
29 from dependency import Dependency, depend, require
30 import error
30 import error
31 import map as Map
31 import map as Map
32 from pendingresult import PendingResult,PendingMapResult
33 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
32
34
33 #--------------------------------------------------------------------------
35 #--------------------------------------------------------------------------
34 # helpers for implementing old MEC API via client.apply
36 # helpers for implementing old MEC API via client.apply
@@ -81,167 +83,6 b' def defaultblock(f, self, *args, **kwargs):'
81 self.block = saveblock
83 self.block = saveblock
82 return ret
84 return ret
83
85
84 def remote(client, bound=False, block=None, targets=None):
85 """Turn a function into a remote function.
86
87 This method can be used for map:
88
89 >>> @remote(client,block=True)
90 def func(a)
91 """
92 def remote_function(f):
93 return RemoteFunction(client, f, bound, block, targets)
94 return remote_function
95
96 def parallel(client, dist='b', bound=False, block=None, targets='all'):
97 """Turn a function into a parallel remote function.
98
99 This method can be used for map:
100
101 >>> @parallel(client,block=True)
102 def func(a)
103 """
104 def parallel_function(f):
105 return ParallelFunction(client, f, dist, bound, block, targets)
106 return parallel_function
107
108 #--------------------------------------------------------------------------
109 # Classes
110 #--------------------------------------------------------------------------
111
112 class RemoteFunction(object):
113 """Turn an existing function into a remote function.
114
115 Parameters
116 ----------
117
118 client : Client instance
119 The client to be used to connect to engines
120 f : callable
121 The function to be wrapped into a remote function
122 bound : bool [default: False]
123 Whether the affect the remote namespace when called
124 block : bool [default: None]
125 Whether to wait for results or not. The default behavior is
126 to use the current `block` attribute of `client`
127 targets : valid target list [default: all]
128 The targets on which to execute.
129 """
130
131 client = None # the remote connection
132 func = None # the wrapped function
133 block = None # whether to block
134 bound = None # whether to affect the namespace
135 targets = None # where to execute
136
137 def __init__(self, client, f, bound=False, block=None, targets=None):
138 self.client = client
139 self.func = f
140 self.block=block
141 self.bound=bound
142 self.targets=targets
143
144 def __call__(self, *args, **kwargs):
145 return self.client.apply(self.func, args=args, kwargs=kwargs,
146 block=self.block, targets=self.targets, bound=self.bound)
147
148
149 class ParallelFunction(RemoteFunction):
150 """Class for mapping a function to sequences."""
151 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'):
152 super(ParallelFunction, self).__init__(client,f,bound,block,targets)
153 mapClass = Map.dists[dist]
154 self.mapObject = mapClass()
155
156 def __call__(self, *sequences):
157 len_0 = len(sequences[0])
158 for s in sequences:
159 if len(s)!=len_0:
160 raise ValueError('all sequences must have equal length')
161
162 if self.targets is None:
163 # load-balanced:
164 engines = [None]*len_0
165 else:
166 # multiplexed:
167 engines = self.client._build_targets(self.targets)[-1]
168
169 nparts = len(engines)
170 msg_ids = []
171 for index, engineid in enumerate(engines):
172 args = []
173 for seq in sequences:
174 args.append(self.mapObject.getPartition(seq, index, nparts))
175 mid = self.client.apply(self.func, args=args, block=False,
176 bound=self.bound,
177 targets=engineid)
178 msg_ids.append(mid)
179
180 if self.block:
181 dg = PendingMapResult(self.client, msg_ids, self.mapObject)
182 dg.wait()
183 return dg.result
184 else:
185 return dg
186
187
188 class PendingResult(object):
189 """Class for representing results of non-blocking calls."""
190 def __init__(self, client, msg_ids):
191 self.client = client
192 self.msg_ids = msg_ids
193 self._result = None
194 self.done = False
195
196 def __repr__(self):
197 if self.done:
198 return "<%s: finished>"%(self.__class__.__name__)
199 else:
200 return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
201
202 @property
203 def result(self):
204 if self._result is not None:
205 return self._result
206 if not self.done:
207 self.wait(0)
208 if self.done:
209 results = map(self.client.results.get, self.msg_ids)
210 results = error.collect_exceptions(results, 'get_result')
211 self._result = self.reconstruct_result(results)
212 return self._result
213 else:
214 raise error.ResultNotCompleted
215
216 def reconstruct_result(self, res):
217 """
218 Override me in subclasses for turning a list of results
219 into the expected form.
220 """
221 if len(res) == 1:
222 return res[0]
223 else:
224 return res
225
226 def wait(self, timout=-1):
227 self.done = self.client.barrier(self.msg_ids)
228 return self.done
229
230 class PendingMapResult(PendingResult):
231 """Class for representing results of non-blocking gathers.
232
233 This will properly reconstruct the gather.
234 """
235
236 def __init__(self, client, msg_ids, mapObject):
237 self.mapObject = mapObject
238 PendingResult.__init__(self, client, msg_ids)
239
240 def reconstruct_result(self, res):
241 """Perform the gather on the actual results."""
242 return self.mapObject.joinPartitions(res)
243
244
245
86
246 class AbortedTask(object):
87 class AbortedTask(object):
247 """A basic wrapper object describing an aborted task."""
88 """A basic wrapper object describing an aborted task."""
@@ -944,10 +785,11 b' class Client(object):'
944 result[target] = self.results[mid]
785 result[target] = self.results[mid]
945 return error.collect_exceptions(result, f.__name__)
786 return error.collect_exceptions(result, f.__name__)
946
787
947 @defaultblock
788 def map(self, f, *sequences):
948 def map(self, f, sequences, targets=None, block=None, bound=False):
789 """Parallel version of builtin `map`, using all our engines."""
949 pf = ParallelFunction(self,f,block=block,bound=bound,targets=targets)
790 pf = ParallelFunction(self, f, block=self.block,
950 return pf(*sequences)
791 bound=True, targets='all')
792 return pf.map(*sequences)
951
793
952 #--------------------------------------------------------------------------
794 #--------------------------------------------------------------------------
953 # Data movement
795 # Data movement
@@ -277,7 +277,12 b' class Kernel(HasTraits):'
277 suffix = prefix = "_" # prevent keyword collisions with lambda
277 suffix = prefix = "_" # prevent keyword collisions with lambda
278 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
278 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
279 # if f.fun
279 # if f.fun
280 fname = prefix+f.func_name.strip('<>')+suffix
280 if hasattr(f, 'func_name'):
281 fname = f.func_name
282 else:
283 fname = f.__name__
284
285 fname = prefix+fname.strip('<>')+suffix
281 argname = prefix+"args"+suffix
286 argname = prefix+"args"+suffix
282 kwargname = prefix+"kwargs"+suffix
287 kwargname = prefix+"kwargs"+suffix
283 resultname = prefix+"result"+suffix
288 resultname = prefix+"result"+suffix
@@ -11,6 +11,7 b''
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction
14
15
15 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
16 # Decorators
17 # Decorators
@@ -28,8 +29,10 b' def myblock(f, self, *args, **kwargs):'
28 @decorator
29 @decorator
29 def save_ids(f, self, *args, **kwargs):
30 def save_ids(f, self, *args, **kwargs):
30 """Keep our history and outstanding attributes up to date after a method call."""
31 """Keep our history and outstanding attributes up to date after a method call."""
32 n_previous = len(self.client.history)
31 ret = f(self, *args, **kwargs)
33 ret = f(self, *args, **kwargs)
32 msg_ids = self.client.history[-self._ntargets:]
34 nmsgs = len(self.client.history) - n_previous
35 msg_ids = self.client.history[-nmsgs:]
33 self.history.extend(msg_ids)
36 self.history.extend(msg_ids)
34 map(self.outstanding.add, msg_ids)
37 map(self.outstanding.add, msg_ids)
35 return ret
38 return ret
@@ -172,6 +175,16 b' class View(object):'
172 """
175 """
173 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
176 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
174
177
178 @spin_after
179 @save_ids
180 def map(self, f, *sequences):
181 """Parallel version of builtin `map`, using this view's engines."""
182 if isinstance(self.targets, int):
183 targets = [self.targets]
184 pf = ParallelFunction(self.client, f, block=self.block,
185 bound=True, targets=targets)
186 return pf.map(*sequences)
187
175 def abort(self, msg_ids=None, block=None):
188 def abort(self, msg_ids=None, block=None):
176 """Abort jobs on my engines.
189 """Abort jobs on my engines.
177
190
General Comments 0
You need to be logged in to leave comments. Login now