##// END OF EJS Templates
Merge pull request #3654 from minrk/alltargets...
Min RK -
r11433:14dc7fd2 merge
parent child Browse files
Show More
@@ -73,6 +73,9 class AsyncResult(object):
73 if isinstance(msg_ids, basestring):
73 if isinstance(msg_ids, basestring):
74 # always a list
74 # always a list
75 msg_ids = [msg_ids]
75 msg_ids = [msg_ids]
76 self._single_result = True
77 else:
78 self._single_result = False
76 if tracker is None:
79 if tracker is None:
77 # default to always done
80 # default to always done
78 tracker = finished_tracker
81 tracker = finished_tracker
@@ -81,14 +84,11 class AsyncResult(object):
81 self._fname=fname
84 self._fname=fname
82 self._targets = targets
85 self._targets = targets
83 self._tracker = tracker
86 self._tracker = tracker
87
84 self._ready = False
88 self._ready = False
85 self._outputs_ready = False
89 self._outputs_ready = False
86 self._success = None
90 self._success = None
87 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
91 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
88 if len(msg_ids) == 1:
89 self._single_result = not isinstance(targets, (list, tuple))
90 else:
91 self._single_result = False
92
92
93 def __repr__(self):
93 def __repr__(self):
94 if self._ready:
94 if self._ready:
@@ -1378,9 +1378,11 class Client(HasTraits):
1378 block = self.block if block is None else block
1378 block = self.block if block is None else block
1379 if indices_or_msg_ids is None:
1379 if indices_or_msg_ids is None:
1380 indices_or_msg_ids = -1
1380 indices_or_msg_ids = -1
1381
1381
1382 single_result = False
1382 if not isinstance(indices_or_msg_ids, (list,tuple)):
1383 if not isinstance(indices_or_msg_ids, (list,tuple)):
1383 indices_or_msg_ids = [indices_or_msg_ids]
1384 indices_or_msg_ids = [indices_or_msg_ids]
1385 single_result = True
1384
1386
1385 theids = []
1387 theids = []
1386 for id in indices_or_msg_ids:
1388 for id in indices_or_msg_ids:
@@ -1392,6 +1394,11 class Client(HasTraits):
1392
1394
1393 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1395 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1394 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1396 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1397
1398 # given single msg_id initially, get_result shot get the result itself,
1399 # not a length-one list
1400 if single_result:
1401 theids = theids[0]
1395
1402
1396 if remote_ids:
1403 if remote_ids:
1397 ar = AsyncHubResult(self, msg_ids=theids)
1404 ar = AsyncHubResult(self, msg_ids=theids)
@@ -549,8 +549,8 class DirectView(View):
549 block = self.block if block is None else block
549 block = self.block if block is None else block
550 track = self.track if track is None else track
550 track = self.track if track is None else track
551 targets = self.targets if targets is None else targets
551 targets = self.targets if targets is None else targets
552
552
553 _idents = self.client._build_targets(targets)[0]
553 _idents, _targets = self.client._build_targets(targets)
554 msg_ids = []
554 msg_ids = []
555 trackers = []
555 trackers = []
556 for ident in _idents:
556 for ident in _idents:
@@ -559,8 +559,10 class DirectView(View):
559 if track:
559 if track:
560 trackers.append(msg['tracker'])
560 trackers.append(msg['tracker'])
561 msg_ids.append(msg['header']['msg_id'])
561 msg_ids.append(msg['header']['msg_id'])
562 if isinstance(targets, int):
563 msg_ids = msg_ids[0]
562 tracker = None if track is False else zmq.MessageTracker(*trackers)
564 tracker = None if track is False else zmq.MessageTracker(*trackers)
563 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=targets, tracker=tracker)
565 ar = AsyncResult(self.client, msg_ids, fname=getname(f), targets=_targets, tracker=tracker)
564 if block:
566 if block:
565 try:
567 try:
566 return ar.get()
568 return ar.get()
@@ -631,13 +633,15 class DirectView(View):
631 block = self.block if block is None else block
633 block = self.block if block is None else block
632 targets = self.targets if targets is None else targets
634 targets = self.targets if targets is None else targets
633
635
634 _idents = self.client._build_targets(targets)[0]
636 _idents, _targets = self.client._build_targets(targets)
635 msg_ids = []
637 msg_ids = []
636 trackers = []
638 trackers = []
637 for ident in _idents:
639 for ident in _idents:
638 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
640 msg = self.client.send_execute_request(self._socket, code, silent=silent, ident=ident)
639 msg_ids.append(msg['header']['msg_id'])
641 msg_ids.append(msg['header']['msg_id'])
640 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=targets)
642 if isinstance(targets, int):
643 msg_ids = msg_ids[0]
644 ar = AsyncResult(self.client, msg_ids, fname='execute', targets=_targets)
641 if block:
645 if block:
642 try:
646 try:
643 ar.get()
647 ar.get()
@@ -43,15 +43,14 class IPythonError(Exception):
43 class KernelError(IPythonError):
43 class KernelError(IPythonError):
44 pass
44 pass
45
45
46
47
48 class NoEnginesRegistered(KernelError):
46 class NoEnginesRegistered(KernelError):
49 pass
47 pass
50
48
51
52 class TaskAborted(KernelError):
49 class TaskAborted(KernelError):
53 pass
50 pass
54
51
52 class TaskTimeout(KernelError):
53 pass
55
54
56 class TimeoutError(KernelError):
55 class TimeoutError(KernelError):
57 pass
56 pass
@@ -308,4 +308,18 class AsyncResultTest(ClusterTestCase):
308 ar.get(5)
308 ar.get(5)
309 nt.assert_in(4, found)
309 nt.assert_in(4, found)
310 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
310 self.assertTrue(len(found) > 1, "should have seen data multiple times, but got: %s" % found)
311
312 def test_not_single_result(self):
313 save_build = self.client._build_targets
314 def single_engine(*a, **kw):
315 idents, targets = save_build(*a, **kw)
316 return idents[:1], targets[:1]
317 ids = single_engine('all')[1]
318 self.client._build_targets = single_engine
319 for targets in ('all', None, ids):
320 dv = self.client.direct_view(targets=targets)
321 ar = dv.apply_async(lambda : 5)
322 self.assertEqual(ar.get(10), [5])
323 self.client._build_targets = save_build
324
311
325
@@ -152,10 +152,10 class TestClient(ClusterTestCase):
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids[0])
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids[0])
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
@@ -171,10 +171,11 class TestClient(ClusterTestCase):
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
172 # give the monitor time to notice the message
173 time.sleep(.25)
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids)
174 ahr = self.client.get_result(ar.msg_ids[0])
175 print ar.get(), ahr.get(), ar._single_result, ahr._single_result
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids)
178 ar2 = self.client.get_result(ar.msg_ids[0])
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
180 c.close()
180
181
@@ -156,10 +156,10 class TestView(ClusterTestCase, ParametricTestCase):
156 ar = v.apply_async(wait, 1)
156 ar = v.apply_async(wait, 1)
157 # give the monitor time to notice the message
157 # give the monitor time to notice the message
158 time.sleep(.25)
158 time.sleep(.25)
159 ahr = v2.get_result(ar.msg_ids)
159 ahr = v2.get_result(ar.msg_ids[0])
160 self.assertTrue(isinstance(ahr, AsyncHubResult))
160 self.assertTrue(isinstance(ahr, AsyncHubResult))
161 self.assertEqual(ahr.get(), ar.get())
161 self.assertEqual(ahr.get(), ar.get())
162 ar2 = v2.get_result(ar.msg_ids)
162 ar2 = v2.get_result(ar.msg_ids[0])
163 self.assertFalse(isinstance(ar2, AsyncHubResult))
163 self.assertFalse(isinstance(ar2, AsyncHubResult))
164 c.spin()
164 c.spin()
165 c.close()
165 c.close()
General Comments 0
You need to be logged in to leave comments. Login now