Show More
@@ -73,6 +73,9 class AsyncResult(object): | |||||
73 | if isinstance(msg_ids, basestring): |
|
73 | if isinstance(msg_ids, basestring): | |
74 | # always a list |
|
74 | # always a list | |
75 | msg_ids = [msg_ids] |
|
75 | msg_ids = [msg_ids] | |
|
76 | self._single_result = True | |||
|
77 | else: | |||
|
78 | self._single_result = False | |||
76 | if tracker is None: |
|
79 | if tracker is None: | |
77 | # default to always done |
|
80 | # default to always done | |
78 | tracker = finished_tracker |
|
81 | tracker = finished_tracker | |
@@ -81,14 +84,11 class AsyncResult(object): | |||||
81 | self._fname=fname |
|
84 | self._fname=fname | |
82 | self._targets = targets |
|
85 | self._targets = targets | |
83 | self._tracker = tracker |
|
86 | self._tracker = tracker | |
|
87 | ||||
84 | self._ready = False |
|
88 | self._ready = False | |
85 | self._outputs_ready = False |
|
89 | self._outputs_ready = False | |
86 | self._success = None |
|
90 | self._success = None | |
87 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] |
|
91 | self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] | |
88 | if len(msg_ids) == 1: |
|
|||
89 | self._single_result = not isinstance(targets, (list, tuple)) |
|
|||
90 | else: |
|
|||
91 | self._single_result = False |
|
|||
92 |
|
92 | |||
93 | def __repr__(self): |
|
93 | def __repr__(self): | |
94 | if self._ready: |
|
94 | if self._ready: |
@@ -1378,9 +1378,11 class Client(HasTraits): | |||||
1378 | block = self.block if block is None else block |
|
1378 | block = self.block if block is None else block | |
1379 | if indices_or_msg_ids is None: |
|
1379 | if indices_or_msg_ids is None: | |
1380 | indices_or_msg_ids = -1 |
|
1380 | indices_or_msg_ids = -1 | |
1381 |
|
1381 | |||
|
1382 | single_result = False | |||
1382 | if not isinstance(indices_or_msg_ids, (list,tuple)): |
|
1383 | if not isinstance(indices_or_msg_ids, (list,tuple)): | |
1383 | indices_or_msg_ids = [indices_or_msg_ids] |
|
1384 | indices_or_msg_ids = [indices_or_msg_ids] | |
|
1385 | single_result = True | |||
1384 |
|
1386 | |||
1385 | theids = [] |
|
1387 | theids = [] | |
1386 | for id in indices_or_msg_ids: |
|
1388 | for id in indices_or_msg_ids: | |
@@ -1392,6 +1394,11 class Client(HasTraits): | |||||
1392 |
|
1394 | |||
1393 | local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) |
|
1395 | local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) | |
1394 | remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) |
|
1396 | remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) | |
|
1397 | ||||
|
1398 | # given single msg_id initially, get_result shot get the result itself, | |||
|
1399 | # not a length-one list | |||
|
1400 | if single_result: | |||
|
1401 | theids = theids[0] | |||
1395 |
|
1402 | |||
1396 | if remote_ids: |
|
1403 | if remote_ids: | |
1397 | ar = AsyncHubResult(self, msg_ids=theids) |
|
1404 | ar = AsyncHubResult(self, msg_ids=theids) |
@@ -549,8 +549,8 class DirectView(View): | |||||
549 | block = self.block if block is None else block |
|
549 | block = self.block if block is None else block | |
550 | track = self.track if track is None else track |
|
550 | track = self.track if track is None else track | |
551 | targets = self.targets if targets is None else targets |
|
551 | targets = self.targets if targets is None else targets | |
552 |
|
552 | |||
553 |
_idents = self.client._build_targets(targets) |
|
553 | _idents, _targets = self.client._build_targets(targets) | |
554 | msg_ids = [] |
|
554 | msg_ids = [] | |
555 | trackers = [] |
|
555 | trackers = [] | |
556 | for ident in _idents: |
|
556 | for ident in _idents: | |
@@ -559,8 +559,10 class DirectView(View): | |||||
559 | if track: |
|
559 | if track: | |
560 | trackers.append(msg['tracker']) |
|
560 | trackers.append(msg['tracker']) | |
561 | msg_ids.append(msg['header']['msg_id']) |
|
561 | msg_ids.append(msg['header']['msg_id']) | |
|
562 | if isinstance(targets, int): | |||
|
563 | msg_ids = msg_ids[0] | |||
562 | tracker = None if track is False else zmq.MessageTracker(*trackers) |
|
564 | tracker = None if track is False else zmq.MessageTracker(*trackers) | |
563 | ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker) |
|
565 | ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker) | |
564 | if block: |
|
566 | if block: | |
565 | try: |
|
567 | try: | |
566 | return ar.get() |
|
568 | return ar.get() | |
@@ -631,13 +633,15 class DirectView(View): | |||||
631 | block = self.block if block is None else block |
|
633 | block = self.block if block is None else block | |
632 | targets = self.targets if targets is None else targets |
|
634 | targets = self.targets if targets is None else targets | |
633 |
|
635 | |||
634 |
_idents = self.client._build_targets(targets) |
|
636 | _idents, _targets = self.client._build_targets(targets) | |
635 | msg_ids = [] |
|
637 | msg_ids = [] | |
636 | trackers = [] |
|
638 | trackers = [] | |
637 | for ident in _idents: |
|
639 | for ident in _idents: | |
638 | msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) |
|
640 | msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident) | |
639 | msg_ids.append(msg['header']['msg_id']) |
|
641 | msg_ids.append(msg['header']['msg_id']) | |
640 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets) |
|
642 | if isinstance(targets, int): | |
|
643 | msg_ids = msg_ids[0] | |||
|
644 | ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets) | |||
641 | if block: |
|
645 | if block: | |
642 | try: |
|
646 | try: | |
643 | ar.get() |
|
647 | ar.get() |
@@ -43,15 +43,14 class IPythonError(Exception): | |||||
43 | class KernelError(IPythonError): |
|
43 | class KernelError(IPythonError): | |
44 | pass |
|
44 | pass | |
45 |
|
45 | |||
46 |
|
||||
47 |
|
||||
48 | class NoEnginesRegistered(KernelError): |
|
46 | class NoEnginesRegistered(KernelError): | |
49 | pass |
|
47 | pass | |
50 |
|
48 | |||
51 |
|
||||
52 | class TaskAborted(KernelError): |
|
49 | class TaskAborted(KernelError): | |
53 | pass |
|
50 | pass | |
54 |
|
51 | |||
|
52 | class TaskTimeout(KernelError): | |||
|
53 | pass | |||
55 |
|
54 | |||
56 | class TimeoutError(KernelError): |
|
55 | class TimeoutError(KernelError): | |
57 | pass |
|
56 | pass |
@@ -308,4 +308,18 class AsyncResultTest(ClusterTestCase): | |||||
308 | ar.get(5) |
|
308 | ar.get(5) | |
309 | nt.assert_in(4, found) |
|
309 | nt.assert_in(4, found) | |
310 | self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found) |
|
310 | self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found) | |
|
311 | ||||
|
312 | def test_not_single_result(self): | |||
|
313 | save_build = self.client._build_targets | |||
|
314 | def single_engine(*a, **kw): | |||
|
315 | idents, targets = save_build(*a, **kw) | |||
|
316 | return idents[:1], targets[:1] | |||
|
317 | ids = single_engine('all')[1] | |||
|
318 | self.client._build_targets = single_engine | |||
|
319 | for targets in ('all', None, ids): | |||
|
320 | dv = self.client.direct_view(targets=targets) | |||
|
321 | ar = dv.apply_async(lambda : 5) | |||
|
322 | self.assertEqual(ar.get(10), [5]) | |||
|
323 | self.client._build_targets = save_build | |||
|
324 | ||||
311 |
|
325 |
@@ -152,10 +152,10 class TestClient(ClusterTestCase): | |||||
152 | ar = c[t].apply_async(wait, 1) |
|
152 | ar = c[t].apply_async(wait, 1) | |
153 | # give the monitor time to notice the message |
|
153 | # give the monitor time to notice the message | |
154 | time.sleep(.25) |
|
154 | time.sleep(.25) | |
155 | ahr = self.client.get_result(ar.msg_ids) |
|
155 | ahr = self.client.get_result(ar.msg_ids[0]) | |
156 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
|
156 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |
157 | self.assertEqual(ahr.get(), ar.get()) |
|
157 | self.assertEqual(ahr.get(), ar.get()) | |
158 | ar2 = self.client.get_result(ar.msg_ids) |
|
158 | ar2 = self.client.get_result(ar.msg_ids[0]) | |
159 | self.assertFalse(isinstance(ar2, AsyncHubResult)) |
|
159 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |
160 | c.close() |
|
160 | c.close() | |
161 |
|
161 | |||
@@ -171,10 +171,11 class TestClient(ClusterTestCase): | |||||
171 | ar = c[t].execute("import time; time.sleep(1)", silent=False) |
|
171 | ar = c[t].execute("import time; time.sleep(1)", silent=False) | |
172 | # give the monitor time to notice the message |
|
172 | # give the monitor time to notice the message | |
173 | time.sleep(.25) |
|
173 | time.sleep(.25) | |
174 | ahr = self.client.get_result(ar.msg_ids) |
|
174 | ahr = self.client.get_result(ar.msg_ids[0]) | |
|
175 | print ar.get(), ahr.get(), ar._single_result, ahr._single_result | |||
175 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
|
176 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |
176 | self.assertEqual(ahr.get().pyout, ar.get().pyout) |
|
177 | self.assertEqual(ahr.get().pyout, ar.get().pyout) | |
177 | ar2 = self.client.get_result(ar.msg_ids) |
|
178 | ar2 = self.client.get_result(ar.msg_ids[0]) | |
178 | self.assertFalse(isinstance(ar2, AsyncHubResult)) |
|
179 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |
179 | c.close() |
|
180 | c.close() | |
180 |
|
181 |
@@ -156,10 +156,10 class TestView(ClusterTestCase, ParametricTestCase): | |||||
156 | ar = v.apply_async(wait, 1) |
|
156 | ar = v.apply_async(wait, 1) | |
157 | # give the monitor time to notice the message |
|
157 | # give the monitor time to notice the message | |
158 | time.sleep(.25) |
|
158 | time.sleep(.25) | |
159 | ahr = v2.get_result(ar.msg_ids) |
|
159 | ahr = v2.get_result(ar.msg_ids[0]) | |
160 | self.assertTrue(isinstance(ahr, AsyncHubResult)) |
|
160 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |
161 | self.assertEqual(ahr.get(), ar.get()) |
|
161 | self.assertEqual(ahr.get(), ar.get()) | |
162 | ar2 = v2.get_result(ar.msg_ids) |
|
162 | ar2 = v2.get_result(ar.msg_ids[0]) | |
163 | self.assertFalse(isinstance(ar2, AsyncHubResult)) |
|
163 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |
164 | c.spin() |
|
164 | c.spin() | |
165 | c.close() |
|
165 | c.close() |
General Comments 0
You need to be logged in to leave comments.
Login now