##// END OF EJS Templates
add note to map docstring
MinRK -
Show More
@@ -1,279 +1,282
1 """Remote Functions and decorators for Views.
1 """Remote Functions and decorators for Views.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 * Min RK
6 * Min RK
7 """
7 """
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import sys
21 import sys
22 import warnings
22 import warnings
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25 from IPython.testing.skipdoctest import skip_doctest
25 from IPython.testing.skipdoctest import skip_doctest
26
26
27 from . import map as Map
27 from . import map as Map
28 from .asyncresult import AsyncMapResult
28 from .asyncresult import AsyncMapResult
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions and Decorators
31 # Functions and Decorators
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
34 @skip_doctest
34 @skip_doctest
35 def remote(view, block=None, **flags):
35 def remote(view, block=None, **flags):
36 """Turn a function into a remote function.
36 """Turn a function into a remote function.
37
37
38 This method can be used for map:
38 This method can be used for map:
39
39
40 In [1]: @remote(view,block=True)
40 In [1]: @remote(view,block=True)
41 ...: def func(a):
41 ...: def func(a):
42 ...: pass
42 ...: pass
43 """
43 """
44
44
45 def remote_function(f):
45 def remote_function(f):
46 return RemoteFunction(view, f, block=block, **flags)
46 return RemoteFunction(view, f, block=block, **flags)
47 return remote_function
47 return remote_function
48
48
49 @skip_doctest
49 @skip_doctest
50 def parallel(view, dist='b', block=None, ordered=True, **flags):
50 def parallel(view, dist='b', block=None, ordered=True, **flags):
51 """Turn a function into a parallel remote function.
51 """Turn a function into a parallel remote function.
52
52
53 This method can be used for map:
53 This method can be used for map:
54
54
55 In [1]: @parallel(view, block=True)
55 In [1]: @parallel(view, block=True)
56 ...: def func(a):
56 ...: def func(a):
57 ...: pass
57 ...: pass
58 """
58 """
59
59
60 def parallel_function(f):
60 def parallel_function(f):
61 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
61 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
62 return parallel_function
62 return parallel_function
63
63
64 def getname(f):
64 def getname(f):
65 """Get the name of an object.
65 """Get the name of an object.
66
66
67 For use in case of callables that are not functions, and
67 For use in case of callables that are not functions, and
68 thus may not have __name__ defined.
68 thus may not have __name__ defined.
69
69
70 Order: f.__name__ > f.name > str(f)
70 Order: f.__name__ > f.name > str(f)
71 """
71 """
72 try:
72 try:
73 return f.__name__
73 return f.__name__
74 except:
74 except:
75 pass
75 pass
76 try:
76 try:
77 return f.name
77 return f.name
78 except:
78 except:
79 pass
79 pass
80
80
81 return str(f)
81 return str(f)
82
82
83 @decorator
83 @decorator
84 def sync_view_results(f, self, *args, **kwargs):
84 def sync_view_results(f, self, *args, **kwargs):
85 """sync relevant results from self.client to our results attribute.
85 """sync relevant results from self.client to our results attribute.
86
86
87 This is a clone of view.sync_results, but for remote functions
87 This is a clone of view.sync_results, but for remote functions
88 """
88 """
89 view = self.view
89 view = self.view
90 if view._in_sync_results:
90 if view._in_sync_results:
91 return f(self, *args, **kwargs)
91 return f(self, *args, **kwargs)
92 print 'in sync results', f
92 print 'in sync results', f
93 view._in_sync_results = True
93 view._in_sync_results = True
94 try:
94 try:
95 ret = f(self, *args, **kwargs)
95 ret = f(self, *args, **kwargs)
96 finally:
96 finally:
97 view._in_sync_results = False
97 view._in_sync_results = False
98 view._sync_results()
98 view._sync_results()
99 return ret
99 return ret
100
100
101 #--------------------------------------------------------------------------
101 #--------------------------------------------------------------------------
102 # Classes
102 # Classes
103 #--------------------------------------------------------------------------
103 #--------------------------------------------------------------------------
104
104
105 class RemoteFunction(object):
105 class RemoteFunction(object):
106 """Turn an existing function into a remote function.
106 """Turn an existing function into a remote function.
107
107
108 Parameters
108 Parameters
109 ----------
109 ----------
110
110
111 view : View instance
111 view : View instance
112 The view to be used for execution
112 The view to be used for execution
113 f : callable
113 f : callable
114 The function to be wrapped into a remote function
114 The function to be wrapped into a remote function
115 block : bool [default: None]
115 block : bool [default: None]
116 Whether to wait for results or not. The default behavior is
116 Whether to wait for results or not. The default behavior is
117 to use the current `block` attribute of `view`
117 to use the current `block` attribute of `view`
118
118
119 **flags : remaining kwargs are passed to View.temp_flags
119 **flags : remaining kwargs are passed to View.temp_flags
120 """
120 """
121
121
122 view = None # the remote connection
122 view = None # the remote connection
123 func = None # the wrapped function
123 func = None # the wrapped function
124 block = None # whether to block
124 block = None # whether to block
125 flags = None # dict of extra kwargs for temp_flags
125 flags = None # dict of extra kwargs for temp_flags
126
126
127 def __init__(self, view, f, block=None, **flags):
127 def __init__(self, view, f, block=None, **flags):
128 self.view = view
128 self.view = view
129 self.func = f
129 self.func = f
130 self.block=block
130 self.block=block
131 self.flags=flags
131 self.flags=flags
132
132
133 def __call__(self, *args, **kwargs):
133 def __call__(self, *args, **kwargs):
134 block = self.view.block if self.block is None else self.block
134 block = self.view.block if self.block is None else self.block
135 with self.view.temp_flags(block=block, **self.flags):
135 with self.view.temp_flags(block=block, **self.flags):
136 return self.view.apply(self.func, *args, **kwargs)
136 return self.view.apply(self.func, *args, **kwargs)
137
137
138
138
139 class ParallelFunction(RemoteFunction):
139 class ParallelFunction(RemoteFunction):
140 """Class for mapping a function to sequences.
140 """Class for mapping a function to sequences.
141
141
142 This will distribute the sequences according the a mapper, and call
142 This will distribute the sequences according the a mapper, and call
143 the function on each sub-sequence. If called via map, then the function
143 the function on each sub-sequence. If called via map, then the function
144 will be called once on each element, rather that each sub-sequence.
144 will be called once on each element, rather that each sub-sequence.
145
145
146 Parameters
146 Parameters
147 ----------
147 ----------
148
148
149 view : View instance
149 view : View instance
150 The view to be used for execution
150 The view to be used for execution
151 f : callable
151 f : callable
152 The function to be wrapped into a remote function
152 The function to be wrapped into a remote function
153 dist : str [default: 'b']
153 dist : str [default: 'b']
154 The key for which mapObject to use to distribute sequences
154 The key for which mapObject to use to distribute sequences
155 options are:
155 options are:
156 * 'b' : use contiguous chunks in order
156 * 'b' : use contiguous chunks in order
157 * 'r' : use round-robin striping
157 * 'r' : use round-robin striping
158 block : bool [default: None]
158 block : bool [default: None]
159 Whether to wait for results or not. The default behavior is
159 Whether to wait for results or not. The default behavior is
160 to use the current `block` attribute of `view`
160 to use the current `block` attribute of `view`
161 chunksize : int or None
161 chunksize : int or None
162 The size of chunk to use when breaking up sequences in a load-balanced manner
162 The size of chunk to use when breaking up sequences in a load-balanced manner
163 ordered : bool [default: True]
163 ordered : bool [default: True]
164 Whether the result should be kept in order. If False,
164 Whether the result should be kept in order. If False,
165 results become available as they arrive, regardless of submission order.
165 results become available as they arrive, regardless of submission order.
166 **flags : remaining kwargs are passed to View.temp_flags
166 **flags : remaining kwargs are passed to View.temp_flags
167 """
167 """
168
168
169 chunksize = None
169 chunksize = None
170 ordered = None
170 ordered = None
171 mapObject = None
171 mapObject = None
172 _mapping = False
172 _mapping = False
173
173
174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
175 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
175 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
176 self.chunksize = chunksize
176 self.chunksize = chunksize
177 self.ordered = ordered
177 self.ordered = ordered
178
178
179 mapClass = Map.dists[dist]
179 mapClass = Map.dists[dist]
180 self.mapObject = mapClass()
180 self.mapObject = mapClass()
181
181
182 @sync_view_results
182 @sync_view_results
183 def __call__(self, *sequences):
183 def __call__(self, *sequences):
184 client = self.view.client
184 client = self.view.client
185
185
186 lens = []
186 lens = []
187 maxlen = minlen = -1
187 maxlen = minlen = -1
188 for i, seq in enumerate(sequences):
188 for i, seq in enumerate(sequences):
189 try:
189 try:
190 n = len(seq)
190 n = len(seq)
191 except Exception:
191 except Exception:
192 seq = list(seq)
192 seq = list(seq)
193 if isinstance(sequences, tuple):
193 if isinstance(sequences, tuple):
194 # can't alter a tuple
194 # can't alter a tuple
195 sequences = list(sequences)
195 sequences = list(sequences)
196 sequences[i] = seq
196 sequences[i] = seq
197 n = len(seq)
197 n = len(seq)
198 if n > maxlen:
198 if n > maxlen:
199 maxlen = n
199 maxlen = n
200 if minlen == -1 or n < minlen:
200 if minlen == -1 or n < minlen:
201 minlen = n
201 minlen = n
202 lens.append(n)
202 lens.append(n)
203
203
204 # check that the length of sequences match
204 # check that the length of sequences match
205 if not self._mapping and minlen != maxlen:
205 if not self._mapping and minlen != maxlen:
206 msg = 'all sequences must have equal length, but have %s' % lens
206 msg = 'all sequences must have equal length, but have %s' % lens
207 raise ValueError(msg)
207 raise ValueError(msg)
208
208
209 balanced = 'Balanced' in self.view.__class__.__name__
209 balanced = 'Balanced' in self.view.__class__.__name__
210 if balanced:
210 if balanced:
211 if self.chunksize:
211 if self.chunksize:
212 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
212 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
213 else:
213 else:
214 nparts = maxlen
214 nparts = maxlen
215 targets = [None]*nparts
215 targets = [None]*nparts
216 else:
216 else:
217 if self.chunksize:
217 if self.chunksize:
218 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
218 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
219 # multiplexed:
219 # multiplexed:
220 targets = self.view.targets
220 targets = self.view.targets
221 # 'all' is lazily evaluated at execution time, which is now:
221 # 'all' is lazily evaluated at execution time, which is now:
222 if targets == 'all':
222 if targets == 'all':
223 targets = client._build_targets(targets)[1]
223 targets = client._build_targets(targets)[1]
224 elif isinstance(targets, int):
224 elif isinstance(targets, int):
225 # single-engine view, targets must be iterable
225 # single-engine view, targets must be iterable
226 targets = [targets]
226 targets = [targets]
227 nparts = len(targets)
227 nparts = len(targets)
228
228
229 msg_ids = []
229 msg_ids = []
230 for index, t in enumerate(targets):
230 for index, t in enumerate(targets):
231 args = []
231 args = []
232 for seq in sequences:
232 for seq in sequences:
233 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
233 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
234 args.append(part)
234 args.append(part)
235 if not any(args):
235 if not any(args):
236 continue
236 continue
237
237
238 if self._mapping:
238 if self._mapping:
239 if sys.version_info[0] >= 3:
239 if sys.version_info[0] >= 3:
240 f = lambda f, *sequences: list(map(f, *sequences))
240 f = lambda f, *sequences: list(map(f, *sequences))
241 else:
241 else:
242 f = map
242 f = map
243 args = [self.func] + args
243 args = [self.func] + args
244 else:
244 else:
245 f=self.func
245 f=self.func
246
246
247 view = self.view if balanced else client[t]
247 view = self.view if balanced else client[t]
248 with view.temp_flags(block=False, **self.flags):
248 with view.temp_flags(block=False, **self.flags):
249 ar = view.apply(f, *args)
249 ar = view.apply(f, *args)
250
250
251 msg_ids.extend(ar.msg_ids)
251 msg_ids.extend(ar.msg_ids)
252
252
253 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
253 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
254 fname=getname(self.func),
254 fname=getname(self.func),
255 ordered=self.ordered
255 ordered=self.ordered
256 )
256 )
257
257
258 if self.block:
258 if self.block:
259 try:
259 try:
260 return r.get()
260 return r.get()
261 except KeyboardInterrupt:
261 except KeyboardInterrupt:
262 return r
262 return r
263 else:
263 else:
264 return r
264 return r
265
265
266 def map(self, *sequences):
266 def map(self, *sequences):
267 """call a function on each element of a sequence remotely.
267 """call a function on each element of one or more sequence(s) remotely.
268 This should behave very much like the builtin map, but return an AsyncMapResult
268 This should behave very much like the builtin map, but return an AsyncMapResult
269 if self.block is False.
269 if self.block is False.
270
271 That means it can take generators (will be cast to lists locally),
272 and mismatched sequence lengths will be padded with None.
270 """
273 """
271 # set _mapping as a flag for use inside self.__call__
274 # set _mapping as a flag for use inside self.__call__
272 self._mapping = True
275 self._mapping = True
273 try:
276 try:
274 ret = self(*sequences)
277 ret = self(*sequences)
275 finally:
278 finally:
276 self._mapping = False
279 self._mapping = False
277 return ret
280 return ret
278
281
279 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
282 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
General Comments 0
You need to be logged in to leave comments. Login now