##// END OF EJS Templates
AsyncResult.__getattr__ shouldn't raise TimeoutError...
MinRK -
Show More
@@ -1,395 +1,396
1 """AsyncResult objects for the client
1 """AsyncResult objects for the client
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import time
18 import time
19
19
20 from zmq import MessageTracker
20 from zmq import MessageTracker
21
21
22 from IPython.external.decorator import decorator
22 from IPython.external.decorator import decorator
23 from IPython.parallel import error
23 from IPython.parallel import error
24
24
25 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
26 # Classes
26 # Classes
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28
28
29 # global empty tracker that's always done:
29 # global empty tracker that's always done:
30 finished_tracker = MessageTracker()
30 finished_tracker = MessageTracker()
31
31
32 @decorator
32 @decorator
33 def check_ready(f, self, *args, **kwargs):
33 def check_ready(f, self, *args, **kwargs):
34 """Call spin() to sync state prior to calling the method."""
34 """Call spin() to sync state prior to calling the method."""
35 self.wait(0)
35 self.wait(0)
36 if not self._ready:
36 if not self._ready:
37 raise error.TimeoutError("result not ready")
37 raise error.TimeoutError("result not ready")
38 return f(self, *args, **kwargs)
38 return f(self, *args, **kwargs)
39
39
40 class AsyncResult(object):
40 class AsyncResult(object):
41 """Class for representing results of non-blocking calls.
41 """Class for representing results of non-blocking calls.
42
42
43 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
43 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
44 """
44 """
45
45
46 msg_ids = None
46 msg_ids = None
47 _targets = None
47 _targets = None
48 _tracker = None
48 _tracker = None
49 _single_result = False
49 _single_result = False
50
50
51 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
51 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
52 if isinstance(msg_ids, basestring):
52 if isinstance(msg_ids, basestring):
53 # always a list
53 # always a list
54 msg_ids = [msg_ids]
54 msg_ids = [msg_ids]
55 if tracker is None:
55 if tracker is None:
56 # default to always done
56 # default to always done
57 tracker = finished_tracker
57 tracker = finished_tracker
58 self._client = client
58 self._client = client
59 self.msg_ids = msg_ids
59 self.msg_ids = msg_ids
60 self._fname=fname
60 self._fname=fname
61 self._targets = targets
61 self._targets = targets
62 self._tracker = tracker
62 self._tracker = tracker
63 self._ready = False
63 self._ready = False
64 self._success = None
64 self._success = None
65 self._metadata = None
65 if len(msg_ids) == 1:
66 if len(msg_ids) == 1:
66 self._single_result = not isinstance(targets, (list, tuple))
67 self._single_result = not isinstance(targets, (list, tuple))
67 else:
68 else:
68 self._single_result = False
69 self._single_result = False
69
70
70 def __repr__(self):
71 def __repr__(self):
71 if self._ready:
72 if self._ready:
72 return "<%s: finished>"%(self.__class__.__name__)
73 return "<%s: finished>"%(self.__class__.__name__)
73 else:
74 else:
74 return "<%s: %s>"%(self.__class__.__name__,self._fname)
75 return "<%s: %s>"%(self.__class__.__name__,self._fname)
75
76
76
77
77 def _reconstruct_result(self, res):
78 def _reconstruct_result(self, res):
78 """Reconstruct our result from actual result list (always a list)
79 """Reconstruct our result from actual result list (always a list)
79
80
80 Override me in subclasses for turning a list of results
81 Override me in subclasses for turning a list of results
81 into the expected form.
82 into the expected form.
82 """
83 """
83 if self._single_result:
84 if self._single_result:
84 return res[0]
85 return res[0]
85 else:
86 else:
86 return res
87 return res
87
88
88 def get(self, timeout=-1):
89 def get(self, timeout=-1):
89 """Return the result when it arrives.
90 """Return the result when it arrives.
90
91
91 If `timeout` is not ``None`` and the result does not arrive within
92 If `timeout` is not ``None`` and the result does not arrive within
92 `timeout` seconds then ``TimeoutError`` is raised. If the
93 `timeout` seconds then ``TimeoutError`` is raised. If the
93 remote call raised an exception then that exception will be reraised
94 remote call raised an exception then that exception will be reraised
94 by get() inside a `RemoteError`.
95 by get() inside a `RemoteError`.
95 """
96 """
96 if not self.ready():
97 if not self.ready():
97 self.wait(timeout)
98 self.wait(timeout)
98
99
99 if self._ready:
100 if self._ready:
100 if self._success:
101 if self._success:
101 return self._result
102 return self._result
102 else:
103 else:
103 raise self._exception
104 raise self._exception
104 else:
105 else:
105 raise error.TimeoutError("Result not ready.")
106 raise error.TimeoutError("Result not ready.")
106
107
107 def ready(self):
108 def ready(self):
108 """Return whether the call has completed."""
109 """Return whether the call has completed."""
109 if not self._ready:
110 if not self._ready:
110 self.wait(0)
111 self.wait(0)
111 return self._ready
112 return self._ready
112
113
113 def wait(self, timeout=-1):
114 def wait(self, timeout=-1):
114 """Wait until the result is available or until `timeout` seconds pass.
115 """Wait until the result is available or until `timeout` seconds pass.
115
116
116 This method always returns None.
117 This method always returns None.
117 """
118 """
118 if self._ready:
119 if self._ready:
119 return
120 return
120 self._ready = self._client.wait(self.msg_ids, timeout)
121 self._ready = self._client.wait(self.msg_ids, timeout)
121 if self._ready:
122 if self._ready:
122 try:
123 try:
123 results = map(self._client.results.get, self.msg_ids)
124 results = map(self._client.results.get, self.msg_ids)
124 self._result = results
125 self._result = results
125 if self._single_result:
126 if self._single_result:
126 r = results[0]
127 r = results[0]
127 if isinstance(r, Exception):
128 if isinstance(r, Exception):
128 raise r
129 raise r
129 else:
130 else:
130 results = error.collect_exceptions(results, self._fname)
131 results = error.collect_exceptions(results, self._fname)
131 self._result = self._reconstruct_result(results)
132 self._result = self._reconstruct_result(results)
132 except Exception, e:
133 except Exception, e:
133 self._exception = e
134 self._exception = e
134 self._success = False
135 self._success = False
135 else:
136 else:
136 self._success = True
137 self._success = True
137 finally:
138 finally:
138 self._metadata = map(self._client.metadata.get, self.msg_ids)
139 self._metadata = map(self._client.metadata.get, self.msg_ids)
139
140
140
141
141 def successful(self):
142 def successful(self):
142 """Return whether the call completed without raising an exception.
143 """Return whether the call completed without raising an exception.
143
144
144 Will raise ``AssertionError`` if the result is not ready.
145 Will raise ``AssertionError`` if the result is not ready.
145 """
146 """
146 assert self.ready()
147 assert self.ready()
147 return self._success
148 return self._success
148
149
149 #----------------------------------------------------------------
150 #----------------------------------------------------------------
150 # Extra methods not in mp.pool.AsyncResult
151 # Extra methods not in mp.pool.AsyncResult
151 #----------------------------------------------------------------
152 #----------------------------------------------------------------
152
153
153 def get_dict(self, timeout=-1):
154 def get_dict(self, timeout=-1):
154 """Get the results as a dict, keyed by engine_id.
155 """Get the results as a dict, keyed by engine_id.
155
156
156 timeout behavior is described in `get()`.
157 timeout behavior is described in `get()`.
157 """
158 """
158
159
159 results = self.get(timeout)
160 results = self.get(timeout)
160 engine_ids = [ md['engine_id'] for md in self._metadata ]
161 engine_ids = [ md['engine_id'] for md in self._metadata ]
161 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
162 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
162 maxcount = bycount.count(bycount[-1])
163 maxcount = bycount.count(bycount[-1])
163 if maxcount > 1:
164 if maxcount > 1:
164 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
165 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
165 maxcount, bycount[-1]))
166 maxcount, bycount[-1]))
166
167
167 return dict(zip(engine_ids,results))
168 return dict(zip(engine_ids,results))
168
169
169 @property
170 @property
170 def result(self):
171 def result(self):
171 """result property wrapper for `get(timeout=0)`."""
172 """result property wrapper for `get(timeout=0)`."""
172 return self.get()
173 return self.get()
173
174
174 # abbreviated alias:
175 # abbreviated alias:
175 r = result
176 r = result
176
177
177 @property
178 @property
178 @check_ready
179 @check_ready
179 def metadata(self):
180 def metadata(self):
180 """property for accessing execution metadata."""
181 """property for accessing execution metadata."""
181 if self._single_result:
182 if self._single_result:
182 return self._metadata[0]
183 return self._metadata[0]
183 else:
184 else:
184 return self._metadata
185 return self._metadata
185
186
186 @property
187 @property
187 def result_dict(self):
188 def result_dict(self):
188 """result property as a dict."""
189 """result property as a dict."""
189 return self.get_dict()
190 return self.get_dict()
190
191
191 def __dict__(self):
192 def __dict__(self):
192 return self.get_dict(0)
193 return self.get_dict(0)
193
194
194 def abort(self):
195 def abort(self):
195 """abort my tasks."""
196 """abort my tasks."""
196 assert not self.ready(), "Can't abort, I am already done!"
197 assert not self.ready(), "Can't abort, I am already done!"
197 return self.client.abort(self.msg_ids, targets=self._targets, block=True)
198 return self.client.abort(self.msg_ids, targets=self._targets, block=True)
198
199
199 @property
200 @property
200 def sent(self):
201 def sent(self):
201 """check whether my messages have been sent."""
202 """check whether my messages have been sent."""
202 return self._tracker.done
203 return self._tracker.done
203
204
204 def wait_for_send(self, timeout=-1):
205 def wait_for_send(self, timeout=-1):
205 """wait for pyzmq send to complete.
206 """wait for pyzmq send to complete.
206
207
207 This is necessary when sending arrays that you intend to edit in-place.
208 This is necessary when sending arrays that you intend to edit in-place.
208 `timeout` is in seconds, and will raise TimeoutError if it is reached
209 `timeout` is in seconds, and will raise TimeoutError if it is reached
209 before the send completes.
210 before the send completes.
210 """
211 """
211 return self._tracker.wait(timeout)
212 return self._tracker.wait(timeout)
212
213
213 #-------------------------------------
214 #-------------------------------------
214 # dict-access
215 # dict-access
215 #-------------------------------------
216 #-------------------------------------
216
217
217 @check_ready
218 @check_ready
218 def __getitem__(self, key):
219 def __getitem__(self, key):
219 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
220 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
220 """
221 """
221 if isinstance(key, int):
222 if isinstance(key, int):
222 return error.collect_exceptions([self._result[key]], self._fname)[0]
223 return error.collect_exceptions([self._result[key]], self._fname)[0]
223 elif isinstance(key, slice):
224 elif isinstance(key, slice):
224 return error.collect_exceptions(self._result[key], self._fname)
225 return error.collect_exceptions(self._result[key], self._fname)
225 elif isinstance(key, basestring):
226 elif isinstance(key, basestring):
226 values = [ md[key] for md in self._metadata ]
227 values = [ md[key] for md in self._metadata ]
227 if self._single_result:
228 if self._single_result:
228 return values[0]
229 return values[0]
229 else:
230 else:
230 return values
231 return values
231 else:
232 else:
232 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
233 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
233
234
234 @check_ready
235 def __getattr__(self, key):
235 def __getattr__(self, key):
236 """getattr maps to getitem for convenient attr access to metadata."""
236 """getattr maps to getitem for convenient attr access to metadata."""
237 if key not in self._metadata[0].keys():
237 try:
238 return self.__getitem__(key)
239 except (error.TimeoutError, KeyError):
238 raise AttributeError("%r object has no attribute %r"%(
240 raise AttributeError("%r object has no attribute %r"%(
239 self.__class__.__name__, key))
241 self.__class__.__name__, key))
240 return self.__getitem__(key)
241
242
242 # asynchronous iterator:
243 # asynchronous iterator:
243 def __iter__(self):
244 def __iter__(self):
244 if self._single_result:
245 if self._single_result:
245 raise TypeError("AsyncResults with a single result are not iterable.")
246 raise TypeError("AsyncResults with a single result are not iterable.")
246 try:
247 try:
247 rlist = self.get(0)
248 rlist = self.get(0)
248 except error.TimeoutError:
249 except error.TimeoutError:
249 # wait for each result individually
250 # wait for each result individually
250 for msg_id in self.msg_ids:
251 for msg_id in self.msg_ids:
251 ar = AsyncResult(self._client, msg_id, self._fname)
252 ar = AsyncResult(self._client, msg_id, self._fname)
252 yield ar.get()
253 yield ar.get()
253 else:
254 else:
254 # already done
255 # already done
255 for r in rlist:
256 for r in rlist:
256 yield r
257 yield r
257
258
258
259
259
260
260 class AsyncMapResult(AsyncResult):
261 class AsyncMapResult(AsyncResult):
261 """Class for representing results of non-blocking gathers.
262 """Class for representing results of non-blocking gathers.
262
263
263 This will properly reconstruct the gather.
264 This will properly reconstruct the gather.
264
265
265 This class is iterable at any time, and will wait on results as they come.
266 This class is iterable at any time, and will wait on results as they come.
266
267
267 If ordered=False, then the first results to arrive will come first, otherwise
268 If ordered=False, then the first results to arrive will come first, otherwise
268 results will be yielded in the order they were submitted.
269 results will be yielded in the order they were submitted.
269
270
270 """
271 """
271
272
272 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
273 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
273 AsyncResult.__init__(self, client, msg_ids, fname=fname)
274 AsyncResult.__init__(self, client, msg_ids, fname=fname)
274 self._mapObject = mapObject
275 self._mapObject = mapObject
275 self._single_result = False
276 self._single_result = False
276 self.ordered = ordered
277 self.ordered = ordered
277
278
278 def _reconstruct_result(self, res):
279 def _reconstruct_result(self, res):
279 """Perform the gather on the actual results."""
280 """Perform the gather on the actual results."""
280 return self._mapObject.joinPartitions(res)
281 return self._mapObject.joinPartitions(res)
281
282
282 # asynchronous iterator:
283 # asynchronous iterator:
283 def __iter__(self):
284 def __iter__(self):
284 it = self._ordered_iter if self.ordered else self._unordered_iter
285 it = self._ordered_iter if self.ordered else self._unordered_iter
285 for r in it():
286 for r in it():
286 yield r
287 yield r
287
288
288 # asynchronous ordered iterator:
289 # asynchronous ordered iterator:
289 def _ordered_iter(self):
290 def _ordered_iter(self):
290 """iterator for results *as they arrive*, preserving submission order."""
291 """iterator for results *as they arrive*, preserving submission order."""
291 try:
292 try:
292 rlist = self.get(0)
293 rlist = self.get(0)
293 except error.TimeoutError:
294 except error.TimeoutError:
294 # wait for each result individually
295 # wait for each result individually
295 for msg_id in self.msg_ids:
296 for msg_id in self.msg_ids:
296 ar = AsyncResult(self._client, msg_id, self._fname)
297 ar = AsyncResult(self._client, msg_id, self._fname)
297 rlist = ar.get()
298 rlist = ar.get()
298 try:
299 try:
299 for r in rlist:
300 for r in rlist:
300 yield r
301 yield r
301 except TypeError:
302 except TypeError:
302 # flattened, not a list
303 # flattened, not a list
303 # this could get broken by flattened data that returns iterables
304 # this could get broken by flattened data that returns iterables
304 # but most calls to map do not expose the `flatten` argument
305 # but most calls to map do not expose the `flatten` argument
305 yield rlist
306 yield rlist
306 else:
307 else:
307 # already done
308 # already done
308 for r in rlist:
309 for r in rlist:
309 yield r
310 yield r
310
311
311 # asynchronous unordered iterator:
312 # asynchronous unordered iterator:
312 def _unordered_iter(self):
313 def _unordered_iter(self):
313 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
314 """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
314 try:
315 try:
315 rlist = self.get(0)
316 rlist = self.get(0)
316 except error.TimeoutError:
317 except error.TimeoutError:
317 pending = set(self.msg_ids)
318 pending = set(self.msg_ids)
318 while pending:
319 while pending:
319 try:
320 try:
320 self._client.wait(pending, 1e-3)
321 self._client.wait(pending, 1e-3)
321 except error.TimeoutError:
322 except error.TimeoutError:
322 # ignore timeout error, because that only means
323 # ignore timeout error, because that only means
323 # *some* jobs are outstanding
324 # *some* jobs are outstanding
324 pass
325 pass
325 # update ready set with those no longer outstanding:
326 # update ready set with those no longer outstanding:
326 ready = pending.difference(self._client.outstanding)
327 ready = pending.difference(self._client.outstanding)
327 # update pending to exclude those that are finished
328 # update pending to exclude those that are finished
328 pending = pending.difference(ready)
329 pending = pending.difference(ready)
329 while ready:
330 while ready:
330 msg_id = ready.pop()
331 msg_id = ready.pop()
331 ar = AsyncResult(self._client, msg_id, self._fname)
332 ar = AsyncResult(self._client, msg_id, self._fname)
332 rlist = ar.get()
333 rlist = ar.get()
333 try:
334 try:
334 for r in rlist:
335 for r in rlist:
335 yield r
336 yield r
336 except TypeError:
337 except TypeError:
337 # flattened, not a list
338 # flattened, not a list
338 # this could get broken by flattened data that returns iterables
339 # this could get broken by flattened data that returns iterables
339 # but most calls to map do not expose the `flatten` argument
340 # but most calls to map do not expose the `flatten` argument
340 yield rlist
341 yield rlist
341 else:
342 else:
342 # already done
343 # already done
343 for r in rlist:
344 for r in rlist:
344 yield r
345 yield r
345
346
346
347
347
348
348 class AsyncHubResult(AsyncResult):
349 class AsyncHubResult(AsyncResult):
349 """Class to wrap pending results that must be requested from the Hub.
350 """Class to wrap pending results that must be requested from the Hub.
350
351
351 Note that waiting/polling on these objects requires polling the Hubover the network,
352 Note that waiting/polling on these objects requires polling the Hubover the network,
352 so use `AsyncHubResult.wait()` sparingly.
353 so use `AsyncHubResult.wait()` sparingly.
353 """
354 """
354
355
355 def wait(self, timeout=-1):
356 def wait(self, timeout=-1):
356 """wait for result to complete."""
357 """wait for result to complete."""
357 start = time.time()
358 start = time.time()
358 if self._ready:
359 if self._ready:
359 return
360 return
360 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
361 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
361 local_ready = self._client.wait(local_ids, timeout)
362 local_ready = self._client.wait(local_ids, timeout)
362 if local_ready:
363 if local_ready:
363 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
364 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
364 if not remote_ids:
365 if not remote_ids:
365 self._ready = True
366 self._ready = True
366 else:
367 else:
367 rdict = self._client.result_status(remote_ids, status_only=False)
368 rdict = self._client.result_status(remote_ids, status_only=False)
368 pending = rdict['pending']
369 pending = rdict['pending']
369 while pending and (timeout < 0 or time.time() < start+timeout):
370 while pending and (timeout < 0 or time.time() < start+timeout):
370 rdict = self._client.result_status(remote_ids, status_only=False)
371 rdict = self._client.result_status(remote_ids, status_only=False)
371 pending = rdict['pending']
372 pending = rdict['pending']
372 if pending:
373 if pending:
373 time.sleep(0.1)
374 time.sleep(0.1)
374 if not pending:
375 if not pending:
375 self._ready = True
376 self._ready = True
376 if self._ready:
377 if self._ready:
377 try:
378 try:
378 results = map(self._client.results.get, self.msg_ids)
379 results = map(self._client.results.get, self.msg_ids)
379 self._result = results
380 self._result = results
380 if self._single_result:
381 if self._single_result:
381 r = results[0]
382 r = results[0]
382 if isinstance(r, Exception):
383 if isinstance(r, Exception):
383 raise r
384 raise r
384 else:
385 else:
385 results = error.collect_exceptions(results, self._fname)
386 results = error.collect_exceptions(results, self._fname)
386 self._result = self._reconstruct_result(results)
387 self._result = self._reconstruct_result(results)
387 except Exception, e:
388 except Exception, e:
388 self._exception = e
389 self._exception = e
389 self._success = False
390 self._success = False
390 else:
391 else:
391 self._success = True
392 self._success = True
392 finally:
393 finally:
393 self._metadata = map(self._client.metadata.get, self.msg_ids)
394 self._metadata = map(self._client.metadata.get, self.msg_ids)
394
395
395 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] No newline at end of file
396 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -1,73 +1,115
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19
19
20 from IPython.parallel.error import TimeoutError
20 from IPython.parallel.error import TimeoutError
21
21
22 from IPython.parallel.tests import add_engines
22 from IPython.parallel.tests import add_engines
23 from .clienttest import ClusterTestCase
23 from .clienttest import ClusterTestCase
24
24
25 def setup():
25 def setup():
26 add_engines(2)
26 add_engines(2)
27
27
28 def wait(n):
28 def wait(n):
29 import time
29 import time
30 time.sleep(n)
30 time.sleep(n)
31 return n
31 return n
32
32
33 class AsyncResultTest(ClusterTestCase):
33 class AsyncResultTest(ClusterTestCase):
34
34
35 def test_single_result(self):
35 def test_single_result(self):
36 eid = self.client.ids[-1]
36 eid = self.client.ids[-1]
37 ar = self.client[eid].apply_async(lambda : 42)
37 ar = self.client[eid].apply_async(lambda : 42)
38 self.assertEquals(ar.get(), 42)
38 self.assertEquals(ar.get(), 42)
39 ar = self.client[[eid]].apply_async(lambda : 42)
39 ar = self.client[[eid]].apply_async(lambda : 42)
40 self.assertEquals(ar.get(), [42])
40 self.assertEquals(ar.get(), [42])
41 ar = self.client[-1:].apply_async(lambda : 42)
41 ar = self.client[-1:].apply_async(lambda : 42)
42 self.assertEquals(ar.get(), [42])
42 self.assertEquals(ar.get(), [42])
43
43
44 def test_get_after_done(self):
44 def test_get_after_done(self):
45 ar = self.client[-1].apply_async(lambda : 42)
45 ar = self.client[-1].apply_async(lambda : 42)
46 ar.wait()
46 ar.wait()
47 self.assertTrue(ar.ready())
47 self.assertTrue(ar.ready())
48 self.assertEquals(ar.get(), 42)
48 self.assertEquals(ar.get(), 42)
49 self.assertEquals(ar.get(), 42)
49 self.assertEquals(ar.get(), 42)
50
50
51 def test_get_before_done(self):
51 def test_get_before_done(self):
52 ar = self.client[-1].apply_async(wait, 0.1)
52 ar = self.client[-1].apply_async(wait, 0.1)
53 self.assertRaises(TimeoutError, ar.get, 0)
53 self.assertRaises(TimeoutError, ar.get, 0)
54 ar.wait(0)
54 ar.wait(0)
55 self.assertFalse(ar.ready())
55 self.assertFalse(ar.ready())
56 self.assertEquals(ar.get(), 0.1)
56 self.assertEquals(ar.get(), 0.1)
57
57
58 def test_get_after_error(self):
58 def test_get_after_error(self):
59 ar = self.client[-1].apply_async(lambda : 1/0)
59 ar = self.client[-1].apply_async(lambda : 1/0)
60 ar.wait(10)
60 ar.wait(10)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
64
64
65 def test_get_dict(self):
65 def test_get_dict(self):
66 n = len(self.client)
66 n = len(self.client)
67 ar = self.client[:].apply_async(lambda : 5)
67 ar = self.client[:].apply_async(lambda : 5)
68 self.assertEquals(ar.get(), [5]*n)
68 self.assertEquals(ar.get(), [5]*n)
69 d = ar.get_dict()
69 d = ar.get_dict()
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
71 for eid,r in d.iteritems():
71 for eid,r in d.iteritems():
72 self.assertEquals(r, 5)
72 self.assertEquals(r, 5)
73
73
74 def test_list_amr(self):
75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
76 rlist = list(ar)
77
78 def test_getattr(self):
79 ar = self.client[:].apply_async(wait, 0.5)
80 self.assertRaises(AttributeError, lambda : ar._foo)
81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
82 self.assertRaises(AttributeError, lambda : ar.foo)
83 self.assertRaises(AttributeError, lambda : ar.engine_id)
84 self.assertFalse(hasattr(ar, '__length_hint__'))
85 self.assertFalse(hasattr(ar, 'foo'))
86 self.assertFalse(hasattr(ar, 'engine_id'))
87 ar.get(5)
88 self.assertRaises(AttributeError, lambda : ar._foo)
89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
90 self.assertRaises(AttributeError, lambda : ar.foo)
91 self.assertTrue(isinstance(ar.engine_id, list))
92 self.assertEquals(ar.engine_id, ar['engine_id'])
93 self.assertFalse(hasattr(ar, '__length_hint__'))
94 self.assertFalse(hasattr(ar, 'foo'))
95 self.assertTrue(hasattr(ar, 'engine_id'))
96
97 def test_getitem(self):
98 ar = self.client[:].apply_async(wait, 0.5)
99 self.assertRaises(TimeoutError, lambda : ar['foo'])
100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
101 ar.get(5)
102 self.assertRaises(KeyError, lambda : ar['foo'])
103 self.assertTrue(isinstance(ar['engine_id'], list))
104 self.assertEquals(ar.engine_id, ar['engine_id'])
105
106 def test_single_result(self):
107 ar = self.client[-1].apply_async(wait, 0.5)
108 self.assertRaises(TimeoutError, lambda : ar['foo'])
109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
110 self.assertTrue(ar.get(5) == 0.5)
111 self.assertTrue(isinstance(ar['engine_id'], int))
112 self.assertTrue(isinstance(ar.engine_id, int))
113 self.assertEquals(ar.engine_id, ar['engine_id'])
114
115
@@ -1,165 +1,167
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24
24
25 from IPython import parallel as pmod
25 from IPython import parallel as pmod
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 from IPython.parallel.tests import add_engines
28 from IPython.parallel.tests import add_engines
29
29
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31
31
32 def setup():
32 def setup():
33 add_engines(3)
33 add_engines(3)
34
34
35 class TestLoadBalancedView(ClusterTestCase):
35 class TestLoadBalancedView(ClusterTestCase):
36
36
37 def setUp(self):
37 def setUp(self):
38 ClusterTestCase.setUp(self)
38 ClusterTestCase.setUp(self)
39 self.view = self.client.load_balanced_view()
39 self.view = self.client.load_balanced_view()
40
40
41 def test_z_crash_task(self):
41 def test_z_crash_task(self):
42 """test graceful handling of engine death (balanced)"""
42 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 # self.add_engines(1)
44 # self.add_engines(1)
45 ar = self.view.apply_async(crash)
45 ar = self.view.apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
47 eid = ar.engine_id
48 tic = time.time()
48 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
49 while eid in self.client.ids and time.time()-tic < 5:
50 time.sleep(.01)
50 time.sleep(.01)
51 self.client.spin()
51 self.client.spin()
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53
53
54 def test_map(self):
54 def test_map(self):
55 def f(x):
55 def f(x):
56 return x**2
56 return x**2
57 data = range(16)
57 data = range(16)
58 r = self.view.map_sync(f, data)
58 r = self.view.map_sync(f, data)
59 self.assertEquals(r, map(f, data))
59 self.assertEquals(r, map(f, data))
60
60
61 def test_map_unordered(self):
61 def test_map_unordered(self):
62 def f(x):
62 def f(x):
63 return x**2
63 return x**2
64 def slow_f(x):
64 def slow_f(x):
65 import time
65 import time
66 time.sleep(0.05*x)
66 time.sleep(0.05*x)
67 return x**2
67 return x**2
68 data = range(16,0,-1)
68 data = range(16,0,-1)
69 reference = map(f, data)
69 reference = map(f, data)
70
70
71 amr = self.view.map_async(f, data, ordered=False)
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come (uses __iter__)
73 # check individual elements, retrieved as they come
74 astheycame = list(amr)
74 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
75 # Ensure that at least one result came out of order:
76 # Ensure that at least one result came out of order:
76 self.assertNotEquals(astheycame, reference, "should not have preserved order")
77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
77 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
78
79
79 def test_map_ordered(self):
80 def test_map_ordered(self):
80 def f(x):
81 def f(x):
81 return x**2
82 return x**2
82 def slow_f(x):
83 def slow_f(x):
83 import time
84 import time
84 time.sleep(0.05*x)
85 time.sleep(0.05*x)
85 return x**2
86 return x**2
86 data = range(16,0,-1)
87 data = range(16,0,-1)
87 reference = map(f, data)
88 reference = map(f, data)
88
89
89 amr = self.view.map_async(f, data)
90 amr = self.view.map_async(slow_f, data)
90 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
91 # check individual elements, retrieved as they come (uses __iter__)
92 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
92 astheycame = list(amr)
94 astheycame = list(amr)
93 # Ensure that results came in order
95 # Ensure that results came in order
94 self.assertEquals(astheycame, reference)
96 self.assertEquals(astheycame, reference)
95 self.assertEquals(amr.result, reference)
97 self.assertEquals(amr.result, reference)
96
98
97 def test_abort(self):
99 def test_abort(self):
98 view = self.view
100 view = self.view
99 ar = self.client[:].apply_async(time.sleep, .5)
101 ar = self.client[:].apply_async(time.sleep, .5)
100 ar = self.client[:].apply_async(time.sleep, .5)
102 ar = self.client[:].apply_async(time.sleep, .5)
101 time.sleep(0.2)
103 time.sleep(0.2)
102 ar2 = view.apply_async(lambda : 2)
104 ar2 = view.apply_async(lambda : 2)
103 ar3 = view.apply_async(lambda : 3)
105 ar3 = view.apply_async(lambda : 3)
104 view.abort(ar2)
106 view.abort(ar2)
105 view.abort(ar3.msg_ids)
107 view.abort(ar3.msg_ids)
106 self.assertRaises(error.TaskAborted, ar2.get)
108 self.assertRaises(error.TaskAborted, ar2.get)
107 self.assertRaises(error.TaskAborted, ar3.get)
109 self.assertRaises(error.TaskAborted, ar3.get)
108
110
109 def test_retries(self):
111 def test_retries(self):
110 add_engines(3)
112 add_engines(3)
111 view = self.view
113 view = self.view
112 view.timeout = 1 # prevent hang if this doesn't behave
114 view.timeout = 1 # prevent hang if this doesn't behave
113 def fail():
115 def fail():
114 assert False
116 assert False
115 for r in range(len(self.client)-1):
117 for r in range(len(self.client)-1):
116 with view.temp_flags(retries=r):
118 with view.temp_flags(retries=r):
117 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
119 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
118
120
119 with view.temp_flags(retries=len(self.client), timeout=0.25):
121 with view.temp_flags(retries=len(self.client), timeout=0.25):
120 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
122 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
121
123
122 def test_invalid_dependency(self):
124 def test_invalid_dependency(self):
123 view = self.view
125 view = self.view
124 with view.temp_flags(after='12345'):
126 with view.temp_flags(after='12345'):
125 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
127 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
126
128
127 def test_impossible_dependency(self):
129 def test_impossible_dependency(self):
128 if len(self.client) < 2:
130 if len(self.client) < 2:
129 add_engines(2)
131 add_engines(2)
130 view = self.client.load_balanced_view()
132 view = self.client.load_balanced_view()
131 ar1 = view.apply_async(lambda : 1)
133 ar1 = view.apply_async(lambda : 1)
132 ar1.get()
134 ar1.get()
133 e1 = ar1.engine_id
135 e1 = ar1.engine_id
134 e2 = e1
136 e2 = e1
135 while e2 == e1:
137 while e2 == e1:
136 ar2 = view.apply_async(lambda : 1)
138 ar2 = view.apply_async(lambda : 1)
137 ar2.get()
139 ar2.get()
138 e2 = ar2.engine_id
140 e2 = ar2.engine_id
139
141
140 with view.temp_flags(follow=[ar1, ar2]):
142 with view.temp_flags(follow=[ar1, ar2]):
141 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
143 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
142
144
143
145
144 def test_follow(self):
146 def test_follow(self):
145 ar = self.view.apply_async(lambda : 1)
147 ar = self.view.apply_async(lambda : 1)
146 ar.get()
148 ar.get()
147 ars = []
149 ars = []
148 first_id = ar.engine_id
150 first_id = ar.engine_id
149
151
150 self.view.follow = ar
152 self.view.follow = ar
151 for i in range(5):
153 for i in range(5):
152 ars.append(self.view.apply_async(lambda : 1))
154 ars.append(self.view.apply_async(lambda : 1))
153 self.view.wait(ars)
155 self.view.wait(ars)
154 for ar in ars:
156 for ar in ars:
155 self.assertEquals(ar.engine_id, first_id)
157 self.assertEquals(ar.engine_id, first_id)
156
158
157 def test_after(self):
159 def test_after(self):
158 view = self.view
160 view = self.view
159 ar = view.apply_async(time.sleep, 0.5)
161 ar = view.apply_async(time.sleep, 0.5)
160 with view.temp_flags(after=ar):
162 with view.temp_flags(after=ar):
161 ar2 = view.apply_async(lambda : 1)
163 ar2 = view.apply_async(lambda : 1)
162
164
163 ar.wait()
165 ar.wait()
164 ar2.wait()
166 ar2.wait()
165 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
167 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now