##// END OF EJS Templates
cast sequences to list if we need to alter it
MinRK -
Show More
@@ -1,276 +1,279 b''
1 """Remote Functions and decorators for Views.
1 """Remote Functions and decorators for Views.
2
2
3 Authors:
3 Authors:
4
4
5 * Brian Granger
5 * Brian Granger
6 * Min RK
6 * Min RK
7 """
7 """
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2010-2011 The IPython Development Team
9 # Copyright (C) 2010-2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import sys
21 import sys
22 import warnings
22 import warnings
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25 from IPython.testing.skipdoctest import skip_doctest
25 from IPython.testing.skipdoctest import skip_doctest
26
26
27 from . import map as Map
27 from . import map as Map
28 from .asyncresult import AsyncMapResult
28 from .asyncresult import AsyncMapResult
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # Functions and Decorators
31 # Functions and Decorators
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
34 @skip_doctest
34 @skip_doctest
35 def remote(view, block=None, **flags):
35 def remote(view, block=None, **flags):
36 """Turn a function into a remote function.
36 """Turn a function into a remote function.
37
37
38 This method can be used for map:
38 This method can be used for map:
39
39
40 In [1]: @remote(view,block=True)
40 In [1]: @remote(view,block=True)
41 ...: def func(a):
41 ...: def func(a):
42 ...: pass
42 ...: pass
43 """
43 """
44
44
45 def remote_function(f):
45 def remote_function(f):
46 return RemoteFunction(view, f, block=block, **flags)
46 return RemoteFunction(view, f, block=block, **flags)
47 return remote_function
47 return remote_function
48
48
49 @skip_doctest
49 @skip_doctest
50 def parallel(view, dist='b', block=None, ordered=True, **flags):
50 def parallel(view, dist='b', block=None, ordered=True, **flags):
51 """Turn a function into a parallel remote function.
51 """Turn a function into a parallel remote function.
52
52
53 This method can be used for map:
53 This method can be used for map:
54
54
55 In [1]: @parallel(view, block=True)
55 In [1]: @parallel(view, block=True)
56 ...: def func(a):
56 ...: def func(a):
57 ...: pass
57 ...: pass
58 """
58 """
59
59
60 def parallel_function(f):
60 def parallel_function(f):
61 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
61 return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags)
62 return parallel_function
62 return parallel_function
63
63
64 def getname(f):
64 def getname(f):
65 """Get the name of an object.
65 """Get the name of an object.
66
66
67 For use in case of callables that are not functions, and
67 For use in case of callables that are not functions, and
68 thus may not have __name__ defined.
68 thus may not have __name__ defined.
69
69
70 Order: f.__name__ > f.name > str(f)
70 Order: f.__name__ > f.name > str(f)
71 """
71 """
72 try:
72 try:
73 return f.__name__
73 return f.__name__
74 except:
74 except:
75 pass
75 pass
76 try:
76 try:
77 return f.name
77 return f.name
78 except:
78 except:
79 pass
79 pass
80
80
81 return str(f)
81 return str(f)
82
82
83 @decorator
83 @decorator
84 def sync_view_results(f, self, *args, **kwargs):
84 def sync_view_results(f, self, *args, **kwargs):
85 """sync relevant results from self.client to our results attribute.
85 """sync relevant results from self.client to our results attribute.
86
86
87 This is a clone of view.sync_results, but for remote functions
87 This is a clone of view.sync_results, but for remote functions
88 """
88 """
89 view = self.view
89 view = self.view
90 if view._in_sync_results:
90 if view._in_sync_results:
91 return f(self, *args, **kwargs)
91 return f(self, *args, **kwargs)
92 print 'in sync results', f
92 print 'in sync results', f
93 view._in_sync_results = True
93 view._in_sync_results = True
94 try:
94 try:
95 ret = f(self, *args, **kwargs)
95 ret = f(self, *args, **kwargs)
96 finally:
96 finally:
97 view._in_sync_results = False
97 view._in_sync_results = False
98 view._sync_results()
98 view._sync_results()
99 return ret
99 return ret
100
100
101 #--------------------------------------------------------------------------
101 #--------------------------------------------------------------------------
102 # Classes
102 # Classes
103 #--------------------------------------------------------------------------
103 #--------------------------------------------------------------------------
104
104
105 class RemoteFunction(object):
105 class RemoteFunction(object):
106 """Turn an existing function into a remote function.
106 """Turn an existing function into a remote function.
107
107
108 Parameters
108 Parameters
109 ----------
109 ----------
110
110
111 view : View instance
111 view : View instance
112 The view to be used for execution
112 The view to be used for execution
113 f : callable
113 f : callable
114 The function to be wrapped into a remote function
114 The function to be wrapped into a remote function
115 block : bool [default: None]
115 block : bool [default: None]
116 Whether to wait for results or not. The default behavior is
116 Whether to wait for results or not. The default behavior is
117 to use the current `block` attribute of `view`
117 to use the current `block` attribute of `view`
118
118
119 **flags : remaining kwargs are passed to View.temp_flags
119 **flags : remaining kwargs are passed to View.temp_flags
120 """
120 """
121
121
122 view = None # the remote connection
122 view = None # the remote connection
123 func = None # the wrapped function
123 func = None # the wrapped function
124 block = None # whether to block
124 block = None # whether to block
125 flags = None # dict of extra kwargs for temp_flags
125 flags = None # dict of extra kwargs for temp_flags
126
126
127 def __init__(self, view, f, block=None, **flags):
127 def __init__(self, view, f, block=None, **flags):
128 self.view = view
128 self.view = view
129 self.func = f
129 self.func = f
130 self.block=block
130 self.block=block
131 self.flags=flags
131 self.flags=flags
132
132
133 def __call__(self, *args, **kwargs):
133 def __call__(self, *args, **kwargs):
134 block = self.view.block if self.block is None else self.block
134 block = self.view.block if self.block is None else self.block
135 with self.view.temp_flags(block=block, **self.flags):
135 with self.view.temp_flags(block=block, **self.flags):
136 return self.view.apply(self.func, *args, **kwargs)
136 return self.view.apply(self.func, *args, **kwargs)
137
137
138
138
139 class ParallelFunction(RemoteFunction):
139 class ParallelFunction(RemoteFunction):
140 """Class for mapping a function to sequences.
140 """Class for mapping a function to sequences.
141
141
142 This will distribute the sequences according the a mapper, and call
142 This will distribute the sequences according the a mapper, and call
143 the function on each sub-sequence. If called via map, then the function
143 the function on each sub-sequence. If called via map, then the function
144 will be called once on each element, rather that each sub-sequence.
144 will be called once on each element, rather that each sub-sequence.
145
145
146 Parameters
146 Parameters
147 ----------
147 ----------
148
148
149 view : View instance
149 view : View instance
150 The view to be used for execution
150 The view to be used for execution
151 f : callable
151 f : callable
152 The function to be wrapped into a remote function
152 The function to be wrapped into a remote function
153 dist : str [default: 'b']
153 dist : str [default: 'b']
154 The key for which mapObject to use to distribute sequences
154 The key for which mapObject to use to distribute sequences
155 options are:
155 options are:
156 * 'b' : use contiguous chunks in order
156 * 'b' : use contiguous chunks in order
157 * 'r' : use round-robin striping
157 * 'r' : use round-robin striping
158 block : bool [default: None]
158 block : bool [default: None]
159 Whether to wait for results or not. The default behavior is
159 Whether to wait for results or not. The default behavior is
160 to use the current `block` attribute of `view`
160 to use the current `block` attribute of `view`
161 chunksize : int or None
161 chunksize : int or None
162 The size of chunk to use when breaking up sequences in a load-balanced manner
162 The size of chunk to use when breaking up sequences in a load-balanced manner
163 ordered : bool [default: True]
163 ordered : bool [default: True]
164 Whether the result should be kept in order. If False,
164 Whether the result should be kept in order. If False,
165 results become available as they arrive, regardless of submission order.
165 results become available as they arrive, regardless of submission order.
166 **flags : remaining kwargs are passed to View.temp_flags
166 **flags : remaining kwargs are passed to View.temp_flags
167 """
167 """
168
168
169 chunksize = None
169 chunksize = None
170 ordered = None
170 ordered = None
171 mapObject = None
171 mapObject = None
172 _mapping = False
172 _mapping = False
173
173
174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
174 def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags):
175 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
175 super(ParallelFunction, self).__init__(view, f, block=block, **flags)
176 self.chunksize = chunksize
176 self.chunksize = chunksize
177 self.ordered = ordered
177 self.ordered = ordered
178
178
179 mapClass = Map.dists[dist]
179 mapClass = Map.dists[dist]
180 self.mapObject = mapClass()
180 self.mapObject = mapClass()
181
181
182 @sync_view_results
182 @sync_view_results
183 def __call__(self, *sequences):
183 def __call__(self, *sequences):
184 client = self.view.client
184 client = self.view.client
185
185
186 lens = []
186 lens = []
187 maxlen = minlen = -1
187 maxlen = minlen = -1
188 for i, seq in enumerate(sequences):
188 for i, seq in enumerate(sequences):
189 try:
189 try:
190 n = len(seq)
190 n = len(seq)
191 except Exception:
191 except Exception:
192 seq = list(seq)
192 seq = list(seq)
193 if isinstance(sequences, tuple):
194 # can't alter a tuple
195 sequences = list(sequences)
193 sequences[i] = seq
196 sequences[i] = seq
194 n = len(seq)
197 n = len(seq)
195 if n > maxlen:
198 if n > maxlen:
196 maxlen = n
199 maxlen = n
197 if minlen == -1 or n < minlen:
200 if minlen == -1 or n < minlen:
198 minlen = n
201 minlen = n
199 lens.append(n)
202 lens.append(n)
200
203
201 # check that the length of sequences match
204 # check that the length of sequences match
202 if not self._mapping and minlen != maxlen:
205 if not self._mapping and minlen != maxlen:
203 msg = 'all sequences must have equal length, but have %s' % lens
206 msg = 'all sequences must have equal length, but have %s' % lens
204 raise ValueError(msg)
207 raise ValueError(msg)
205
208
206 balanced = 'Balanced' in self.view.__class__.__name__
209 balanced = 'Balanced' in self.view.__class__.__name__
207 if balanced:
210 if balanced:
208 if self.chunksize:
211 if self.chunksize:
209 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
212 nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0)
210 else:
213 else:
211 nparts = maxlen
214 nparts = maxlen
212 targets = [None]*nparts
215 targets = [None]*nparts
213 else:
216 else:
214 if self.chunksize:
217 if self.chunksize:
215 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
218 warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
216 # multiplexed:
219 # multiplexed:
217 targets = self.view.targets
220 targets = self.view.targets
218 # 'all' is lazily evaluated at execution time, which is now:
221 # 'all' is lazily evaluated at execution time, which is now:
219 if targets == 'all':
222 if targets == 'all':
220 targets = client._build_targets(targets)[1]
223 targets = client._build_targets(targets)[1]
221 elif isinstance(targets, int):
224 elif isinstance(targets, int):
222 # single-engine view, targets must be iterable
225 # single-engine view, targets must be iterable
223 targets = [targets]
226 targets = [targets]
224 nparts = len(targets)
227 nparts = len(targets)
225
228
226 msg_ids = []
229 msg_ids = []
227 for index, t in enumerate(targets):
230 for index, t in enumerate(targets):
228 args = []
231 args = []
229 for seq in sequences:
232 for seq in sequences:
230 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
233 part = self.mapObject.getPartition(seq, index, nparts, maxlen)
231 args.append(part)
234 args.append(part)
232 if not any(args):
235 if not any(args):
233 continue
236 continue
234
237
235 if self._mapping:
238 if self._mapping:
236 if sys.version_info[0] >= 3:
239 if sys.version_info[0] >= 3:
237 f = lambda f, *sequences: list(map(f, *sequences))
240 f = lambda f, *sequences: list(map(f, *sequences))
238 else:
241 else:
239 f = map
242 f = map
240 args = [self.func] + args
243 args = [self.func] + args
241 else:
244 else:
242 f=self.func
245 f=self.func
243
246
244 view = self.view if balanced else client[t]
247 view = self.view if balanced else client[t]
245 with view.temp_flags(block=False, **self.flags):
248 with view.temp_flags(block=False, **self.flags):
246 ar = view.apply(f, *args)
249 ar = view.apply(f, *args)
247
250
248 msg_ids.extend(ar.msg_ids)
251 msg_ids.extend(ar.msg_ids)
249
252
250 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
253 r = AsyncMapResult(self.view.client, msg_ids, self.mapObject,
251 fname=getname(self.func),
254 fname=getname(self.func),
252 ordered=self.ordered
255 ordered=self.ordered
253 )
256 )
254
257
255 if self.block:
258 if self.block:
256 try:
259 try:
257 return r.get()
260 return r.get()
258 except KeyboardInterrupt:
261 except KeyboardInterrupt:
259 return r
262 return r
260 else:
263 else:
261 return r
264 return r
262
265
263 def map(self, *sequences):
266 def map(self, *sequences):
264 """call a function on each element of a sequence remotely.
267 """call a function on each element of a sequence remotely.
265 This should behave very much like the builtin map, but return an AsyncMapResult
268 This should behave very much like the builtin map, but return an AsyncMapResult
266 if self.block is False.
269 if self.block is False.
267 """
270 """
268 # set _mapping as a flag for use inside self.__call__
271 # set _mapping as a flag for use inside self.__call__
269 self._mapping = True
272 self._mapping = True
270 try:
273 try:
271 ret = self(*sequences)
274 ret = self(*sequences)
272 finally:
275 finally:
273 self._mapping = False
276 self._mapping = False
274 return ret
277 return ret
275
278
276 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
279 __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']
General Comments 0
You need to be logged in to leave comments. Login now