##// END OF EJS Templates
add db_query and hub_history tests to test_client
MinRK -
Show More
@@ -1,148 +1,214 b''
1 1 """Tests for parallel client.py"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import time
15 from datetime import datetime
15 16 from tempfile import mktemp
16 17
17 18 import zmq
18 19
19 20 from IPython.parallel.client import client as clientmod
20 21 from IPython.parallel import error
21 22 from IPython.parallel import AsyncResult, AsyncHubResult
22 23 from IPython.parallel import LoadBalancedView, DirectView
23 24
24 25 from clienttest import ClusterTestCase, segfault, wait, add_engines
25 26
26 27 def setup():
27 28 add_engines(4)
28 29
29 30 class TestClient(ClusterTestCase):
30 31
31 32 def test_ids(self):
32 33 n = len(self.client.ids)
33 34 self.add_engines(3)
34 35 self.assertEquals(len(self.client.ids), n+3)
35 36
36 37 def test_view_indexing(self):
37 38 """test index access for views"""
38 39 self.add_engines(2)
39 40 targets = self.client._build_targets('all')[-1]
40 41 v = self.client[:]
41 42 self.assertEquals(v.targets, targets)
42 43 t = self.client.ids[2]
43 44 v = self.client[t]
44 45 self.assert_(isinstance(v, DirectView))
45 46 self.assertEquals(v.targets, t)
46 47 t = self.client.ids[2:4]
47 48 v = self.client[t]
48 49 self.assert_(isinstance(v, DirectView))
49 50 self.assertEquals(v.targets, t)
50 51 v = self.client[::2]
51 52 self.assert_(isinstance(v, DirectView))
52 53 self.assertEquals(v.targets, targets[::2])
53 54 v = self.client[1::3]
54 55 self.assert_(isinstance(v, DirectView))
55 56 self.assertEquals(v.targets, targets[1::3])
56 57 v = self.client[:-3]
57 58 self.assert_(isinstance(v, DirectView))
58 59 self.assertEquals(v.targets, targets[:-3])
59 60 v = self.client[-1]
60 61 self.assert_(isinstance(v, DirectView))
61 62 self.assertEquals(v.targets, targets[-1])
62 63 self.assertRaises(TypeError, lambda : self.client[None])
63 64
64 65 def test_lbview_targets(self):
65 66 """test load_balanced_view targets"""
66 67 v = self.client.load_balanced_view()
67 68 self.assertEquals(v.targets, None)
68 69 v = self.client.load_balanced_view(-1)
69 70 self.assertEquals(v.targets, [self.client.ids[-1]])
70 71 v = self.client.load_balanced_view('all')
71 72 self.assertEquals(v.targets, self.client.ids)
72 73
73 74 def test_targets(self):
74 75 """test various valid targets arguments"""
75 76 build = self.client._build_targets
76 77 ids = self.client.ids
77 78 idents,targets = build(None)
78 79 self.assertEquals(ids, targets)
79 80
80 81 def test_clear(self):
81 82 """test clear behavior"""
82 83 # self.add_engines(2)
83 84 v = self.client[:]
84 85 v.block=True
85 86 v.push(dict(a=5))
86 87 v.pull('a')
87 88 id0 = self.client.ids[-1]
88 89 self.client.clear(targets=id0, block=True)
89 90 a = self.client[:-1].get('a')
90 91 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 92 self.client.clear(block=True)
92 93 for i in self.client.ids:
93 94 # print i
94 95 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
95 96
96 97 def test_get_result(self):
97 98 """test getting results from the Hub."""
98 99 c = clientmod.Client(profile='iptest')
99 100 # self.add_engines(1)
100 101 t = c.ids[-1]
101 102 ar = c[t].apply_async(wait, 1)
102 103 # give the monitor time to notice the message
103 104 time.sleep(.25)
104 105 ahr = self.client.get_result(ar.msg_ids)
105 106 self.assertTrue(isinstance(ahr, AsyncHubResult))
106 107 self.assertEquals(ahr.get(), ar.get())
107 108 ar2 = self.client.get_result(ar.msg_ids)
108 109 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 110 c.close()
110 111
111 112 def test_ids_list(self):
112 113 """test client.ids"""
113 114 # self.add_engines(2)
114 115 ids = self.client.ids
115 116 self.assertEquals(ids, self.client._ids)
116 117 self.assertFalse(ids is self.client._ids)
117 118 ids.remove(ids[-1])
118 119 self.assertNotEquals(ids, self.client._ids)
119 120
120 121 def test_queue_status(self):
121 122 # self.addEngine(4)
122 123 ids = self.client.ids
123 124 id0 = ids[0]
124 125 qs = self.client.queue_status(targets=id0)
125 126 self.assertTrue(isinstance(qs, dict))
126 127 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 128 allqs = self.client.queue_status()
128 129 self.assertTrue(isinstance(allqs, dict))
129 130 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
130 131 unassigned = allqs.pop('unassigned')
131 132 for eid,qs in allqs.items():
132 133 self.assertTrue(isinstance(qs, dict))
133 134 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134 135
135 136 def test_shutdown(self):
136 137 # self.addEngine(4)
137 138 ids = self.client.ids
138 139 id0 = ids[0]
139 140 self.client.shutdown(id0, block=True)
140 141 while id0 in self.client.ids:
141 142 time.sleep(0.1)
142 143 self.client.spin()
143 144
144 145 self.assertRaises(IndexError, lambda : self.client[id0])
145 146
146 147 def test_result_status(self):
147 148 pass
148 149 # to be written
150
151 def test_db_query_dt(self):
152 """test db query by date"""
153 hist = self.client.hub_history()
154 middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
155 tic = middle['submitted']
156 before = self.client.db_query({'submitted' : {'$lt' : tic}})
157 after = self.client.db_query({'submitted' : {'$gte' : tic}})
158 self.assertEquals(len(before)+len(after),len(hist))
159 for b in before:
160 self.assertTrue(b['submitted'] < tic)
161 for a in after:
162 self.assertTrue(a['submitted'] >= tic)
163 same = self.client.db_query({'submitted' : tic})
164 for s in same:
165 self.assertTrue(s['submitted'] == tic)
166
167 def test_db_query_keys(self):
168 """test extracting subset of record keys"""
169 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
170 for rec in found:
171 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
172
173 def test_db_query_msg_id(self):
174 """ensure msg_id is always in db queries"""
175 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
176 for rec in found:
177 self.assertTrue('msg_id' in rec.keys())
178 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
179 for rec in found:
180 self.assertTrue('msg_id' in rec.keys())
181 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
182 for rec in found:
183 self.assertTrue('msg_id' in rec.keys())
184
185 def test_db_query_in(self):
186 """test db query with '$in','$nin' operators"""
187 hist = self.client.hub_history()
188 even = hist[::2]
189 odd = hist[1::2]
190 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
191 found = [ r['msg_id'] for r in recs ]
192 self.assertEquals(set(even), set(found))
193 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
194 found = [ r['msg_id'] for r in recs ]
195 self.assertEquals(set(odd), set(found))
196
197 def test_hub_history(self):
198 hist = self.client.hub_history()
199 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
200 recdict = {}
201 for rec in recs:
202 recdict[rec['msg_id']] = rec
203
204 latest = datetime(1984,1,1)
205 for msg_id in hist:
206 rec = recdict[msg_id]
207 newt = rec['submitted']
208 self.assertTrue(newt >= latest)
209 latest = newt
210 ar = self.client[-1].apply_async(lambda : 1)
211 ar.get()
212 time.sleep(0.25)
213 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
214
General Comments 0
You need to be logged in to leave comments. Login now