##// END OF EJS Templates
add note to map docstring
MinRK -
Show More
@@ -1,279 +1,282 b''
1 1 """Remote Functions and decorators for Views.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 * Min RK
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2010-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import sys
22 22 import warnings
23 23
24 24 from IPython.external.decorator import decorator
25 25 from IPython.testing.skipdoctest import skip_doctest
26 26
27 27 from . import map as Map
28 28 from .asyncresult import AsyncMapResult
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Functions and Decorators
32 32 #-----------------------------------------------------------------------------
33 33
@skip_doctest
def remote(view, block=None, **flags):
    """Decorator factory: wrap a function as a `RemoteFunction` on `view`.

    The returned decorator builds ``RemoteFunction(view, f, block=block,
    **flags)`` for the decorated function ``f``.

    Usage:

        In [1]: @remote(view, block=True)
           ...: def func(a):
           ...:    pass

    Parameters
    ----------

    view : View instance
        The view the wrapped function will execute through.
    block : bool [default: None]
        Forwarded unchanged to RemoteFunction.
    **flags : remaining kwargs are forwarded unchanged to RemoteFunction.
    """

    def remote_function(f):
        wrapped = RemoteFunction(view, f, block=block, **flags)
        return wrapped

    return remote_function
48 48
@skip_doctest
def parallel(view, dist='b', block=None, ordered=True, **flags):
    """Decorator factory: wrap a function as a `ParallelFunction` on `view`.

    The returned decorator builds ``ParallelFunction(view, f, dist=dist,
    block=block, ordered=ordered, **flags)`` for the decorated function ``f``.

    Usage:

        In [1]: @parallel(view, block=True)
           ...: def func(a):
           ...:    pass

    Parameters
    ----------

    view : View instance
        The view the wrapped function will execute through.
    dist : str [default: 'b']
        Forwarded unchanged to ParallelFunction.
    block : bool [default: None]
        Forwarded unchanged to ParallelFunction.
    ordered : bool [default: True]
        Forwarded unchanged to ParallelFunction.
    **flags : remaining kwargs are forwarded unchanged to ParallelFunction.
    """

    def parallel_function(f):
        wrapped = ParallelFunction(
            view, f, dist=dist, block=block, ordered=ordered, **flags
        )
        return wrapped

    return parallel_function
63 63
def getname(f):
    """Get the name of an object.

    For use in case of callables that are not functions, and
    thus may not have __name__ defined.

    Order: f.__name__ > f.name > str(f)

    Any ordinary exception raised while reading an attribute (including
    from a misbehaving property) makes the lookup fall through to the next
    candidate.  KeyboardInterrupt and SystemExit are not swallowed.
    """
    for attr in ('__name__', 'name'):
        try:
            return getattr(f, attr)
        except Exception:
            # best-effort lookup: try the next candidate
            continue

    return str(f)
82 82
@decorator
def sync_view_results(f, self, *args, **kwargs):
    """sync relevant results from self.client to our results attribute.

    This is a clone of view.sync_results, but for remote functions

    `self` is the object whose method is being wrapped; its `view`
    attribute supplies the `_in_sync_results` flag and the
    `_sync_results()` hook.  Only the outermost decorated call triggers
    the sync; calls made while the flag is already set run `f` directly.
    """
    view = self.view
    if view._in_sync_results:
        # nested call: the outermost call will do the sync
        return f(self, *args, **kwargs)
    view._in_sync_results = True
    try:
        ret = f(self, *args, **kwargs)
    finally:
        # always clear the flag and sync, even if f raised
        view._in_sync_results = False
        view._sync_results()
    return ret
100 100
101 101 #--------------------------------------------------------------------------
102 102 # Classes
103 103 #--------------------------------------------------------------------------
104 104
class RemoteFunction(object):
    """Callable wrapper that executes a function through a view.

    Calling an instance forwards the arguments to ``view.apply`` with the
    wrapped function, inside a ``view.temp_flags`` context.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  When None, the view's
        current `block` attribute is used at call time.

    **flags : remaining kwargs are passed to View.temp_flags
    """

    # class-level defaults; every one is set per instance in __init__
    view = None   # the remote connection
    func = None   # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

    def __call__(self, *args, **kwargs):
        # an explicit block setting wins; otherwise defer to the view
        if self.block is None:
            should_block = self.view.block
        else:
            should_block = self.block
        with self.view.temp_flags(block=should_block, **self.flags):
            return self.view.apply(self.func, *args, **kwargs)
137 137
138 138
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a load-balanced manner
    ordered : bool [default: True]
        Whether the result should be kept in order. If False,
        results become available as they arrive, regardless of submission order.
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None
    ordered = None
    mapObject = None
    _mapping = False  # True only while self.map() is driving __call__

    def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize
        self.ordered = ordered

        # look up the partitioning scheme registered under `dist`
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    @staticmethod
    def _has_items(part):
        """Return True if a partition contains at least one element.

        Uses the length rather than truthiness, because truth-testing
        a multi-element array-like (e.g. a NumPy array) raises.  Objects
        without a length fall back to plain truthiness.
        """
        try:
            return len(part) > 0
        except TypeError:
            return bool(part)

    @sync_view_results
    def __call__(self, *sequences):
        """Partition `sequences`, submit one task per partition, and
        return an AsyncMapResult (or its result, when blocking).

        Raises ValueError if the sequences differ in length, unless the
        call was made through `map`.
        """
        client = self.view.client

        lens = []
        maxlen = minlen = -1
        for i, seq in enumerate(sequences):
            try:
                n = len(seq)
            except Exception:
                # no usable len() (e.g. a generator): materialize it locally
                seq = list(seq)
                if isinstance(sequences, tuple):
                    # can't alter a tuple
                    sequences = list(sequences)
                sequences[i] = seq
                n = len(seq)
            if n > maxlen:
                maxlen = n
            if minlen == -1 or n < minlen:
                minlen = n
            lens.append(n)

        # check that the length of sequences match
        if not self._mapping and minlen != maxlen:
            msg = 'all sequences must have equal length, but have %s' % lens
            raise ValueError(msg)

        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # ceil(maxlen / chunksize)
                nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
            else:
                nparts = maxlen
            targets = [None]*nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed:
            targets = self.view.targets
            # 'all' is lazily evaluated at execution time, which is now:
            if targets == 'all':
                targets = client._build_targets(targets)[1]
            elif isinstance(targets, int):
                # single-engine view, targets must be iterable
                targets = [targets]
            nparts = len(targets)

        msg_ids = []
        for index, t in enumerate(targets):
            args = [
                self.mapObject.getPartition(seq, index, nparts, maxlen)
                for seq in sequences
            ]
            # nothing to send for this partition: every slice is empty
            if not any(self._has_items(part) for part in args):
                continue

            if self._mapping:
                # apply the function element-wise on the engine
                if sys.version_info[0] >= 3:
                    f = lambda f, *sequences: list(map(f, *sequences))
                else:
                    f = map
                args = [self.func] + args
            else:
                f = self.func

            view = self.view if balanced else client[t]
            # always submit without blocking; blocking is handled below
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.extend(ar.msg_ids)

        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
                            fname=getname(self.func),
                            ordered=self.ordered
                        )

        # an explicit block setting wins; otherwise defer to the view,
        # as documented and as RemoteFunction.__call__ does
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                # interrupted while waiting: hand back the async result
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of one or more sequence(s) remotely.
        This should behave very much like the builtin map, but return an AsyncMapResult
        if self.block is False.

        That means it can take generators (will be cast to lists locally),
        and mismatched sequence lengths will be padded with None.
        (Padding is the Python 2 builtin map behavior; the Python 3 builtin
        map stops at the shortest sequence instead.)
        """
        # set _mapping as a flag for use inside self.__call__
        self._mapping = True
        try:
            ret = self(*sequences)
        finally:
            self._mapping = False
        return ret
278 281
279 282 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
General Comments 0
You need to be logged in to leave comments. Login now