##// END OF EJS Templates
s/assertNotEquals/assertNotEqual/
Bradley M. Froehle -
Show More
@@ -1,455 +1,455 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEqual(len(self.client.ids), n+2)
43 self.assertEqual(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEqual(v.targets, targets)
50 self.assertEqual(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assertTrue(isinstance(v, DirectView))
53 self.assertTrue(isinstance(v, DirectView))
54 self.assertEqual(v.targets, t)
54 self.assertEqual(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assertTrue(isinstance(v, DirectView))
57 self.assertTrue(isinstance(v, DirectView))
58 self.assertEqual(v.targets, t)
58 self.assertEqual(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assertTrue(isinstance(v, DirectView))
60 self.assertTrue(isinstance(v, DirectView))
61 self.assertEqual(v.targets, targets[::2])
61 self.assertEqual(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assertTrue(isinstance(v, DirectView))
63 self.assertTrue(isinstance(v, DirectView))
64 self.assertEqual(v.targets, targets[1::3])
64 self.assertEqual(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assertTrue(isinstance(v, DirectView))
66 self.assertTrue(isinstance(v, DirectView))
67 self.assertEqual(v.targets, targets[:-3])
67 self.assertEqual(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assertTrue(isinstance(v, DirectView))
69 self.assertTrue(isinstance(v, DirectView))
70 self.assertEqual(v.targets, targets[-1])
70 self.assertEqual(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEqual(v.targets, None)
76 self.assertEqual(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEqual(v.targets, [self.client.ids[-1]])
78 self.assertEqual(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEqual(v.targets, None)
80 self.assertEqual(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEqual(v.targets, 'all')
85 self.assertEqual(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEqual(v.targets, 'all')
87 self.assertEqual(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEqual(v.targets, self.client.ids[-1])
89 self.assertEqual(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEqual(v.targets, 'all')
94 self.assertEqual(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = range(100)
98 seq = range(100)
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEqual(r, [1] * n1)
107 self.assertEqual(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEqual(r, ref)
111 self.assertEqual(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEquals(n2, n1)
116 self.assertNotEqual(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEqual(r, [1] * n2)
120 self.assertEqual(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEqual(r, ref)
124 self.assertEqual(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEqual(ids, targets)
131 self.assertEqual(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids)
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEqual(ahr.get(), ar.get())
157 self.assertEqual(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids)
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_get_execute_result(self):
162 def test_get_execute_result(self):
163 """test getting execute results from the Hub."""
163 """test getting execute results from the Hub."""
164 c = clientmod.Client(profile='iptest')
164 c = clientmod.Client(profile='iptest')
165 t = c.ids[-1]
165 t = c.ids[-1]
166 cell = '\n'.join([
166 cell = '\n'.join([
167 'import time',
167 'import time',
168 'time.sleep(0.25)',
168 'time.sleep(0.25)',
169 '5'
169 '5'
170 ])
170 ])
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
172 # give the monitor time to notice the message
173 time.sleep(.25)
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids)
174 ahr = self.client.get_result(ar.msg_ids)
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
176 self.assertEqual(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids)
177 ar2 = self.client.get_result(ar.msg_ids)
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
179 c.close()
180
180
181 def test_ids_list(self):
181 def test_ids_list(self):
182 """test client.ids"""
182 """test client.ids"""
183 ids = self.client.ids
183 ids = self.client.ids
184 self.assertEqual(ids, self.client._ids)
184 self.assertEqual(ids, self.client._ids)
185 self.assertFalse(ids is self.client._ids)
185 self.assertFalse(ids is self.client._ids)
186 ids.remove(ids[-1])
186 ids.remove(ids[-1])
187 self.assertNotEquals(ids, self.client._ids)
187 self.assertNotEqual(ids, self.client._ids)
188
188
189 def test_queue_status(self):
189 def test_queue_status(self):
190 ids = self.client.ids
190 ids = self.client.ids
191 id0 = ids[0]
191 id0 = ids[0]
192 qs = self.client.queue_status(targets=id0)
192 qs = self.client.queue_status(targets=id0)
193 self.assertTrue(isinstance(qs, dict))
193 self.assertTrue(isinstance(qs, dict))
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
195 allqs = self.client.queue_status()
195 allqs = self.client.queue_status()
196 self.assertTrue(isinstance(allqs, dict))
196 self.assertTrue(isinstance(allqs, dict))
197 intkeys = list(allqs.keys())
197 intkeys = list(allqs.keys())
198 intkeys.remove('unassigned')
198 intkeys.remove('unassigned')
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
199 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
200 unassigned = allqs.pop('unassigned')
200 unassigned = allqs.pop('unassigned')
201 for eid,qs in allqs.items():
201 for eid,qs in allqs.items():
202 self.assertTrue(isinstance(qs, dict))
202 self.assertTrue(isinstance(qs, dict))
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
204
204
205 def test_shutdown(self):
205 def test_shutdown(self):
206 ids = self.client.ids
206 ids = self.client.ids
207 id0 = ids[0]
207 id0 = ids[0]
208 self.client.shutdown(id0, block=True)
208 self.client.shutdown(id0, block=True)
209 while id0 in self.client.ids:
209 while id0 in self.client.ids:
210 time.sleep(0.1)
210 time.sleep(0.1)
211 self.client.spin()
211 self.client.spin()
212
212
213 self.assertRaises(IndexError, lambda : self.client[id0])
213 self.assertRaises(IndexError, lambda : self.client[id0])
214
214
215 def test_result_status(self):
215 def test_result_status(self):
216 pass
216 pass
217 # to be written
217 # to be written
218
218
219 def test_db_query_dt(self):
219 def test_db_query_dt(self):
220 """test db query by date"""
220 """test db query by date"""
221 hist = self.client.hub_history()
221 hist = self.client.hub_history()
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
223 tic = middle['submitted']
223 tic = middle['submitted']
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
226 self.assertEqual(len(before)+len(after),len(hist))
226 self.assertEqual(len(before)+len(after),len(hist))
227 for b in before:
227 for b in before:
228 self.assertTrue(b['submitted'] < tic)
228 self.assertTrue(b['submitted'] < tic)
229 for a in after:
229 for a in after:
230 self.assertTrue(a['submitted'] >= tic)
230 self.assertTrue(a['submitted'] >= tic)
231 same = self.client.db_query({'submitted' : tic})
231 same = self.client.db_query({'submitted' : tic})
232 for s in same:
232 for s in same:
233 self.assertTrue(s['submitted'] == tic)
233 self.assertTrue(s['submitted'] == tic)
234
234
235 def test_db_query_keys(self):
235 def test_db_query_keys(self):
236 """test extracting subset of record keys"""
236 """test extracting subset of record keys"""
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
238 for rec in found:
238 for rec in found:
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
240
240
241 def test_db_query_default_keys(self):
241 def test_db_query_default_keys(self):
242 """default db_query excludes buffers"""
242 """default db_query excludes buffers"""
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 for rec in found:
244 for rec in found:
245 keys = set(rec.keys())
245 keys = set(rec.keys())
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
248
248
249 def test_db_query_msg_id(self):
249 def test_db_query_msg_id(self):
250 """ensure msg_id is always in db queries"""
250 """ensure msg_id is always in db queries"""
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
252 for rec in found:
252 for rec in found:
253 self.assertTrue('msg_id' in rec.keys())
253 self.assertTrue('msg_id' in rec.keys())
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
255 for rec in found:
255 for rec in found:
256 self.assertTrue('msg_id' in rec.keys())
256 self.assertTrue('msg_id' in rec.keys())
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
258 for rec in found:
258 for rec in found:
259 self.assertTrue('msg_id' in rec.keys())
259 self.assertTrue('msg_id' in rec.keys())
260
260
261 def test_db_query_get_result(self):
261 def test_db_query_get_result(self):
262 """pop in db_query shouldn't pop from result itself"""
262 """pop in db_query shouldn't pop from result itself"""
263 self.client[:].apply_sync(lambda : 1)
263 self.client[:].apply_sync(lambda : 1)
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
265 rc2 = clientmod.Client(profile='iptest')
265 rc2 = clientmod.Client(profile='iptest')
266 # If this bug is not fixed, this call will hang:
266 # If this bug is not fixed, this call will hang:
267 ar = rc2.get_result(self.client.history[-1])
267 ar = rc2.get_result(self.client.history[-1])
268 ar.wait(2)
268 ar.wait(2)
269 self.assertTrue(ar.ready())
269 self.assertTrue(ar.ready())
270 ar.get()
270 ar.get()
271 rc2.close()
271 rc2.close()
272
272
273 def test_db_query_in(self):
273 def test_db_query_in(self):
274 """test db query with '$in','$nin' operators"""
274 """test db query with '$in','$nin' operators"""
275 hist = self.client.hub_history()
275 hist = self.client.hub_history()
276 even = hist[::2]
276 even = hist[::2]
277 odd = hist[1::2]
277 odd = hist[1::2]
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
279 found = [ r['msg_id'] for r in recs ]
279 found = [ r['msg_id'] for r in recs ]
280 self.assertEqual(set(even), set(found))
280 self.assertEqual(set(even), set(found))
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
282 found = [ r['msg_id'] for r in recs ]
282 found = [ r['msg_id'] for r in recs ]
283 self.assertEqual(set(odd), set(found))
283 self.assertEqual(set(odd), set(found))
284
284
285 def test_hub_history(self):
285 def test_hub_history(self):
286 hist = self.client.hub_history()
286 hist = self.client.hub_history()
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
288 recdict = {}
288 recdict = {}
289 for rec in recs:
289 for rec in recs:
290 recdict[rec['msg_id']] = rec
290 recdict[rec['msg_id']] = rec
291
291
292 latest = datetime(1984,1,1)
292 latest = datetime(1984,1,1)
293 for msg_id in hist:
293 for msg_id in hist:
294 rec = recdict[msg_id]
294 rec = recdict[msg_id]
295 newt = rec['submitted']
295 newt = rec['submitted']
296 self.assertTrue(newt >= latest)
296 self.assertTrue(newt >= latest)
297 latest = newt
297 latest = newt
298 ar = self.client[-1].apply_async(lambda : 1)
298 ar = self.client[-1].apply_async(lambda : 1)
299 ar.get()
299 ar.get()
300 time.sleep(0.25)
300 time.sleep(0.25)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
301 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
302
302
303 def _wait_for_idle(self):
303 def _wait_for_idle(self):
304 """wait for an engine to become idle, according to the Hub"""
304 """wait for an engine to become idle, according to the Hub"""
305 rc = self.client
305 rc = self.client
306
306
307 # timeout 5s, polling every 100ms
307 # timeout 5s, polling every 100ms
308 qs = rc.queue_status()
308 qs = rc.queue_status()
309 for i in range(50):
309 for i in range(50):
310 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
310 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
311 time.sleep(0.1)
311 time.sleep(0.1)
312 qs = rc.queue_status()
312 qs = rc.queue_status()
313 else:
313 else:
314 break
314 break
315
315
316 # ensure Hub up to date:
316 # ensure Hub up to date:
317 self.assertEqual(qs['unassigned'], 0)
317 self.assertEqual(qs['unassigned'], 0)
318 for eid in rc.ids:
318 for eid in rc.ids:
319 self.assertEqual(qs[eid]['tasks'], 0)
319 self.assertEqual(qs[eid]['tasks'], 0)
320
320
321
321
322 def test_resubmit(self):
322 def test_resubmit(self):
323 def f():
323 def f():
324 import random
324 import random
325 return random.random()
325 return random.random()
326 v = self.client.load_balanced_view()
326 v = self.client.load_balanced_view()
327 ar = v.apply_async(f)
327 ar = v.apply_async(f)
328 r1 = ar.get(1)
328 r1 = ar.get(1)
329 # give the Hub a chance to notice:
329 # give the Hub a chance to notice:
330 self._wait_for_idle()
330 self._wait_for_idle()
331 ahr = self.client.resubmit(ar.msg_ids)
331 ahr = self.client.resubmit(ar.msg_ids)
332 r2 = ahr.get(1)
332 r2 = ahr.get(1)
333 self.assertFalse(r1 == r2)
333 self.assertFalse(r1 == r2)
334
334
335 def test_resubmit_chain(self):
335 def test_resubmit_chain(self):
336 """resubmit resubmitted tasks"""
336 """resubmit resubmitted tasks"""
337 v = self.client.load_balanced_view()
337 v = self.client.load_balanced_view()
338 ar = v.apply_async(lambda x: x, 'x'*1024)
338 ar = v.apply_async(lambda x: x, 'x'*1024)
339 ar.get()
339 ar.get()
340 self._wait_for_idle()
340 self._wait_for_idle()
341 ars = [ar]
341 ars = [ar]
342
342
343 for i in range(10):
343 for i in range(10):
344 ar = ars[-1]
344 ar = ars[-1]
345 ar2 = self.client.resubmit(ar.msg_ids)
345 ar2 = self.client.resubmit(ar.msg_ids)
346
346
347 [ ar.get() for ar in ars ]
347 [ ar.get() for ar in ars ]
348
348
349 def test_resubmit_header(self):
349 def test_resubmit_header(self):
350 """resubmit shouldn't clobber the whole header"""
350 """resubmit shouldn't clobber the whole header"""
351 def f():
351 def f():
352 import random
352 import random
353 return random.random()
353 return random.random()
354 v = self.client.load_balanced_view()
354 v = self.client.load_balanced_view()
355 v.retries = 1
355 v.retries = 1
356 ar = v.apply_async(f)
356 ar = v.apply_async(f)
357 r1 = ar.get(1)
357 r1 = ar.get(1)
358 # give the Hub a chance to notice:
358 # give the Hub a chance to notice:
359 self._wait_for_idle()
359 self._wait_for_idle()
360 ahr = self.client.resubmit(ar.msg_ids)
360 ahr = self.client.resubmit(ar.msg_ids)
361 ahr.get(1)
361 ahr.get(1)
362 time.sleep(0.5)
362 time.sleep(0.5)
363 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
363 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
364 h1,h2 = [ r['header'] for r in records ]
364 h1,h2 = [ r['header'] for r in records ]
365 for key in set(h1.keys()).union(set(h2.keys())):
365 for key in set(h1.keys()).union(set(h2.keys())):
366 if key in ('msg_id', 'date'):
366 if key in ('msg_id', 'date'):
367 self.assertNotEquals(h1[key], h2[key])
367 self.assertNotEqual(h1[key], h2[key])
368 else:
368 else:
369 self.assertEqual(h1[key], h2[key])
369 self.assertEqual(h1[key], h2[key])
370
370
371 def test_resubmit_aborted(self):
371 def test_resubmit_aborted(self):
372 def f():
372 def f():
373 import random
373 import random
374 return random.random()
374 return random.random()
375 v = self.client.load_balanced_view()
375 v = self.client.load_balanced_view()
376 # restrict to one engine, so we can put a sleep
376 # restrict to one engine, so we can put a sleep
377 # ahead of the task, so it will get aborted
377 # ahead of the task, so it will get aborted
378 eid = self.client.ids[-1]
378 eid = self.client.ids[-1]
379 v.targets = [eid]
379 v.targets = [eid]
380 sleep = v.apply_async(time.sleep, 0.5)
380 sleep = v.apply_async(time.sleep, 0.5)
381 ar = v.apply_async(f)
381 ar = v.apply_async(f)
382 ar.abort()
382 ar.abort()
383 self.assertRaises(error.TaskAborted, ar.get)
383 self.assertRaises(error.TaskAborted, ar.get)
384 # Give the Hub a chance to get up to date:
384 # Give the Hub a chance to get up to date:
385 self._wait_for_idle()
385 self._wait_for_idle()
386 ahr = self.client.resubmit(ar.msg_ids)
386 ahr = self.client.resubmit(ar.msg_ids)
387 r2 = ahr.get(1)
387 r2 = ahr.get(1)
388
388
389 def test_resubmit_inflight(self):
389 def test_resubmit_inflight(self):
390 """resubmit of inflight task"""
390 """resubmit of inflight task"""
391 v = self.client.load_balanced_view()
391 v = self.client.load_balanced_view()
392 ar = v.apply_async(time.sleep,1)
392 ar = v.apply_async(time.sleep,1)
393 # give the message a chance to arrive
393 # give the message a chance to arrive
394 time.sleep(0.2)
394 time.sleep(0.2)
395 ahr = self.client.resubmit(ar.msg_ids)
395 ahr = self.client.resubmit(ar.msg_ids)
396 ar.get(2)
396 ar.get(2)
397 ahr.get(2)
397 ahr.get(2)
398
398
399 def test_resubmit_badkey(self):
399 def test_resubmit_badkey(self):
400 """ensure KeyError on resubmit of nonexistant task"""
400 """ensure KeyError on resubmit of nonexistant task"""
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
402
402
403 def test_purge_results(self):
403 def test_purge_results(self):
404 # ensure there are some tasks
404 # ensure there are some tasks
405 for i in range(5):
405 for i in range(5):
406 self.client[:].apply_sync(lambda : 1)
406 self.client[:].apply_sync(lambda : 1)
407 # Wait for the Hub to realise the result is done:
407 # Wait for the Hub to realise the result is done:
408 # This prevents a race condition, where we
408 # This prevents a race condition, where we
409 # might purge a result the Hub still thinks is pending.
409 # might purge a result the Hub still thinks is pending.
410 time.sleep(0.1)
410 time.sleep(0.1)
411 rc2 = clientmod.Client(profile='iptest')
411 rc2 = clientmod.Client(profile='iptest')
412 hist = self.client.hub_history()
412 hist = self.client.hub_history()
413 ahr = rc2.get_result([hist[-1]])
413 ahr = rc2.get_result([hist[-1]])
414 ahr.wait(10)
414 ahr.wait(10)
415 self.client.purge_results(hist[-1])
415 self.client.purge_results(hist[-1])
416 newhist = self.client.hub_history()
416 newhist = self.client.hub_history()
417 self.assertEqual(len(newhist)+1,len(hist))
417 self.assertEqual(len(newhist)+1,len(hist))
418 rc2.spin()
418 rc2.spin()
419 rc2.close()
419 rc2.close()
420
420
421 def test_purge_all_results(self):
421 def test_purge_all_results(self):
422 self.client.purge_results('all')
422 self.client.purge_results('all')
423 hist = self.client.hub_history()
423 hist = self.client.hub_history()
424 self.assertEqual(len(hist), 0)
424 self.assertEqual(len(hist), 0)
425
425
426 def test_spin_thread(self):
426 def test_spin_thread(self):
427 self.client.spin_thread(0.01)
427 self.client.spin_thread(0.01)
428 ar = self.client[-1].apply_async(lambda : 1)
428 ar = self.client[-1].apply_async(lambda : 1)
429 time.sleep(0.1)
429 time.sleep(0.1)
430 self.assertTrue(ar.wall_time < 0.1,
430 self.assertTrue(ar.wall_time < 0.1,
431 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
431 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
432 )
432 )
433
433
434 def test_stop_spin_thread(self):
434 def test_stop_spin_thread(self):
435 self.client.spin_thread(0.01)
435 self.client.spin_thread(0.01)
436 self.client.stop_spin_thread()
436 self.client.stop_spin_thread()
437 ar = self.client[-1].apply_async(lambda : 1)
437 ar = self.client[-1].apply_async(lambda : 1)
438 time.sleep(0.15)
438 time.sleep(0.15)
439 self.assertTrue(ar.wall_time > 0.1,
439 self.assertTrue(ar.wall_time > 0.1,
440 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
440 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
441 )
441 )
442
442
443 def test_activate(self):
443 def test_activate(self):
444 ip = get_ipython()
444 ip = get_ipython()
445 magics = ip.magics_manager.magics
445 magics = ip.magics_manager.magics
446 self.assertTrue('px' in magics['line'])
446 self.assertTrue('px' in magics['line'])
447 self.assertTrue('px' in magics['cell'])
447 self.assertTrue('px' in magics['cell'])
448 v0 = self.client.activate(-1, '0')
448 v0 = self.client.activate(-1, '0')
449 self.assertTrue('px0' in magics['line'])
449 self.assertTrue('px0' in magics['line'])
450 self.assertTrue('px0' in magics['cell'])
450 self.assertTrue('px0' in magics['cell'])
451 self.assertEqual(v0.targets, self.client.ids[-1])
451 self.assertEqual(v0.targets, self.client.ids[-1])
452 v0 = self.client.activate('all', 'all')
452 v0 = self.client.activate('all', 'all')
453 self.assertTrue('pxall' in magics['line'])
453 self.assertTrue('pxall' in magics['line'])
454 self.assertTrue('pxall' in magics['cell'])
454 self.assertTrue('pxall' in magics['cell'])
455 self.assertEqual(v0.targets, 'all')
455 self.assertEqual(v0.targets, 'all')
@@ -1,176 +1,176 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24
24
25 from IPython import parallel as pmod
25 from IPython import parallel as pmod
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 from IPython.parallel.tests import add_engines
28 from IPython.parallel.tests import add_engines
29
29
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31
31
32 def setup():
32 def setup():
33 add_engines(3, total=True)
33 add_engines(3, total=True)
34
34
35 class TestLoadBalancedView(ClusterTestCase):
35 class TestLoadBalancedView(ClusterTestCase):
36
36
37 def setUp(self):
37 def setUp(self):
38 ClusterTestCase.setUp(self)
38 ClusterTestCase.setUp(self)
39 self.view = self.client.load_balanced_view()
39 self.view = self.client.load_balanced_view()
40
40
41 def test_z_crash_task(self):
41 def test_z_crash_task(self):
42 """test graceful handling of engine death (balanced)"""
42 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 # self.add_engines(1)
44 # self.add_engines(1)
45 ar = self.view.apply_async(crash)
45 ar = self.view.apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
47 eid = ar.engine_id
48 tic = time.time()
48 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
49 while eid in self.client.ids and time.time()-tic < 5:
50 time.sleep(.01)
50 time.sleep(.01)
51 self.client.spin()
51 self.client.spin()
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53
53
54 def test_map(self):
54 def test_map(self):
55 def f(x):
55 def f(x):
56 return x**2
56 return x**2
57 data = range(16)
57 data = range(16)
58 r = self.view.map_sync(f, data)
58 r = self.view.map_sync(f, data)
59 self.assertEqual(r, map(f, data))
59 self.assertEqual(r, map(f, data))
60
60
61 def test_map_unordered(self):
61 def test_map_unordered(self):
62 def f(x):
62 def f(x):
63 return x**2
63 return x**2
64 def slow_f(x):
64 def slow_f(x):
65 import time
65 import time
66 time.sleep(0.05*x)
66 time.sleep(0.05*x)
67 return x**2
67 return x**2
68 data = range(16,0,-1)
68 data = range(16,0,-1)
69 reference = map(f, data)
69 reference = map(f, data)
70
70
71 amr = self.view.map_async(slow_f, data, ordered=False)
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come
73 # check individual elements, retrieved as they come
74 # list comprehension uses __iter__
74 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
75 astheycame = [ r for r in amr ]
76 # Ensure that at least one result came out of order:
76 # Ensure that at least one result came out of order:
77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
77 self.assertNotEqual(astheycame, reference, "should not have preserved order")
78 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
78 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
79
79
80 def test_map_ordered(self):
80 def test_map_ordered(self):
81 def f(x):
81 def f(x):
82 return x**2
82 return x**2
83 def slow_f(x):
83 def slow_f(x):
84 import time
84 import time
85 time.sleep(0.05*x)
85 time.sleep(0.05*x)
86 return x**2
86 return x**2
87 data = range(16,0,-1)
87 data = range(16,0,-1)
88 reference = map(f, data)
88 reference = map(f, data)
89
89
90 amr = self.view.map_async(slow_f, data)
90 amr = self.view.map_async(slow_f, data)
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 # check individual elements, retrieved as they come
92 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
93 # list(amr) uses __iter__
94 astheycame = list(amr)
94 astheycame = list(amr)
95 # Ensure that results came in order
95 # Ensure that results came in order
96 self.assertEqual(astheycame, reference)
96 self.assertEqual(astheycame, reference)
97 self.assertEqual(amr.result, reference)
97 self.assertEqual(amr.result, reference)
98
98
99 def test_map_iterable(self):
99 def test_map_iterable(self):
100 """test map on iterables (balanced)"""
100 """test map on iterables (balanced)"""
101 view = self.view
101 view = self.view
102 # 101 is prime, so it won't be evenly distributed
102 # 101 is prime, so it won't be evenly distributed
103 arr = range(101)
103 arr = range(101)
104 # so that it will be an iterator, even in Python 3
104 # so that it will be an iterator, even in Python 3
105 it = iter(arr)
105 it = iter(arr)
106 r = view.map_sync(lambda x:x, arr)
106 r = view.map_sync(lambda x:x, arr)
107 self.assertEqual(r, list(arr))
107 self.assertEqual(r, list(arr))
108
108
109
109
110 def test_abort(self):
110 def test_abort(self):
111 view = self.view
111 view = self.view
112 ar = self.client[:].apply_async(time.sleep, .5)
112 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
114 time.sleep(0.2)
114 time.sleep(0.2)
115 ar2 = view.apply_async(lambda : 2)
115 ar2 = view.apply_async(lambda : 2)
116 ar3 = view.apply_async(lambda : 3)
116 ar3 = view.apply_async(lambda : 3)
117 view.abort(ar2)
117 view.abort(ar2)
118 view.abort(ar3.msg_ids)
118 view.abort(ar3.msg_ids)
119 self.assertRaises(error.TaskAborted, ar2.get)
119 self.assertRaises(error.TaskAborted, ar2.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
121
121
122 def test_retries(self):
122 def test_retries(self):
123 view = self.view
123 view = self.view
124 view.timeout = 1 # prevent hang if this doesn't behave
124 view.timeout = 1 # prevent hang if this doesn't behave
125 def fail():
125 def fail():
126 assert False
126 assert False
127 for r in range(len(self.client)-1):
127 for r in range(len(self.client)-1):
128 with view.temp_flags(retries=r):
128 with view.temp_flags(retries=r):
129 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
129 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
130
130
131 with view.temp_flags(retries=len(self.client), timeout=0.25):
131 with view.temp_flags(retries=len(self.client), timeout=0.25):
132 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
132 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
133
133
134 def test_invalid_dependency(self):
134 def test_invalid_dependency(self):
135 view = self.view
135 view = self.view
136 with view.temp_flags(after='12345'):
136 with view.temp_flags(after='12345'):
137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
138
138
139 def test_impossible_dependency(self):
139 def test_impossible_dependency(self):
140 self.minimum_engines(2)
140 self.minimum_engines(2)
141 view = self.client.load_balanced_view()
141 view = self.client.load_balanced_view()
142 ar1 = view.apply_async(lambda : 1)
142 ar1 = view.apply_async(lambda : 1)
143 ar1.get()
143 ar1.get()
144 e1 = ar1.engine_id
144 e1 = ar1.engine_id
145 e2 = e1
145 e2 = e1
146 while e2 == e1:
146 while e2 == e1:
147 ar2 = view.apply_async(lambda : 1)
147 ar2 = view.apply_async(lambda : 1)
148 ar2.get()
148 ar2.get()
149 e2 = ar2.engine_id
149 e2 = ar2.engine_id
150
150
151 with view.temp_flags(follow=[ar1, ar2]):
151 with view.temp_flags(follow=[ar1, ar2]):
152 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
152 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
153
153
154
154
155 def test_follow(self):
155 def test_follow(self):
156 ar = self.view.apply_async(lambda : 1)
156 ar = self.view.apply_async(lambda : 1)
157 ar.get()
157 ar.get()
158 ars = []
158 ars = []
159 first_id = ar.engine_id
159 first_id = ar.engine_id
160
160
161 self.view.follow = ar
161 self.view.follow = ar
162 for i in range(5):
162 for i in range(5):
163 ars.append(self.view.apply_async(lambda : 1))
163 ars.append(self.view.apply_async(lambda : 1))
164 self.view.wait(ars)
164 self.view.wait(ars)
165 for ar in ars:
165 for ar in ars:
166 self.assertEqual(ar.engine_id, first_id)
166 self.assertEqual(ar.engine_id, first_id)
167
167
168 def test_after(self):
168 def test_after(self):
169 view = self.view
169 view = self.view
170 ar = view.apply_async(time.sleep, 0.5)
170 ar = view.apply_async(time.sleep, 0.5)
171 with view.temp_flags(after=ar):
171 with view.temp_flags(after=ar):
172 ar2 = view.apply_async(lambda : 1)
172 ar2 = view.apply_async(lambda : 1)
173
173
174 ar.wait()
174 ar.wait()
175 ar2.wait()
175 ar2.wait()
176 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
176 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
General Comments 0
You need to be logged in to leave comments. Login now