##// END OF EJS Templates
add db_query and hub_history tests to test_client
MinRK -
Show More
@@ -1,148 +1,214 b''
1 """Tests for parallel client.py"""
1 """Tests for parallel client.py"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import time
14 import time
15 from datetime import datetime
15 from tempfile import mktemp
16 from tempfile import mktemp
16
17
17 import zmq
18 import zmq
18
19
19 from IPython.parallel.client import client as clientmod
20 from IPython.parallel.client import client as clientmod
20 from IPython.parallel import error
21 from IPython.parallel import error
21 from IPython.parallel import AsyncResult, AsyncHubResult
22 from IPython.parallel import AsyncResult, AsyncHubResult
22 from IPython.parallel import LoadBalancedView, DirectView
23 from IPython.parallel import LoadBalancedView, DirectView
23
24
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
25 from clienttest import ClusterTestCase, segfault, wait, add_engines
25
26
26 def setup():
27 def setup():
27 add_engines(4)
28 add_engines(4)
28
29
29 class TestClient(ClusterTestCase):
30 class TestClient(ClusterTestCase):
30
31
31 def test_ids(self):
32 def test_ids(self):
32 n = len(self.client.ids)
33 n = len(self.client.ids)
33 self.add_engines(3)
34 self.add_engines(3)
34 self.assertEquals(len(self.client.ids), n+3)
35 self.assertEquals(len(self.client.ids), n+3)
35
36
36 def test_view_indexing(self):
37 def test_view_indexing(self):
37 """test index access for views"""
38 """test index access for views"""
38 self.add_engines(2)
39 self.add_engines(2)
39 targets = self.client._build_targets('all')[-1]
40 targets = self.client._build_targets('all')[-1]
40 v = self.client[:]
41 v = self.client[:]
41 self.assertEquals(v.targets, targets)
42 self.assertEquals(v.targets, targets)
42 t = self.client.ids[2]
43 t = self.client.ids[2]
43 v = self.client[t]
44 v = self.client[t]
44 self.assert_(isinstance(v, DirectView))
45 self.assert_(isinstance(v, DirectView))
45 self.assertEquals(v.targets, t)
46 self.assertEquals(v.targets, t)
46 t = self.client.ids[2:4]
47 t = self.client.ids[2:4]
47 v = self.client[t]
48 v = self.client[t]
48 self.assert_(isinstance(v, DirectView))
49 self.assert_(isinstance(v, DirectView))
49 self.assertEquals(v.targets, t)
50 self.assertEquals(v.targets, t)
50 v = self.client[::2]
51 v = self.client[::2]
51 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
52 self.assertEquals(v.targets, targets[::2])
53 self.assertEquals(v.targets, targets[::2])
53 v = self.client[1::3]
54 v = self.client[1::3]
54 self.assert_(isinstance(v, DirectView))
55 self.assert_(isinstance(v, DirectView))
55 self.assertEquals(v.targets, targets[1::3])
56 self.assertEquals(v.targets, targets[1::3])
56 v = self.client[:-3]
57 v = self.client[:-3]
57 self.assert_(isinstance(v, DirectView))
58 self.assert_(isinstance(v, DirectView))
58 self.assertEquals(v.targets, targets[:-3])
59 self.assertEquals(v.targets, targets[:-3])
59 v = self.client[-1]
60 v = self.client[-1]
60 self.assert_(isinstance(v, DirectView))
61 self.assert_(isinstance(v, DirectView))
61 self.assertEquals(v.targets, targets[-1])
62 self.assertEquals(v.targets, targets[-1])
62 self.assertRaises(TypeError, lambda : self.client[None])
63 self.assertRaises(TypeError, lambda : self.client[None])
63
64
64 def test_lbview_targets(self):
65 def test_lbview_targets(self):
65 """test load_balanced_view targets"""
66 """test load_balanced_view targets"""
66 v = self.client.load_balanced_view()
67 v = self.client.load_balanced_view()
67 self.assertEquals(v.targets, None)
68 self.assertEquals(v.targets, None)
68 v = self.client.load_balanced_view(-1)
69 v = self.client.load_balanced_view(-1)
69 self.assertEquals(v.targets, [self.client.ids[-1]])
70 self.assertEquals(v.targets, [self.client.ids[-1]])
70 v = self.client.load_balanced_view('all')
71 v = self.client.load_balanced_view('all')
71 self.assertEquals(v.targets, self.client.ids)
72 self.assertEquals(v.targets, self.client.ids)
72
73
73 def test_targets(self):
74 def test_targets(self):
74 """test various valid targets arguments"""
75 """test various valid targets arguments"""
75 build = self.client._build_targets
76 build = self.client._build_targets
76 ids = self.client.ids
77 ids = self.client.ids
77 idents,targets = build(None)
78 idents,targets = build(None)
78 self.assertEquals(ids, targets)
79 self.assertEquals(ids, targets)
79
80
80 def test_clear(self):
81 def test_clear(self):
81 """test clear behavior"""
82 """test clear behavior"""
82 # self.add_engines(2)
83 # self.add_engines(2)
83 v = self.client[:]
84 v = self.client[:]
84 v.block=True
85 v.block=True
85 v.push(dict(a=5))
86 v.push(dict(a=5))
86 v.pull('a')
87 v.pull('a')
87 id0 = self.client.ids[-1]
88 id0 = self.client.ids[-1]
88 self.client.clear(targets=id0, block=True)
89 self.client.clear(targets=id0, block=True)
89 a = self.client[:-1].get('a')
90 a = self.client[:-1].get('a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 self.client.clear(block=True)
92 self.client.clear(block=True)
92 for i in self.client.ids:
93 for i in self.client.ids:
93 # print i
94 # print i
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
95 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
95
96
96 def test_get_result(self):
97 def test_get_result(self):
97 """test getting results from the Hub."""
98 """test getting results from the Hub."""
98 c = clientmod.Client(profile='iptest')
99 c = clientmod.Client(profile='iptest')
99 # self.add_engines(1)
100 # self.add_engines(1)
100 t = c.ids[-1]
101 t = c.ids[-1]
101 ar = c[t].apply_async(wait, 1)
102 ar = c[t].apply_async(wait, 1)
102 # give the monitor time to notice the message
103 # give the monitor time to notice the message
103 time.sleep(.25)
104 time.sleep(.25)
104 ahr = self.client.get_result(ar.msg_ids)
105 ahr = self.client.get_result(ar.msg_ids)
105 self.assertTrue(isinstance(ahr, AsyncHubResult))
106 self.assertTrue(isinstance(ahr, AsyncHubResult))
106 self.assertEquals(ahr.get(), ar.get())
107 self.assertEquals(ahr.get(), ar.get())
107 ar2 = self.client.get_result(ar.msg_ids)
108 ar2 = self.client.get_result(ar.msg_ids)
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 c.close()
110 c.close()
110
111
111 def test_ids_list(self):
112 def test_ids_list(self):
112 """test client.ids"""
113 """test client.ids"""
113 # self.add_engines(2)
114 # self.add_engines(2)
114 ids = self.client.ids
115 ids = self.client.ids
115 self.assertEquals(ids, self.client._ids)
116 self.assertEquals(ids, self.client._ids)
116 self.assertFalse(ids is self.client._ids)
117 self.assertFalse(ids is self.client._ids)
117 ids.remove(ids[-1])
118 ids.remove(ids[-1])
118 self.assertNotEquals(ids, self.client._ids)
119 self.assertNotEquals(ids, self.client._ids)
119
120
120 def test_queue_status(self):
121 def test_queue_status(self):
121 # self.addEngine(4)
122 # self.addEngine(4)
122 ids = self.client.ids
123 ids = self.client.ids
123 id0 = ids[0]
124 id0 = ids[0]
124 qs = self.client.queue_status(targets=id0)
125 qs = self.client.queue_status(targets=id0)
125 self.assertTrue(isinstance(qs, dict))
126 self.assertTrue(isinstance(qs, dict))
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 allqs = self.client.queue_status()
128 allqs = self.client.queue_status()
128 self.assertTrue(isinstance(allqs, dict))
129 self.assertTrue(isinstance(allqs, dict))
129 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
130 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
130 unassigned = allqs.pop('unassigned')
131 unassigned = allqs.pop('unassigned')
131 for eid,qs in allqs.items():
132 for eid,qs in allqs.items():
132 self.assertTrue(isinstance(qs, dict))
133 self.assertTrue(isinstance(qs, dict))
133 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134
135
135 def test_shutdown(self):
136 def test_shutdown(self):
136 # self.addEngine(4)
137 # self.addEngine(4)
137 ids = self.client.ids
138 ids = self.client.ids
138 id0 = ids[0]
139 id0 = ids[0]
139 self.client.shutdown(id0, block=True)
140 self.client.shutdown(id0, block=True)
140 while id0 in self.client.ids:
141 while id0 in self.client.ids:
141 time.sleep(0.1)
142 time.sleep(0.1)
142 self.client.spin()
143 self.client.spin()
143
144
144 self.assertRaises(IndexError, lambda : self.client[id0])
145 self.assertRaises(IndexError, lambda : self.client[id0])
145
146
146 def test_result_status(self):
147 def test_result_status(self):
147 pass
148 pass
148 # to be written
149 # to be written
150
151 def test_db_query_dt(self):
152 """test db query by date"""
153 hist = self.client.hub_history()
154 middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
155 tic = middle['submitted']
156 before = self.client.db_query({'submitted' : {'$lt' : tic}})
157 after = self.client.db_query({'submitted' : {'$gte' : tic}})
158 self.assertEquals(len(before)+len(after),len(hist))
159 for b in before:
160 self.assertTrue(b['submitted'] < tic)
161 for a in after:
162 self.assertTrue(a['submitted'] >= tic)
163 same = self.client.db_query({'submitted' : tic})
164 for s in same:
165 self.assertTrue(s['submitted'] == tic)
166
167 def test_db_query_keys(self):
168 """test extracting subset of record keys"""
169 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
170 for rec in found:
171 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
172
173 def test_db_query_msg_id(self):
174 """ensure msg_id is always in db queries"""
175 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
176 for rec in found:
177 self.assertTrue('msg_id' in rec.keys())
178 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
179 for rec in found:
180 self.assertTrue('msg_id' in rec.keys())
181 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
182 for rec in found:
183 self.assertTrue('msg_id' in rec.keys())
184
185 def test_db_query_in(self):
186 """test db query with '$in','$nin' operators"""
187 hist = self.client.hub_history()
188 even = hist[::2]
189 odd = hist[1::2]
190 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
191 found = [ r['msg_id'] for r in recs ]
192 self.assertEquals(set(even), set(found))
193 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
194 found = [ r['msg_id'] for r in recs ]
195 self.assertEquals(set(odd), set(found))
196
197 def test_hub_history(self):
198 hist = self.client.hub_history()
199 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
200 recdict = {}
201 for rec in recs:
202 recdict[rec['msg_id']] = rec
203
204 latest = datetime(1984,1,1)
205 for msg_id in hist:
206 rec = recdict[msg_id]
207 newt = rec['submitted']
208 self.assertTrue(newt >= latest)
209 latest = newt
210 ar = self.client[-1].apply_async(lambda : 1)
211 ar.get()
212 time.sleep(0.25)
213 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
214
General Comments 0
You need to be logged in to leave comments. Login now