Show More
@@ -16,16 +16,10 b' from IPython.external.decorator import decorator' | |||||
16 | from IPython.parallel import error |
|
16 | from IPython.parallel import error | |
17 | from IPython.utils.py3compat import string_types |
|
17 | from IPython.utils.py3compat import string_types | |
18 |
|
18 | |||
19 | #----------------------------------------------------------------------------- |
|
|||
20 | # Functions |
|
|||
21 | #----------------------------------------------------------------------------- |
|
|||
22 |
|
19 | |||
23 | def _raw_text(s): |
|
20 | def _raw_text(s): | |
24 | display_pretty(s, raw=True) |
|
21 | display_pretty(s, raw=True) | |
25 |
|
22 | |||
26 | #----------------------------------------------------------------------------- |
|
|||
27 | # Classes |
|
|||
28 | #----------------------------------------------------------------------------- |
|
|||
29 |
|
23 | |||
30 | # global empty tracker that's always done: |
|
24 | # global empty tracker that's always done: | |
31 | finished_tracker = MessageTracker() |
|
25 | finished_tracker = MessageTracker() | |
@@ -48,8 +42,11 b' class AsyncResult(object):' | |||||
48 | _targets = None |
|
42 | _targets = None | |
49 | _tracker = None |
|
43 | _tracker = None | |
50 | _single_result = False |
|
44 | _single_result = False | |
|
45 | owner = False, | |||
51 |
|
46 | |||
52 |
def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None |
|
47 | def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None, | |
|
48 | owner=False, | |||
|
49 | ): | |||
53 | if isinstance(msg_ids, string_types): |
|
50 | if isinstance(msg_ids, string_types): | |
54 | # always a list |
|
51 | # always a list | |
55 | msg_ids = [msg_ids] |
|
52 | msg_ids = [msg_ids] | |
@@ -64,6 +61,7 b' class AsyncResult(object):' | |||||
64 | self._fname=fname |
|
61 | self._fname=fname | |
65 | self._targets = targets |
|
62 | self._targets = targets | |
66 | self._tracker = tracker |
|
63 | self._tracker = tracker | |
|
64 | self.owner = owner | |||
67 |
|
65 | |||
68 | self._ready = False |
|
66 | self._ready = False | |
69 | self._outputs_ready = False |
|
67 | self._outputs_ready = False | |
@@ -151,6 +149,12 b' class AsyncResult(object):' | |||||
151 | timeout = 10 |
|
149 | timeout = 10 | |
152 | self._wait_for_outputs(timeout) |
|
150 | self._wait_for_outputs(timeout) | |
153 |
|
151 | |||
|
152 | if self.owner: | |||
|
153 | ||||
|
154 | self._metadata = [self._client.metadata.pop(mid) for mid in self.msg_ids] | |||
|
155 | [self._client.results.pop(mid) for mid in self.msg_ids] | |||
|
156 | ||||
|
157 | ||||
154 |
|
158 | |||
155 | def successful(self): |
|
159 | def successful(self): | |
156 | """Return whether the call completed without raising an exception. |
|
160 | """Return whether the call completed without raising an exception. | |
@@ -691,5 +695,9 b' class AsyncHubResult(AsyncResult):' | |||||
691 | self._success = True |
|
695 | self._success = True | |
692 | finally: |
|
696 | finally: | |
693 | self._metadata = [self._client.metadata[mid] for mid in self.msg_ids] |
|
697 | self._metadata = [self._client.metadata[mid] for mid in self.msg_ids] | |
|
698 | if self.owner: | |||
|
699 | [self._client.metadata.pop(mid) for mid in self.msg_ids] | |||
|
700 | [self._client.results.pop(mid) for mid in self.msg_ids] | |||
|
701 | ||||
694 |
|
702 | |||
695 | __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] |
|
703 | __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] |
@@ -1358,7 +1358,7 b' class Client(HasTraits):' | |||||
1358 | #-------------------------------------------------------------------------- |
|
1358 | #-------------------------------------------------------------------------- | |
1359 |
|
1359 | |||
1360 | @spin_first |
|
1360 | @spin_first | |
1361 | def get_result(self, indices_or_msg_ids=None, block=None): |
|
1361 | def get_result(self, indices_or_msg_ids=None, block=None, owner=True): | |
1362 | """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. |
|
1362 | """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. | |
1363 |
|
1363 | |||
1364 | If the client already has the results, no request to the Hub will be made. |
|
1364 | If the client already has the results, no request to the Hub will be made. | |
@@ -1384,6 +1384,11 b' class Client(HasTraits):' | |||||
1384 |
|
1384 | |||
1385 | block : bool |
|
1385 | block : bool | |
1386 | Whether to wait for the result to be done |
|
1386 | Whether to wait for the result to be done | |
|
1387 | owner : bool [default: True] | |||
|
1388 | Whether this AsyncResult should own the result. | |||
|
1389 | If so, calling `ar.get()` will remove data from the | |||
|
1390 | client's result and metadata cache. | |||
|
1391 | There should only be one owner of any given msg_id. | |||
1387 |
|
1392 | |||
1388 | Returns |
|
1393 | Returns | |
1389 | ------- |
|
1394 | ------- | |
@@ -1421,9 +1426,9 b' class Client(HasTraits):' | |||||
1421 | theids = theids[0] |
|
1426 | theids = theids[0] | |
1422 |
|
1427 | |||
1423 | if remote_ids: |
|
1428 | if remote_ids: | |
1424 | ar = AsyncHubResult(self, msg_ids=theids) |
|
1429 | ar = AsyncHubResult(self, msg_ids=theids, owner=owner) | |
1425 | else: |
|
1430 | else: | |
1426 | ar = AsyncResult(self, msg_ids=theids) |
|
1431 | ar = AsyncResult(self, msg_ids=theids, owner=owner) | |
1427 |
|
1432 | |||
1428 | if block: |
|
1433 | if block: | |
1429 | ar.wait() |
|
1434 | ar.wait() | |
@@ -1703,8 +1708,8 b' class Client(HasTraits):' | |||||
1703 | if still_outstanding: |
|
1708 | if still_outstanding: | |
1704 | raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding) |
|
1709 | raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding) | |
1705 | for mid in msg_ids: |
|
1710 | for mid in msg_ids: | |
1706 | self.results.pop(mid) |
|
1711 | self.results.pop(mid, None) | |
1707 | self.metadata.pop(mid) |
|
1712 | self.metadata.pop(mid, None) | |
1708 |
|
1713 | |||
1709 |
|
1714 | |||
1710 | @spin_first |
|
1715 | @spin_first |
@@ -1,20 +1,9 b'' | |||||
1 | """Views of remote engines. |
|
1 | """Views of remote engines.""" | |
2 |
|
2 | |||
3 | Authors: |
|
3 | # Copyright (c) IPython Development Team. | |
|
4 | # Distributed under the terms of the Modified BSD License. | |||
4 |
|
5 | |||
5 | * Min RK |
|
|||
6 | """ |
|
|||
7 | from __future__ import print_function |
|
6 | from __future__ import print_function | |
8 | #----------------------------------------------------------------------------- |
|
|||
9 | # Copyright (C) 2010-2011 The IPython Development Team |
|
|||
10 | # |
|
|||
11 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
12 | # the file COPYING, distributed as part of this software. |
|
|||
13 | #----------------------------------------------------------------------------- |
|
|||
14 |
|
||||
15 | #----------------------------------------------------------------------------- |
|
|||
16 | # Imports |
|
|||
17 | #----------------------------------------------------------------------------- |
|
|||
18 |
|
7 | |||
19 | import imp |
|
8 | import imp | |
20 | import sys |
|
9 | import sys | |
@@ -315,7 +304,7 b' class View(HasTraits):' | |||||
315 | return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block) |
|
304 | return self.client.shutdown(targets=targets, restart=restart, hub=hub, block=block) | |
316 |
|
305 | |||
317 | @spin_after |
|
306 | @spin_after | |
318 | def get_result(self, indices_or_msg_ids=None): |
|
307 | def get_result(self, indices_or_msg_ids=None, block=None, owner=True): | |
319 | """return one or more results, specified by history index or msg_id. |
|
308 | """return one or more results, specified by history index or msg_id. | |
320 |
|
309 | |||
321 | See :meth:`IPython.parallel.client.client.Client.get_result` for details. |
|
310 | See :meth:`IPython.parallel.client.client.Client.get_result` for details. | |
@@ -330,7 +319,7 b' class View(HasTraits):' | |||||
330 | for i,index in enumerate(indices_or_msg_ids): |
|
319 | for i,index in enumerate(indices_or_msg_ids): | |
331 | if isinstance(index, int): |
|
320 | if isinstance(index, int): | |
332 | indices_or_msg_ids[i] = self.history[index] |
|
321 | indices_or_msg_ids[i] = self.history[index] | |
333 | return self.client.get_result(indices_or_msg_ids) |
|
322 | return self.client.get_result(indices_or_msg_ids, block=block, owner=owner) | |
334 |
|
323 | |||
335 | #------------------------------------------------------------------- |
|
324 | #------------------------------------------------------------------- | |
336 | # Map |
|
325 | # Map | |
@@ -577,7 +566,9 b' class DirectView(View):' | |||||
577 | if isinstance(targets, int): |
|
566 | if isinstance(targets, int): | |
578 | msg_ids = msg_ids[0] |
|
567 | msg_ids = msg_ids[0] | |
579 | tracker = None if track is False else zmq.MessageTracker(*trackers) |
|
568 | tracker = None if track is False else zmq.MessageTracker(*trackers) | |
580 |
ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, |
|
569 | ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, | |
|
570 | tracker=tracker, owner=True, | |||
|
571 | ) | |||
581 | if block: |
|
572 | if block: | |
582 | try: |
|
573 | try: | |
583 | return ar.get() |
|
574 | return ar.get() | |
@@ -656,7 +647,7 b' class DirectView(View):' | |||||
656 | msg_ids.append(msg['header']['msg_id']) |
|
647 | msg_ids.append(msg['header']['msg_id']) | |
657 | if isinstance(targets, int): |
|
648 | if isinstance(targets, int): | |
658 | msg_ids = msg_ids[0] |
|
649 | msg_ids = msg_ids[0] | |
659 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets) |
|
650 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets, owner=True) | |
660 | if block: |
|
651 | if block: | |
661 | try: |
|
652 | try: | |
662 | ar.get() |
|
653 | ar.get() | |
@@ -774,7 +765,9 b' class DirectView(View):' | |||||
774 | else: |
|
765 | else: | |
775 | tracker = None |
|
766 | tracker = None | |
776 |
|
767 | |||
777 |
r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, |
|
768 | r = AsyncResult(self.client, msg_ids, fname='scatter', targets=targets, | |
|
769 | tracker=tracker, owner=True, | |||
|
770 | ) | |||
778 | if block: |
|
771 | if block: | |
779 | r.wait() |
|
772 | r.wait() | |
780 | else: |
|
773 | else: | |
@@ -1057,8 +1050,9 b' class LoadBalancedView(View):' | |||||
1057 | metadata=metadata) |
|
1050 | metadata=metadata) | |
1058 | tracker = None if track is False else msg['tracker'] |
|
1051 | tracker = None if track is False else msg['tracker'] | |
1059 |
|
1052 | |||
1060 |
ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), |
|
1053 | ar = AsyncResult(self.client, msg['header']['msg_id'], fname=getname(f), | |
1061 |
|
1054 | targets=None, tracker=tracker, owner=True, | ||
|
1055 | ) | |||
1062 | if block: |
|
1056 | if block: | |
1063 | try: |
|
1057 | try: | |
1064 | return ar.get() |
|
1058 | return ar.get() |
@@ -1,20 +1,7 b'' | |||||
1 | """Tests for asyncresult.py |
|
1 | """Tests for asyncresult.py""" | |
2 |
|
2 | |||
3 | Authors: |
|
3 | # Copyright (c) IPython Development Team. | |
4 |
|
4 | # Distributed under the terms of the Modified BSD License. | ||
5 | * Min RK |
|
|||
6 | """ |
|
|||
7 |
|
||||
8 | #------------------------------------------------------------------------------- |
|
|||
9 | # Copyright (C) 2011 The IPython Development Team |
|
|||
10 | # |
|
|||
11 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
12 | # the file COPYING, distributed as part of this software. |
|
|||
13 | #------------------------------------------------------------------------------- |
|
|||
14 |
|
||||
15 | #------------------------------------------------------------------------------- |
|
|||
16 | # Imports |
|
|||
17 | #------------------------------------------------------------------------------- |
|
|||
18 |
|
5 | |||
19 | import time |
|
6 | import time | |
20 |
|
7 | |||
@@ -324,4 +311,32 b' class AsyncResultTest(ClusterTestCase):' | |||||
324 | self.assertEqual(ar.get(10), [5]) |
|
311 | self.assertEqual(ar.get(10), [5]) | |
325 | self.client._build_targets = save_build |
|
312 | self.client._build_targets = save_build | |
326 |
|
313 | |||
|
314 | def test_owner_pop(self): | |||
|
315 | self.minimum_engines(1) | |||
|
316 | ||||
|
317 | view = self.client[-1] | |||
|
318 | ar = view.apply_async(lambda : 1) | |||
|
319 | ar.get() | |||
|
320 | msg_id = ar.msg_ids[0] | |||
|
321 | self.assertNotIn(msg_id, self.client.results) | |||
|
322 | self.assertNotIn(msg_id, self.client.metadata) | |||
|
323 | ||||
|
324 | def test_non_owner(self): | |||
|
325 | self.minimum_engines(1) | |||
|
326 | ||||
|
327 | view = self.client[-1] | |||
|
328 | ar = view.apply_async(lambda : 1) | |||
|
329 | ar.owner = False | |||
|
330 | ar.get() | |||
|
331 | msg_id = ar.msg_ids[0] | |||
|
332 | self.assertIn(msg_id, self.client.results) | |||
|
333 | self.assertIn(msg_id, self.client.metadata) | |||
|
334 | ar2 = self.client.get_result(msg_id, owner=True) | |||
|
335 | self.assertIs(type(ar2), type(ar)) | |||
|
336 | self.assertTrue(ar2.owner) | |||
|
337 | self.assertEqual(ar.get(), ar2.get()) | |||
|
338 | ar2.get() | |||
|
339 | self.assertNotIn(msg_id, self.client.results) | |||
|
340 | self.assertNotIn(msg_id, self.client.metadata) | |||
|
341 | ||||
327 |
|
342 |
@@ -143,11 +143,12 b' class TestClient(ClusterTestCase):' | |||||
143 | ar = c[t].apply_async(wait, 1) |
|
143 | ar = c[t].apply_async(wait, 1) | |
144 | # give the monitor time to notice the message |
|
144 | # give the monitor time to notice the message | |
145 | time.sleep(.25) |
|
145 | time.sleep(.25) | |
146 | ahr = self.client.get_result(ar.msg_ids[0]) |
|
146 | ahr = self.client.get_result(ar.msg_ids[0], owner=False) | |
147 |
self.assert |
|
147 | self.assertIsInstance(ahr, AsyncHubResult) | |
148 | self.assertEqual(ahr.get(), ar.get()) |
|
148 | self.assertEqual(ahr.get(), ar.get()) | |
149 | ar2 = self.client.get_result(ar.msg_ids[0]) |
|
149 | ar2 = self.client.get_result(ar.msg_ids[0]) | |
150 |
self.assert |
|
150 | self.assertNotIsInstance(ar2, AsyncHubResult) | |
|
151 | self.assertEqual(ahr.get(), ar2.get()) | |||
151 | c.close() |
|
152 | c.close() | |
152 |
|
153 | |||
153 | def test_get_execute_result(self): |
|
154 | def test_get_execute_result(self): | |
@@ -162,11 +163,12 b' class TestClient(ClusterTestCase):' | |||||
162 | ar = c[t].execute("import time; time.sleep(1)", silent=False) |
|
163 | ar = c[t].execute("import time; time.sleep(1)", silent=False) | |
163 | # give the monitor time to notice the message |
|
164 | # give the monitor time to notice the message | |
164 | time.sleep(.25) |
|
165 | time.sleep(.25) | |
165 | ahr = self.client.get_result(ar.msg_ids[0]) |
|
166 | ahr = self.client.get_result(ar.msg_ids[0], owner=False) | |
166 |
self.assert |
|
167 | self.assertIsInstance(ahr, AsyncHubResult) | |
167 | self.assertEqual(ahr.get().execute_result, ar.get().execute_result) |
|
168 | self.assertEqual(ahr.get().execute_result, ar.get().execute_result) | |
168 | ar2 = self.client.get_result(ar.msg_ids[0]) |
|
169 | ar2 = self.client.get_result(ar.msg_ids[0]) | |
169 |
self.assert |
|
170 | self.assertNotIsInstance(ar2, AsyncHubResult) | |
|
171 | self.assertEqual(ahr.get(), ar2.get()) | |||
170 | c.close() |
|
172 | c.close() | |
171 |
|
173 | |||
172 | def test_ids_list(self): |
|
174 | def test_ids_list(self): | |
@@ -450,6 +452,7 b' class TestClient(ClusterTestCase):' | |||||
450 | v = self.client[-1] |
|
452 | v = self.client[-1] | |
451 | ar = v.apply_async(lambda : 1) |
|
453 | ar = v.apply_async(lambda : 1) | |
452 | msg_id = ar.msg_ids[0] |
|
454 | msg_id = ar.msg_ids[0] | |
|
455 | ar.owner = False | |||
453 | ar.get() |
|
456 | ar.get() | |
454 | self._wait_for_idle() |
|
457 | self._wait_for_idle() | |
455 | ar2 = v.apply_async(time.sleep, 1) |
|
458 | ar2 = v.apply_async(time.sleep, 1) |
@@ -142,11 +142,12 b' class TestView(ClusterTestCase):' | |||||
142 | ar = v.apply_async(wait, 1) |
|
142 | ar = v.apply_async(wait, 1) | |
143 | # give the monitor time to notice the message |
|
143 | # give the monitor time to notice the message | |
144 | time.sleep(.25) |
|
144 | time.sleep(.25) | |
145 | ahr = v2.get_result(ar.msg_ids[0]) |
|
145 | ahr = v2.get_result(ar.msg_ids[0], owner=False) | |
146 |
self.assert |
|
146 | self.assertIsInstance(ahr, AsyncHubResult) | |
147 | self.assertEqual(ahr.get(), ar.get()) |
|
147 | self.assertEqual(ahr.get(), ar.get()) | |
148 | ar2 = v2.get_result(ar.msg_ids[0]) |
|
148 | ar2 = v2.get_result(ar.msg_ids[0]) | |
149 |
self.assert |
|
149 | self.assertNotIsInstance(ar2, AsyncHubResult) | |
|
150 | self.assertEqual(ahr.get(), ar2.get()) | |||
150 | c.spin() |
|
151 | c.spin() | |
151 | c.close() |
|
152 | c.close() | |
152 |
|
153 |
General Comments 0
You need to be logged in to leave comments.
Login now