##// END OF EJS Templates
test AsyncResult.display_outputs with empty stdout/err
MinRK -
Show More
@@ -1,205 +1,264 b''
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import time
20 20
21 21 from IPython.parallel.error import TimeoutError
22 22
23 23 from IPython.parallel import error, Client
24 24 from IPython.parallel.tests import add_engines
25 from .clienttest import ClusterTestCase
25 from .clienttest import ClusterTestCase, capture_output
26 26
27 27 def setup():
28 28 add_engines(2, total=True)
29 29
30 30 def wait(n):
31 31 import time
32 32 time.sleep(n)
33 33 return n
34 34
35 35 class AsyncResultTest(ClusterTestCase):
36 36
37 37 def test_single_result_view(self):
38 38 """various one-target views get the right value for single_result"""
39 39 eid = self.client.ids[-1]
40 40 ar = self.client[eid].apply_async(lambda : 42)
41 41 self.assertEquals(ar.get(), 42)
42 42 ar = self.client[[eid]].apply_async(lambda : 42)
43 43 self.assertEquals(ar.get(), [42])
44 44 ar = self.client[-1:].apply_async(lambda : 42)
45 45 self.assertEquals(ar.get(), [42])
46 46
47 47 def test_get_after_done(self):
48 48 ar = self.client[-1].apply_async(lambda : 42)
49 49 ar.wait()
50 50 self.assertTrue(ar.ready())
51 51 self.assertEquals(ar.get(), 42)
52 52 self.assertEquals(ar.get(), 42)
53 53
54 54 def test_get_before_done(self):
55 55 ar = self.client[-1].apply_async(wait, 0.1)
56 56 self.assertRaises(TimeoutError, ar.get, 0)
57 57 ar.wait(0)
58 58 self.assertFalse(ar.ready())
59 59 self.assertEquals(ar.get(), 0.1)
60 60
61 61 def test_get_after_error(self):
62 62 ar = self.client[-1].apply_async(lambda : 1/0)
63 63 ar.wait(10)
64 64 self.assertRaisesRemote(ZeroDivisionError, ar.get)
65 65 self.assertRaisesRemote(ZeroDivisionError, ar.get)
66 66 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
67 67
68 68 def test_get_dict(self):
69 69 n = len(self.client)
70 70 ar = self.client[:].apply_async(lambda : 5)
71 71 self.assertEquals(ar.get(), [5]*n)
72 72 d = ar.get_dict()
73 73 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
74 74 for eid,r in d.iteritems():
75 75 self.assertEquals(r, 5)
76 76
77 77 def test_list_amr(self):
78 78 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
79 79 rlist = list(ar)
80 80
81 81 def test_getattr(self):
82 82 ar = self.client[:].apply_async(wait, 0.5)
83 83 self.assertRaises(AttributeError, lambda : ar._foo)
84 84 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
85 85 self.assertRaises(AttributeError, lambda : ar.foo)
86 86 self.assertRaises(AttributeError, lambda : ar.engine_id)
87 87 self.assertFalse(hasattr(ar, '__length_hint__'))
88 88 self.assertFalse(hasattr(ar, 'foo'))
89 89 self.assertFalse(hasattr(ar, 'engine_id'))
90 90 ar.get(5)
91 91 self.assertRaises(AttributeError, lambda : ar._foo)
92 92 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
93 93 self.assertRaises(AttributeError, lambda : ar.foo)
94 94 self.assertTrue(isinstance(ar.engine_id, list))
95 95 self.assertEquals(ar.engine_id, ar['engine_id'])
96 96 self.assertFalse(hasattr(ar, '__length_hint__'))
97 97 self.assertFalse(hasattr(ar, 'foo'))
98 98 self.assertTrue(hasattr(ar, 'engine_id'))
99 99
100 100 def test_getitem(self):
101 101 ar = self.client[:].apply_async(wait, 0.5)
102 102 self.assertRaises(TimeoutError, lambda : ar['foo'])
103 103 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
104 104 ar.get(5)
105 105 self.assertRaises(KeyError, lambda : ar['foo'])
106 106 self.assertTrue(isinstance(ar['engine_id'], list))
107 107 self.assertEquals(ar.engine_id, ar['engine_id'])
108 108
109 109 def test_single_result(self):
110 110 ar = self.client[-1].apply_async(wait, 0.5)
111 111 self.assertRaises(TimeoutError, lambda : ar['foo'])
112 112 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
113 113 self.assertTrue(ar.get(5) == 0.5)
114 114 self.assertTrue(isinstance(ar['engine_id'], int))
115 115 self.assertTrue(isinstance(ar.engine_id, int))
116 116 self.assertEquals(ar.engine_id, ar['engine_id'])
117 117
118 118 def test_abort(self):
119 119 e = self.client[-1]
120 120 ar = e.execute('import time; time.sleep(1)', block=False)
121 121 ar2 = e.apply_async(lambda : 2)
122 122 ar2.abort()
123 123 self.assertRaises(error.TaskAborted, ar2.get)
124 124 ar.get()
125 125
126 126 def test_len(self):
127 127 v = self.client.load_balanced_view()
128 128 ar = v.map_async(lambda x: x, range(10))
129 129 self.assertEquals(len(ar), 10)
130 130 ar = v.apply_async(lambda x: x, range(10))
131 131 self.assertEquals(len(ar), 1)
132 132 ar = self.client[:].apply_async(lambda x: x, range(10))
133 133 self.assertEquals(len(ar), len(self.client.ids))
134 134
135 135 def test_wall_time_single(self):
136 136 v = self.client.load_balanced_view()
137 137 ar = v.apply_async(time.sleep, 0.25)
138 138 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
139 139 ar.get(2)
140 140 self.assertTrue(ar.wall_time < 1.)
141 141 self.assertTrue(ar.wall_time > 0.2)
142 142
143 143 def test_wall_time_multi(self):
144 144 self.minimum_engines(4)
145 145 v = self.client[:]
146 146 ar = v.apply_async(time.sleep, 0.25)
147 147 self.assertRaises(TimeoutError, getattr, ar, 'wall_time')
148 148 ar.get(2)
149 149 self.assertTrue(ar.wall_time < 1.)
150 150 self.assertTrue(ar.wall_time > 0.2)
151 151
152 152 def test_serial_time_single(self):
153 153 v = self.client.load_balanced_view()
154 154 ar = v.apply_async(time.sleep, 0.25)
155 155 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
156 156 ar.get(2)
157 157 self.assertTrue(ar.serial_time < 1.)
158 158 self.assertTrue(ar.serial_time > 0.2)
159 159
160 160 def test_serial_time_multi(self):
161 161 self.minimum_engines(4)
162 162 v = self.client[:]
163 163 ar = v.apply_async(time.sleep, 0.25)
164 164 self.assertRaises(TimeoutError, getattr, ar, 'serial_time')
165 165 ar.get(2)
166 166 self.assertTrue(ar.serial_time < 2.)
167 167 self.assertTrue(ar.serial_time > 0.8)
168 168
169 169 def test_elapsed_single(self):
170 170 v = self.client.load_balanced_view()
171 171 ar = v.apply_async(time.sleep, 0.25)
172 172 while not ar.ready():
173 173 time.sleep(0.01)
174 174 self.assertTrue(ar.elapsed < 1)
175 175 self.assertTrue(ar.elapsed < 1)
176 176 ar.get(2)
177 177
178 178 def test_elapsed_multi(self):
179 179 v = self.client[:]
180 180 ar = v.apply_async(time.sleep, 0.25)
181 181 while not ar.ready():
182 182 time.sleep(0.01)
183 183 self.assertTrue(ar.elapsed < 1)
184 184 self.assertTrue(ar.elapsed < 1)
185 185 ar.get(2)
186 186
187 187 def test_hubresult_timestamps(self):
188 188 self.minimum_engines(4)
189 189 v = self.client[:]
190 190 ar = v.apply_async(time.sleep, 0.25)
191 191 ar.get(2)
192 192 rc2 = Client(profile='iptest')
193 193 # must have try/finally to close second Client, otherwise
194 194 # will have dangling sockets causing problems
195 195 try:
196 196 time.sleep(0.25)
197 197 hr = rc2.get_result(ar.msg_ids)
198 198 self.assertTrue(hr.elapsed > 0., "got bad elapsed: %s" % hr.elapsed)
199 199 hr.get(1)
200 200 self.assertTrue(hr.wall_time < ar.wall_time + 0.2, "got bad wall_time: %s > %s" % (hr.wall_time, ar.wall_time))
201 201 self.assertEquals(hr.serial_time, ar.serial_time)
202 202 finally:
203 203 rc2.close()
204 204
205 def test_display_empty_streams_single(self):
206 """empty stdout/err are not displayed (single result)"""
207 self.minimum_engines(1)
208
209 v = self.client[-1]
210 ar = v.execute("print (5555)")
211 ar.get(5)
212 with capture_output() as io:
213 ar.display_outputs()
214 self.assertEquals(io.stderr, '')
215 self.assertTrue('5555' in io.stdout)
216
217 ar = v.execute("a=5")
218 ar.get(5)
219 with capture_output() as io:
220 ar.display_outputs()
221 self.assertEquals(io.stderr, '')
222 self.assertEquals(io.stdout, '')
223
224 def test_display_empty_streams_type(self):
225 """empty stdout/err are not displayed (groupby type)"""
226 self.minimum_engines(1)
227
228 v = self.client[:]
229 ar = v.execute("print (5555)")
230 ar.get(5)
231 with capture_output() as io:
232 ar.display_outputs()
233 self.assertEquals(io.stderr, '')
234 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
235 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
236
237 ar = v.execute("a=5")
238 ar.get(5)
239 with capture_output() as io:
240 ar.display_outputs()
241 self.assertEquals(io.stderr, '')
242 self.assertEquals(io.stdout, '')
243
244 def test_display_empty_streams_engine(self):
245 """empty stdout/err are not displayed (groupby engine)"""
246 self.minimum_engines(1)
247
248 v = self.client[:]
249 ar = v.execute("print (5555)")
250 ar.get(5)
251 with capture_output() as io:
252 ar.display_outputs('engine')
253 self.assertEquals(io.stderr, '')
254 self.assertEquals(io.stdout.count('5555'), len(v), io.stdout)
255 self.assertEquals(io.stdout.count('[stdout:'), len(v), io.stdout)
256
257 ar = v.execute("a=5")
258 ar.get(5)
259 with capture_output() as io:
260 ar.display_outputs('engine')
261 self.assertEquals(io.stderr, '')
262 self.assertEquals(io.stdout, '')
263
205 264
General Comments 0
You need to be logged in to leave comments. Login now