##// END OF EJS Templates
cast sequences to list if we need to alter it
MinRK -
Show More
@@ -1,276 +1,279 b''
1 1 """Remote Functions and decorators for Views.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 * Min RK
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2010-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import sys
22 22 import warnings
23 23
24 24 from IPython.external.decorator import decorator
25 25 from IPython.testing.skipdoctest import skip_doctest
26 26
27 27 from . import map as Map
28 28 from .asyncresult import AsyncMapResult
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Functions and Decorators
32 32 #-----------------------------------------------------------------------------
33 33
34 34 @skip_doctest
35 35 def remote(view, block=None, **flags):
36 36 """Turn a function into a remote function.
37 37
38 38 This method can be used for map:
39 39
40 40 In [1]: @remote(view,block=True)
41 41 ...: def func(a):
42 42 ...: pass
43 43 """
44 44
45 45 def remote_function(f):
46 46 return RemoteFunction(view, f, block=block, **flags)
47 47 return remote_function
48 48
49 49 @skip_doctest
50 50 def parallel(view, dist='b', block=None, ordered=True, **flags):
51 51 """Turn a function into a parallel remote function.
52 52
53 53 This method can be used for map:
54 54
55 55 In [1]: @parallel(view, block=True)
56 56 ...: def func(a):
57 57 ...: pass
58 58 """
59 59
60 60 def parallel_function(f):
61 61 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
62 62 return parallel_function
63 63
64 64 def getname(f):
65 65 """Get the name of an object.
66 66
67 67 For use in case of callables that are not functions, and
68 68 thus may not have __name__ defined.
69 69
70 70 Order: f.__name__ > f.name > str(f)
71 71 """
72 72 try:
73 73 return f.__name__
74 74 except:
75 75 pass
76 76 try:
77 77 return f.name
78 78 except:
79 79 pass
80 80
81 81 return str(f)
82 82
83 83 @decorator
84 84 def sync_view_results(f, self, *args, **kwargs):
85 85 """sync relevant results from self.client to our results attribute.
86 86
87 87 This is a clone of view.sync_results, but for remote functions
88 88 """
89 89 view = self.view
90 90 if view._in_sync_results:
91 91 return f(self, *args, **kwargs)
92 92 print 'in sync results', f
93 93 view._in_sync_results = True
94 94 try:
95 95 ret = f(self, *args, **kwargs)
96 96 finally:
97 97 view._in_sync_results = False
98 98 view._sync_results()
99 99 return ret
100 100
101 101 #--------------------------------------------------------------------------
102 102 # Classes
103 103 #--------------------------------------------------------------------------
104 104
105 105 class RemoteFunction(object):
106 106 """Turn an existing function into a remote function.
107 107
108 108 Parameters
109 109 ----------
110 110
111 111 view : View instance
112 112 The view to be used for execution
113 113 f : callable
114 114 The function to be wrapped into a remote function
115 115 block : bool [default: None]
116 116 Whether to wait for results or not. The default behavior is
117 117 to use the current `block` attribute of `view`
118 118
119 119 **flags : remaining kwargs are passed to View.temp_flags
120 120 """
121 121
122 122 view = None # the remote connection
123 123 func = None # the wrapped function
124 124 block = None # whether to block
125 125 flags = None # dict of extra kwargs for temp_flags
126 126
127 127 def __init__(self, view, f, block=None, **flags):
128 128 self.view = view
129 129 self.func = f
130 130 self.block=block
131 131 self.flags=flags
132 132
133 133 def __call__(self, *args, **kwargs):
134 134 block = self.view.block if self.block is None else self.block
135 135 with self.view.temp_flags(block=block, **self.flags):
136 136 return self.view.apply(self.func, *args, **kwargs)
137 137
138 138
139 139 class ParallelFunction(RemoteFunction):
140 140 """Class for mapping a function to sequences.
141 141
142 142 This will distribute the sequences according the a mapper, and call
143 143 the function on each sub-sequence. If called via map, then the function
144 144 will be called once on each element, rather that each sub-sequence.
145 145
146 146 Parameters
147 147 ----------
148 148
149 149 view : View instance
150 150 The view to be used for execution
151 151 f : callable
152 152 The function to be wrapped into a remote function
153 153 dist : str [default: 'b']
154 154 The key for which mapObject to use to distribute sequences
155 155 options are:
156 156 * 'b' : use contiguous chunks in order
157 157 * 'r' : use round-robin striping
158 158 block : bool [default: None]
159 159 Whether to wait for results or not. The default behavior is
160 160 to use the current `block` attribute of `view`
161 161 chunksize : int or None
162 162 The size of chunk to use when breaking up sequences in a load-balanced manner
163 163 ordered : bool [default: True]
164 164 Whether the result should be kept in order. If False,
165 165 results become available as they arrive, regardless of submission order.
166 166 **flags : remaining kwargs are passed to View.temp_flags
167 167 """
168 168
169 169 chunksize = None
170 170 ordered = None
171 171 mapObject = None
172 172 _mapping = False
173 173
174 174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
175 175 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
176 176 self.chunksize = chunksize
177 177 self.ordered = ordered
178 178
179 179 mapClass = Map.dists[dist]
180 180 self.mapObject = mapClass()
181 181
182 182 @sync_view_results
183 183 def __call__(self, *sequences):
184 184 client = self.view.client
185 185
186 186 lens = []
187 187 maxlen = minlen = -1
188 188 for i, seq in enumerate(sequences):
189 189 try:
190 190 n = len(seq)
191 191 except Exception:
192 192 seq = list(seq)
193 if isinstance(sequences, tuple):
194 # can't alter a tuple
195 sequences = list(sequences)
193 196 sequences[i] = seq
194 197 n = len(seq)
195 198 if n > maxlen:
196 199 maxlen = n
197 200 if minlen == -1 or n < minlen:
198 201 minlen = n
199 202 lens.append(n)
200 203
201 204 # check that the length of sequences match
202 205 if not self._mapping and minlen != maxlen:
203 206 msg = 'all sequences must have equal length, but have %s' % lens
204 207 raise ValueError(msg)
205 208
206 209 balanced = 'Balanced' in self.view.__class__.__name__
207 210 if balanced:
208 211 if self.chunksize:
209 212 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
210 213 else:
211 214 nparts = maxlen
212 215 targets = [None]*nparts
213 216 else:
214 217 if self.chunksize:
215 218 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
216 219 # multiplexed:
217 220 targets = self.view.targets
218 221 # 'all' is lazily evaluated at execution time, which is now:
219 222 if targets == 'all':
220 223 targets = client._build_targets(targets)[1]
221 224 elif isinstance(targets, int):
222 225 # single-engine view, targets must be iterable
223 226 targets = [targets]
224 227 nparts = len(targets)
225 228
226 229 msg_ids = []
227 230 for index, t in enumerate(targets):
228 231 args = []
229 232 for seq in sequences:
230 233 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
231 234 args.append(part)
232 235 if not any(args):
233 236 continue
234 237
235 238 if self._mapping:
236 239 if sys.version_info[0] >= 3:
237 240 f = lambda f, *sequences: list(map(f, *sequences))
238 241 else:
239 242 f = map
240 243 args = [self.func] + args
241 244 else:
242 245 f=self.func
243 246
244 247 view = self.view if balanced else client[t]
245 248 with view.temp_flags(block=False, **self.flags):
246 249 ar = view.apply(f, *args)
247 250
248 251 msg_ids.extend(ar.msg_ids)
249 252
250 253 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
251 254 fname=getname(self.func),
252 255 ordered=self.ordered
253 256 )
254 257
255 258 if self.block:
256 259 try:
257 260 return r.get()
258 261 except KeyboardInterrupt:
259 262 return r
260 263 else:
261 264 return r
262 265
263 266 def map(self, *sequences):
264 267 """call a function on each element of a sequence remotely.
265 268 This should behave very much like the builtin map, but return an AsyncMapResult
266 269 if self.block is False.
267 270 """
268 271 # set _mapping as a flag for use inside self.__call__
269 272 self._mapping = True
270 273 try:
271 274 ret = self(*sequences)
272 275 finally:
273 276 self._mapping = False
274 277 return ret
275 278
276 279 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
General Comments 0
You need to be logged in to leave comments. Login now