Show More
@@ -1,279 +1,282 b'' | |||||
1 | """Remote Functions and decorators for Views. |
|
1 | """Remote Functions and decorators for Views. | |
2 |
|
2 | |||
3 | Authors: |
|
3 | Authors: | |
4 |
|
4 | |||
5 | * Brian Granger |
|
5 | * Brian Granger | |
6 | * Min RK |
|
6 | * Min RK | |
7 | """ |
|
7 | """ | |
8 | #----------------------------------------------------------------------------- |
|
8 | #----------------------------------------------------------------------------- | |
9 | # Copyright (C) 2010-2011 The IPython Development Team |
|
9 | # Copyright (C) 2010-2011 The IPython Development Team | |
10 | # |
|
10 | # | |
11 | # Distributed under the terms of the BSD License. The full license is in |
|
11 | # Distributed under the terms of the BSD License. The full license is in | |
12 | # the file COPYING, distributed as part of this software. |
|
12 | # the file COPYING, distributed as part of this software. | |
13 | #----------------------------------------------------------------------------- |
|
13 | #----------------------------------------------------------------------------- | |
14 |
|
14 | |||
15 | #----------------------------------------------------------------------------- |
|
15 | #----------------------------------------------------------------------------- | |
16 | # Imports |
|
16 | # Imports | |
17 | #----------------------------------------------------------------------------- |
|
17 | #----------------------------------------------------------------------------- | |
18 |
|
18 | |||
19 | from __future__ import division |
|
19 | from __future__ import division | |
20 |
|
20 | |||
21 | import sys |
|
21 | import sys | |
22 | import warnings |
|
22 | import warnings | |
23 |
|
23 | |||
24 | from IPython.external.decorator import decorator |
|
24 | from IPython.external.decorator import decorator | |
25 | from IPython.testing.skipdoctest import skip_doctest |
|
25 | from IPython.testing.skipdoctest import skip_doctest | |
26 |
|
26 | |||
27 | from . import map as Map |
|
27 | from . import map as Map | |
28 | from .asyncresult import AsyncMapResult |
|
28 | from .asyncresult import AsyncMapResult | |
29 |
|
29 | |||
30 | #----------------------------------------------------------------------------- |
|
30 | #----------------------------------------------------------------------------- | |
31 | # Functions and Decorators |
|
31 | # Functions and Decorators | |
32 | #----------------------------------------------------------------------------- |
|
32 | #----------------------------------------------------------------------------- | |
33 |
|
33 | |||
34 | @skip_doctest |
|
34 | @skip_doctest | |
35 | def remote(view, block=None, **flags): |
|
35 | def remote(view, block=None, **flags): | |
36 | """Turn a function into a remote function. |
|
36 | """Turn a function into a remote function. | |
37 |
|
37 | |||
38 | This method can be used for map: |
|
38 | This method can be used for map: | |
39 |
|
39 | |||
40 | In [1]: @remote(view,block=True) |
|
40 | In [1]: @remote(view,block=True) | |
41 | ...: def func(a): |
|
41 | ...: def func(a): | |
42 | ...: pass |
|
42 | ...: pass | |
43 | """ |
|
43 | """ | |
44 |
|
44 | |||
45 | def remote_function(f): |
|
45 | def remote_function(f): | |
46 | return RemoteFunction(view, f, block=block, **flags) |
|
46 | return RemoteFunction(view, f, block=block, **flags) | |
47 | return remote_function |
|
47 | return remote_function | |
48 |
|
48 | |||
49 | @skip_doctest |
|
49 | @skip_doctest | |
50 | def parallel(view, dist='b', block=None, ordered=True, **flags): |
|
50 | def parallel(view, dist='b', block=None, ordered=True, **flags): | |
51 | """Turn a function into a parallel remote function. |
|
51 | """Turn a function into a parallel remote function. | |
52 |
|
52 | |||
53 | This method can be used for map: |
|
53 | This method can be used for map: | |
54 |
|
54 | |||
55 | In [1]: @parallel(view, block=True) |
|
55 | In [1]: @parallel(view, block=True) | |
56 | ...: def func(a): |
|
56 | ...: def func(a): | |
57 | ...: pass |
|
57 | ...: pass | |
58 | """ |
|
58 | """ | |
59 |
|
59 | |||
60 | def parallel_function(f): |
|
60 | def parallel_function(f): | |
61 | return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags) |
|
61 | return ParallelFunction(view, f, dist=dist, block=block, ordered=ordered, **flags) | |
62 | return parallel_function |
|
62 | return parallel_function | |
63 |
|
63 | |||
64 | def getname(f): |
|
64 | def getname(f): | |
65 | """Get the name of an object. |
|
65 | """Get the name of an object. | |
66 |
|
66 | |||
67 | For use in case of callables that are not functions, and |
|
67 | For use in case of callables that are not functions, and | |
68 | thus may not have __name__ defined. |
|
68 | thus may not have __name__ defined. | |
69 |
|
69 | |||
70 | Order: f.__name__ > f.name > str(f) |
|
70 | Order: f.__name__ > f.name > str(f) | |
71 | """ |
|
71 | """ | |
72 | try: |
|
72 | try: | |
73 | return f.__name__ |
|
73 | return f.__name__ | |
74 | except: |
|
74 | except: | |
75 | pass |
|
75 | pass | |
76 | try: |
|
76 | try: | |
77 | return f.name |
|
77 | return f.name | |
78 | except: |
|
78 | except: | |
79 | pass |
|
79 | pass | |
80 |
|
80 | |||
81 | return str(f) |
|
81 | return str(f) | |
82 |
|
82 | |||
83 | @decorator |
|
83 | @decorator | |
84 | def sync_view_results(f, self, *args, **kwargs): |
|
84 | def sync_view_results(f, self, *args, **kwargs): | |
85 | """sync relevant results from self.client to our results attribute. |
|
85 | """sync relevant results from self.client to our results attribute. | |
86 |
|
86 | |||
87 | This is a clone of view.sync_results, but for remote functions |
|
87 | This is a clone of view.sync_results, but for remote functions | |
88 | """ |
|
88 | """ | |
89 | view = self.view |
|
89 | view = self.view | |
90 | if view._in_sync_results: |
|
90 | if view._in_sync_results: | |
91 | return f(self, *args, **kwargs) |
|
91 | return f(self, *args, **kwargs) | |
92 | print 'in sync results', f |
|
92 | print 'in sync results', f | |
93 | view._in_sync_results = True |
|
93 | view._in_sync_results = True | |
94 | try: |
|
94 | try: | |
95 | ret = f(self, *args, **kwargs) |
|
95 | ret = f(self, *args, **kwargs) | |
96 | finally: |
|
96 | finally: | |
97 | view._in_sync_results = False |
|
97 | view._in_sync_results = False | |
98 | view._sync_results() |
|
98 | view._sync_results() | |
99 | return ret |
|
99 | return ret | |
100 |
|
100 | |||
101 | #-------------------------------------------------------------------------- |
|
101 | #-------------------------------------------------------------------------- | |
102 | # Classes |
|
102 | # Classes | |
103 | #-------------------------------------------------------------------------- |
|
103 | #-------------------------------------------------------------------------- | |
104 |
|
104 | |||
105 | class RemoteFunction(object): |
|
105 | class RemoteFunction(object): | |
106 | """Turn an existing function into a remote function. |
|
106 | """Turn an existing function into a remote function. | |
107 |
|
107 | |||
108 | Parameters |
|
108 | Parameters | |
109 | ---------- |
|
109 | ---------- | |
110 |
|
110 | |||
111 | view : View instance |
|
111 | view : View instance | |
112 | The view to be used for execution |
|
112 | The view to be used for execution | |
113 | f : callable |
|
113 | f : callable | |
114 | The function to be wrapped into a remote function |
|
114 | The function to be wrapped into a remote function | |
115 | block : bool [default: None] |
|
115 | block : bool [default: None] | |
116 | Whether to wait for results or not. The default behavior is |
|
116 | Whether to wait for results or not. The default behavior is | |
117 | to use the current `block` attribute of `view` |
|
117 | to use the current `block` attribute of `view` | |
118 |
|
118 | |||
119 | **flags : remaining kwargs are passed to View.temp_flags |
|
119 | **flags : remaining kwargs are passed to View.temp_flags | |
120 | """ |
|
120 | """ | |
121 |
|
121 | |||
122 | view = None # the remote connection |
|
122 | view = None # the remote connection | |
123 | func = None # the wrapped function |
|
123 | func = None # the wrapped function | |
124 | block = None # whether to block |
|
124 | block = None # whether to block | |
125 | flags = None # dict of extra kwargs for temp_flags |
|
125 | flags = None # dict of extra kwargs for temp_flags | |
126 |
|
126 | |||
127 | def __init__(self, view, f, block=None, **flags): |
|
127 | def __init__(self, view, f, block=None, **flags): | |
128 | self.view = view |
|
128 | self.view = view | |
129 | self.func = f |
|
129 | self.func = f | |
130 | self.block=block |
|
130 | self.block=block | |
131 | self.flags=flags |
|
131 | self.flags=flags | |
132 |
|
132 | |||
133 | def __call__(self, *args, **kwargs): |
|
133 | def __call__(self, *args, **kwargs): | |
134 | block = self.view.block if self.block is None else self.block |
|
134 | block = self.view.block if self.block is None else self.block | |
135 | with self.view.temp_flags(block=block, **self.flags): |
|
135 | with self.view.temp_flags(block=block, **self.flags): | |
136 | return self.view.apply(self.func, *args, **kwargs) |
|
136 | return self.view.apply(self.func, *args, **kwargs) | |
137 |
|
137 | |||
138 |
|
138 | |||
139 | class ParallelFunction(RemoteFunction): |
|
139 | class ParallelFunction(RemoteFunction): | |
140 | """Class for mapping a function to sequences. |
|
140 | """Class for mapping a function to sequences. | |
141 |
|
141 | |||
142 | This will distribute the sequences according the a mapper, and call |
|
142 | This will distribute the sequences according the a mapper, and call | |
143 | the function on each sub-sequence. If called via map, then the function |
|
143 | the function on each sub-sequence. If called via map, then the function | |
144 | will be called once on each element, rather that each sub-sequence. |
|
144 | will be called once on each element, rather that each sub-sequence. | |
145 |
|
145 | |||
146 | Parameters |
|
146 | Parameters | |
147 | ---------- |
|
147 | ---------- | |
148 |
|
148 | |||
149 | view : View instance |
|
149 | view : View instance | |
150 | The view to be used for execution |
|
150 | The view to be used for execution | |
151 | f : callable |
|
151 | f : callable | |
152 | The function to be wrapped into a remote function |
|
152 | The function to be wrapped into a remote function | |
153 | dist : str [default: 'b'] |
|
153 | dist : str [default: 'b'] | |
154 | The key for which mapObject to use to distribute sequences |
|
154 | The key for which mapObject to use to distribute sequences | |
155 | options are: |
|
155 | options are: | |
156 | * 'b' : use contiguous chunks in order |
|
156 | * 'b' : use contiguous chunks in order | |
157 | * 'r' : use round-robin striping |
|
157 | * 'r' : use round-robin striping | |
158 | block : bool [default: None] |
|
158 | block : bool [default: None] | |
159 | Whether to wait for results or not. The default behavior is |
|
159 | Whether to wait for results or not. The default behavior is | |
160 | to use the current `block` attribute of `view` |
|
160 | to use the current `block` attribute of `view` | |
161 | chunksize : int or None |
|
161 | chunksize : int or None | |
162 | The size of chunk to use when breaking up sequences in a load-balanced manner |
|
162 | The size of chunk to use when breaking up sequences in a load-balanced manner | |
163 | ordered : bool [default: True] |
|
163 | ordered : bool [default: True] | |
164 | Whether the result should be kept in order. If False, |
|
164 | Whether the result should be kept in order. If False, | |
165 | results become available as they arrive, regardless of submission order. |
|
165 | results become available as they arrive, regardless of submission order. | |
166 | **flags : remaining kwargs are passed to View.temp_flags |
|
166 | **flags : remaining kwargs are passed to View.temp_flags | |
167 | """ |
|
167 | """ | |
168 |
|
168 | |||
169 | chunksize = None |
|
169 | chunksize = None | |
170 | ordered = None |
|
170 | ordered = None | |
171 | mapObject = None |
|
171 | mapObject = None | |
172 | _mapping = False |
|
172 | _mapping = False | |
173 |
|
173 | |||
174 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): |
|
174 | def __init__(self, view, f, dist='b', block=None, chunksize=None, ordered=True, **flags): | |
175 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) |
|
175 | super(ParallelFunction, self).__init__(view, f, block=block, **flags) | |
176 | self.chunksize = chunksize |
|
176 | self.chunksize = chunksize | |
177 | self.ordered = ordered |
|
177 | self.ordered = ordered | |
178 |
|
178 | |||
179 | mapClass = Map.dists[dist] |
|
179 | mapClass = Map.dists[dist] | |
180 | self.mapObject = mapClass() |
|
180 | self.mapObject = mapClass() | |
181 |
|
181 | |||
182 | @sync_view_results |
|
182 | @sync_view_results | |
183 | def __call__(self, *sequences): |
|
183 | def __call__(self, *sequences): | |
184 | client = self.view.client |
|
184 | client = self.view.client | |
185 |
|
185 | |||
186 | lens = [] |
|
186 | lens = [] | |
187 | maxlen = minlen = -1 |
|
187 | maxlen = minlen = -1 | |
188 | for i, seq in enumerate(sequences): |
|
188 | for i, seq in enumerate(sequences): | |
189 | try: |
|
189 | try: | |
190 | n = len(seq) |
|
190 | n = len(seq) | |
191 | except Exception: |
|
191 | except Exception: | |
192 | seq = list(seq) |
|
192 | seq = list(seq) | |
193 | if isinstance(sequences, tuple): |
|
193 | if isinstance(sequences, tuple): | |
194 | # can't alter a tuple |
|
194 | # can't alter a tuple | |
195 | sequences = list(sequences) |
|
195 | sequences = list(sequences) | |
196 | sequences[i] = seq |
|
196 | sequences[i] = seq | |
197 | n = len(seq) |
|
197 | n = len(seq) | |
198 | if n > maxlen: |
|
198 | if n > maxlen: | |
199 | maxlen = n |
|
199 | maxlen = n | |
200 | if minlen == -1 or n < minlen: |
|
200 | if minlen == -1 or n < minlen: | |
201 | minlen = n |
|
201 | minlen = n | |
202 | lens.append(n) |
|
202 | lens.append(n) | |
203 |
|
203 | |||
204 | # check that the length of sequences match |
|
204 | # check that the length of sequences match | |
205 | if not self._mapping and minlen != maxlen: |
|
205 | if not self._mapping and minlen != maxlen: | |
206 | msg = 'all sequences must have equal length, but have %s' % lens |
|
206 | msg = 'all sequences must have equal length, but have %s' % lens | |
207 | raise ValueError(msg) |
|
207 | raise ValueError(msg) | |
208 |
|
208 | |||
209 | balanced = 'Balanced' in self.view.__class__.__name__ |
|
209 | balanced = 'Balanced' in self.view.__class__.__name__ | |
210 | if balanced: |
|
210 | if balanced: | |
211 | if self.chunksize: |
|
211 | if self.chunksize: | |
212 | nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0) |
|
212 | nparts = maxlen // self.chunksize + int(maxlen % self.chunksize > 0) | |
213 | else: |
|
213 | else: | |
214 | nparts = maxlen |
|
214 | nparts = maxlen | |
215 | targets = [None]*nparts |
|
215 | targets = [None]*nparts | |
216 | else: |
|
216 | else: | |
217 | if self.chunksize: |
|
217 | if self.chunksize: | |
218 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) |
|
218 | warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) | |
219 | # multiplexed: |
|
219 | # multiplexed: | |
220 | targets = self.view.targets |
|
220 | targets = self.view.targets | |
221 | # 'all' is lazily evaluated at execution time, which is now: |
|
221 | # 'all' is lazily evaluated at execution time, which is now: | |
222 | if targets == 'all': |
|
222 | if targets == 'all': | |
223 | targets = client._build_targets(targets)[1] |
|
223 | targets = client._build_targets(targets)[1] | |
224 | elif isinstance(targets, int): |
|
224 | elif isinstance(targets, int): | |
225 | # single-engine view, targets must be iterable |
|
225 | # single-engine view, targets must be iterable | |
226 | targets = [targets] |
|
226 | targets = [targets] | |
227 | nparts = len(targets) |
|
227 | nparts = len(targets) | |
228 |
|
228 | |||
229 | msg_ids = [] |
|
229 | msg_ids = [] | |
230 | for index, t in enumerate(targets): |
|
230 | for index, t in enumerate(targets): | |
231 | args = [] |
|
231 | args = [] | |
232 | for seq in sequences: |
|
232 | for seq in sequences: | |
233 | part = self.mapObject.getPartition(seq, index, nparts, maxlen) |
|
233 | part = self.mapObject.getPartition(seq, index, nparts, maxlen) | |
234 | args.append(part) |
|
234 | args.append(part) | |
235 | if not any(args): |
|
235 | if not any(args): | |
236 | continue |
|
236 | continue | |
237 |
|
237 | |||
238 | if self._mapping: |
|
238 | if self._mapping: | |
239 | if sys.version_info[0] >= 3: |
|
239 | if sys.version_info[0] >= 3: | |
240 | f = lambda f, *sequences: list(map(f, *sequences)) |
|
240 | f = lambda f, *sequences: list(map(f, *sequences)) | |
241 | else: |
|
241 | else: | |
242 | f = map |
|
242 | f = map | |
243 | args = [self.func] + args |
|
243 | args = [self.func] + args | |
244 | else: |
|
244 | else: | |
245 | f=self.func |
|
245 | f=self.func | |
246 |
|
246 | |||
247 | view = self.view if balanced else client[t] |
|
247 | view = self.view if balanced else client[t] | |
248 | with view.temp_flags(block=False, **self.flags): |
|
248 | with view.temp_flags(block=False, **self.flags): | |
249 | ar = view.apply(f, *args) |
|
249 | ar = view.apply(f, *args) | |
250 |
|
250 | |||
251 | msg_ids.extend(ar.msg_ids) |
|
251 | msg_ids.extend(ar.msg_ids) | |
252 |
|
252 | |||
253 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, |
|
253 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, | |
254 | fname=getname(self.func), |
|
254 | fname=getname(self.func), | |
255 | ordered=self.ordered |
|
255 | ordered=self.ordered | |
256 | ) |
|
256 | ) | |
257 |
|
257 | |||
258 | if self.block: |
|
258 | if self.block: | |
259 | try: |
|
259 | try: | |
260 | return r.get() |
|
260 | return r.get() | |
261 | except KeyboardInterrupt: |
|
261 | except KeyboardInterrupt: | |
262 | return r |
|
262 | return r | |
263 | else: |
|
263 | else: | |
264 | return r |
|
264 | return r | |
265 |
|
265 | |||
266 | def map(self, *sequences): |
|
266 | def map(self, *sequences): | |
267 |
"""call a function on each element of |
|
267 | """call a function on each element of one or more sequence(s) remotely. | |
268 | This should behave very much like the builtin map, but return an AsyncMapResult |
|
268 | This should behave very much like the builtin map, but return an AsyncMapResult | |
269 | if self.block is False. |
|
269 | if self.block is False. | |
|
270 | ||||
|
271 | That means it can take generators (will be cast to lists locally), | |||
|
272 | and mismatched sequence lengths will be padded with None. | |||
270 | """ |
|
273 | """ | |
271 | # set _mapping as a flag for use inside self.__call__ |
|
274 | # set _mapping as a flag for use inside self.__call__ | |
272 | self._mapping = True |
|
275 | self._mapping = True | |
273 | try: |
|
276 | try: | |
274 | ret = self(*sequences) |
|
277 | ret = self(*sequences) | |
275 | finally: |
|
278 | finally: | |
276 | self._mapping = False |
|
279 | self._mapping = False | |
277 | return ret |
|
280 | return ret | |
278 |
|
281 | |||
279 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |
|
282 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |
General Comments 0
You need to be logged in to leave comments.
Login now