##// END OF EJS Templates
reduce test_cull_count test...
MinRK -
Show More
@@ -1,314 +1,314 b''
1 """Tests for db backends
1 """Tests for db backends
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import logging
21 import logging
22 import os
22 import os
23 import tempfile
23 import tempfile
24 import time
24 import time
25
25
26 from datetime import datetime, timedelta
26 from datetime import datetime, timedelta
27 from unittest import TestCase
27 from unittest import TestCase
28
28
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel.controller.dictdb import DictDB
30 from IPython.parallel.controller.dictdb import DictDB
31 from IPython.parallel.controller.sqlitedb import SQLiteDB
31 from IPython.parallel.controller.sqlitedb import SQLiteDB
32 from IPython.parallel.controller.hub import init_record, empty_record
32 from IPython.parallel.controller.hub import init_record, empty_record
33
33
34 from IPython.testing import decorators as dec
34 from IPython.testing import decorators as dec
35 from IPython.kernel.zmq.session import Session
35 from IPython.kernel.zmq.session import Session
36
36
37
37
38 #-------------------------------------------------------------------------------
38 #-------------------------------------------------------------------------------
39 # TestCases
39 # TestCases
40 #-------------------------------------------------------------------------------
40 #-------------------------------------------------------------------------------
41
41
42
42
43 def setup():
43 def setup():
44 global temp_db
44 global temp_db
45 temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
45 temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
46
46
47
47
48 class TaskDBTest:
48 class TaskDBTest:
49 def setUp(self):
49 def setUp(self):
50 self.session = Session()
50 self.session = Session()
51 self.db = self.create_db()
51 self.db = self.create_db()
52 self.load_records(16)
52 self.load_records(16)
53
53
54 def create_db(self):
54 def create_db(self):
55 raise NotImplementedError
55 raise NotImplementedError
56
56
57 def load_records(self, n=1, buffer_size=100):
57 def load_records(self, n=1, buffer_size=100):
58 """load n records for testing"""
58 """load n records for testing"""
59 #sleep 1/10 s, to ensure timestamp is different to previous calls
59 #sleep 1/10 s, to ensure timestamp is different to previous calls
60 time.sleep(0.1)
60 time.sleep(0.1)
61 msg_ids = []
61 msg_ids = []
62 for i in range(n):
62 for i in range(n):
63 msg = self.session.msg('apply_request', content=dict(a=5))
63 msg = self.session.msg('apply_request', content=dict(a=5))
64 msg['buffers'] = [os.urandom(buffer_size)]
64 msg['buffers'] = [os.urandom(buffer_size)]
65 rec = init_record(msg)
65 rec = init_record(msg)
66 msg_id = msg['header']['msg_id']
66 msg_id = msg['header']['msg_id']
67 msg_ids.append(msg_id)
67 msg_ids.append(msg_id)
68 self.db.add_record(msg_id, rec)
68 self.db.add_record(msg_id, rec)
69 return msg_ids
69 return msg_ids
70
70
71 def test_add_record(self):
71 def test_add_record(self):
72 before = self.db.get_history()
72 before = self.db.get_history()
73 self.load_records(5)
73 self.load_records(5)
74 after = self.db.get_history()
74 after = self.db.get_history()
75 self.assertEqual(len(after), len(before)+5)
75 self.assertEqual(len(after), len(before)+5)
76 self.assertEqual(after[:-5],before)
76 self.assertEqual(after[:-5],before)
77
77
78 def test_drop_record(self):
78 def test_drop_record(self):
79 msg_id = self.load_records()[-1]
79 msg_id = self.load_records()[-1]
80 rec = self.db.get_record(msg_id)
80 rec = self.db.get_record(msg_id)
81 self.db.drop_record(msg_id)
81 self.db.drop_record(msg_id)
82 self.assertRaises(KeyError,self.db.get_record, msg_id)
82 self.assertRaises(KeyError,self.db.get_record, msg_id)
83
83
84 def _round_to_millisecond(self, dt):
84 def _round_to_millisecond(self, dt):
85 """necessary because mongodb rounds microseconds"""
85 """necessary because mongodb rounds microseconds"""
86 micro = dt.microsecond
86 micro = dt.microsecond
87 extra = int(str(micro)[-3:])
87 extra = int(str(micro)[-3:])
88 return dt - timedelta(microseconds=extra)
88 return dt - timedelta(microseconds=extra)
89
89
90 def test_update_record(self):
90 def test_update_record(self):
91 now = self._round_to_millisecond(datetime.now())
91 now = self._round_to_millisecond(datetime.now())
92 #
92 #
93 msg_id = self.db.get_history()[-1]
93 msg_id = self.db.get_history()[-1]
94 rec1 = self.db.get_record(msg_id)
94 rec1 = self.db.get_record(msg_id)
95 data = {'stdout': 'hello there', 'completed' : now}
95 data = {'stdout': 'hello there', 'completed' : now}
96 self.db.update_record(msg_id, data)
96 self.db.update_record(msg_id, data)
97 rec2 = self.db.get_record(msg_id)
97 rec2 = self.db.get_record(msg_id)
98 self.assertEqual(rec2['stdout'], 'hello there')
98 self.assertEqual(rec2['stdout'], 'hello there')
99 self.assertEqual(rec2['completed'], now)
99 self.assertEqual(rec2['completed'], now)
100 rec1.update(data)
100 rec1.update(data)
101 self.assertEqual(rec1, rec2)
101 self.assertEqual(rec1, rec2)
102
102
103 # def test_update_record_bad(self):
103 # def test_update_record_bad(self):
104 # """test updating nonexistant records"""
104 # """test updating nonexistant records"""
105 # msg_id = str(uuid.uuid4())
105 # msg_id = str(uuid.uuid4())
106 # data = {'stdout': 'hello there'}
106 # data = {'stdout': 'hello there'}
107 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
107 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
108
108
109 def test_find_records_dt(self):
109 def test_find_records_dt(self):
110 """test finding records by date"""
110 """test finding records by date"""
111 hist = self.db.get_history()
111 hist = self.db.get_history()
112 middle = self.db.get_record(hist[len(hist)//2])
112 middle = self.db.get_record(hist[len(hist)//2])
113 tic = middle['submitted']
113 tic = middle['submitted']
114 before = self.db.find_records({'submitted' : {'$lt' : tic}})
114 before = self.db.find_records({'submitted' : {'$lt' : tic}})
115 after = self.db.find_records({'submitted' : {'$gte' : tic}})
115 after = self.db.find_records({'submitted' : {'$gte' : tic}})
116 self.assertEqual(len(before)+len(after),len(hist))
116 self.assertEqual(len(before)+len(after),len(hist))
117 for b in before:
117 for b in before:
118 self.assertTrue(b['submitted'] < tic)
118 self.assertTrue(b['submitted'] < tic)
119 for a in after:
119 for a in after:
120 self.assertTrue(a['submitted'] >= tic)
120 self.assertTrue(a['submitted'] >= tic)
121 same = self.db.find_records({'submitted' : tic})
121 same = self.db.find_records({'submitted' : tic})
122 for s in same:
122 for s in same:
123 self.assertTrue(s['submitted'] == tic)
123 self.assertTrue(s['submitted'] == tic)
124
124
125 def test_find_records_keys(self):
125 def test_find_records_keys(self):
126 """test extracting subset of record keys"""
126 """test extracting subset of record keys"""
127 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
127 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
128 for rec in found:
128 for rec in found:
129 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
129 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
130
130
131 def test_find_records_msg_id(self):
131 def test_find_records_msg_id(self):
132 """ensure msg_id is always in found records"""
132 """ensure msg_id is always in found records"""
133 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
133 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
134 for rec in found:
134 for rec in found:
135 self.assertTrue('msg_id' in rec.keys())
135 self.assertTrue('msg_id' in rec.keys())
136 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
136 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
137 for rec in found:
137 for rec in found:
138 self.assertTrue('msg_id' in rec.keys())
138 self.assertTrue('msg_id' in rec.keys())
139 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
139 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
140 for rec in found:
140 for rec in found:
141 self.assertTrue('msg_id' in rec.keys())
141 self.assertTrue('msg_id' in rec.keys())
142
142
143 def test_find_records_in(self):
143 def test_find_records_in(self):
144 """test finding records with '$in','$nin' operators"""
144 """test finding records with '$in','$nin' operators"""
145 hist = self.db.get_history()
145 hist = self.db.get_history()
146 even = hist[::2]
146 even = hist[::2]
147 odd = hist[1::2]
147 odd = hist[1::2]
148 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
148 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
149 found = [ r['msg_id'] for r in recs ]
149 found = [ r['msg_id'] for r in recs ]
150 self.assertEqual(set(even), set(found))
150 self.assertEqual(set(even), set(found))
151 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
151 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
152 found = [ r['msg_id'] for r in recs ]
152 found = [ r['msg_id'] for r in recs ]
153 self.assertEqual(set(odd), set(found))
153 self.assertEqual(set(odd), set(found))
154
154
155 def test_get_history(self):
155 def test_get_history(self):
156 msg_ids = self.db.get_history()
156 msg_ids = self.db.get_history()
157 latest = datetime(1984,1,1)
157 latest = datetime(1984,1,1)
158 for msg_id in msg_ids:
158 for msg_id in msg_ids:
159 rec = self.db.get_record(msg_id)
159 rec = self.db.get_record(msg_id)
160 newt = rec['submitted']
160 newt = rec['submitted']
161 self.assertTrue(newt >= latest)
161 self.assertTrue(newt >= latest)
162 latest = newt
162 latest = newt
163 msg_id = self.load_records(1)[-1]
163 msg_id = self.load_records(1)[-1]
164 self.assertEqual(self.db.get_history()[-1],msg_id)
164 self.assertEqual(self.db.get_history()[-1],msg_id)
165
165
166 def test_datetime(self):
166 def test_datetime(self):
167 """get/set timestamps with datetime objects"""
167 """get/set timestamps with datetime objects"""
168 msg_id = self.db.get_history()[-1]
168 msg_id = self.db.get_history()[-1]
169 rec = self.db.get_record(msg_id)
169 rec = self.db.get_record(msg_id)
170 self.assertTrue(isinstance(rec['submitted'], datetime))
170 self.assertTrue(isinstance(rec['submitted'], datetime))
171 self.db.update_record(msg_id, dict(completed=datetime.now()))
171 self.db.update_record(msg_id, dict(completed=datetime.now()))
172 rec = self.db.get_record(msg_id)
172 rec = self.db.get_record(msg_id)
173 self.assertTrue(isinstance(rec['completed'], datetime))
173 self.assertTrue(isinstance(rec['completed'], datetime))
174
174
175 def test_drop_matching(self):
175 def test_drop_matching(self):
176 msg_ids = self.load_records(10)
176 msg_ids = self.load_records(10)
177 query = {'msg_id' : {'$in':msg_ids}}
177 query = {'msg_id' : {'$in':msg_ids}}
178 self.db.drop_matching_records(query)
178 self.db.drop_matching_records(query)
179 recs = self.db.find_records(query)
179 recs = self.db.find_records(query)
180 self.assertEqual(len(recs), 0)
180 self.assertEqual(len(recs), 0)
181
181
182 def test_null(self):
182 def test_null(self):
183 """test None comparison queries"""
183 """test None comparison queries"""
184 msg_ids = self.load_records(10)
184 msg_ids = self.load_records(10)
185
185
186 query = {'msg_id' : None}
186 query = {'msg_id' : None}
187 recs = self.db.find_records(query)
187 recs = self.db.find_records(query)
188 self.assertEqual(len(recs), 0)
188 self.assertEqual(len(recs), 0)
189
189
190 query = {'msg_id' : {'$ne' : None}}
190 query = {'msg_id' : {'$ne' : None}}
191 recs = self.db.find_records(query)
191 recs = self.db.find_records(query)
192 self.assertTrue(len(recs) >= 10)
192 self.assertTrue(len(recs) >= 10)
193
193
194 def test_pop_safe_get(self):
194 def test_pop_safe_get(self):
195 """editing query results shouldn't affect record [get]"""
195 """editing query results shouldn't affect record [get]"""
196 msg_id = self.db.get_history()[-1]
196 msg_id = self.db.get_history()[-1]
197 rec = self.db.get_record(msg_id)
197 rec = self.db.get_record(msg_id)
198 rec.pop('buffers')
198 rec.pop('buffers')
199 rec['garbage'] = 'hello'
199 rec['garbage'] = 'hello'
200 rec['header']['msg_id'] = 'fubar'
200 rec['header']['msg_id'] = 'fubar'
201 rec2 = self.db.get_record(msg_id)
201 rec2 = self.db.get_record(msg_id)
202 self.assertTrue('buffers' in rec2)
202 self.assertTrue('buffers' in rec2)
203 self.assertFalse('garbage' in rec2)
203 self.assertFalse('garbage' in rec2)
204 self.assertEqual(rec2['header']['msg_id'], msg_id)
204 self.assertEqual(rec2['header']['msg_id'], msg_id)
205
205
206 def test_pop_safe_find(self):
206 def test_pop_safe_find(self):
207 """editing query results shouldn't affect record [find]"""
207 """editing query results shouldn't affect record [find]"""
208 msg_id = self.db.get_history()[-1]
208 msg_id = self.db.get_history()[-1]
209 rec = self.db.find_records({'msg_id' : msg_id})[0]
209 rec = self.db.find_records({'msg_id' : msg_id})[0]
210 rec.pop('buffers')
210 rec.pop('buffers')
211 rec['garbage'] = 'hello'
211 rec['garbage'] = 'hello'
212 rec['header']['msg_id'] = 'fubar'
212 rec['header']['msg_id'] = 'fubar'
213 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
213 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
214 self.assertTrue('buffers' in rec2)
214 self.assertTrue('buffers' in rec2)
215 self.assertFalse('garbage' in rec2)
215 self.assertFalse('garbage' in rec2)
216 self.assertEqual(rec2['header']['msg_id'], msg_id)
216 self.assertEqual(rec2['header']['msg_id'], msg_id)
217
217
218 def test_pop_safe_find_keys(self):
218 def test_pop_safe_find_keys(self):
219 """editing query results shouldn't affect record [find+keys]"""
219 """editing query results shouldn't affect record [find+keys]"""
220 msg_id = self.db.get_history()[-1]
220 msg_id = self.db.get_history()[-1]
221 rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]
221 rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers', 'header'])[0]
222 rec.pop('buffers')
222 rec.pop('buffers')
223 rec['garbage'] = 'hello'
223 rec['garbage'] = 'hello'
224 rec['header']['msg_id'] = 'fubar'
224 rec['header']['msg_id'] = 'fubar'
225 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
225 rec2 = self.db.find_records({'msg_id' : msg_id})[0]
226 self.assertTrue('buffers' in rec2)
226 self.assertTrue('buffers' in rec2)
227 self.assertFalse('garbage' in rec2)
227 self.assertFalse('garbage' in rec2)
228 self.assertEqual(rec2['header']['msg_id'], msg_id)
228 self.assertEqual(rec2['header']['msg_id'], msg_id)
229
229
230
230
231 class TestDictBackend(TaskDBTest, TestCase):
231 class TestDictBackend(TaskDBTest, TestCase):
232
232
233 def create_db(self):
233 def create_db(self):
234 return DictDB()
234 return DictDB()
235
235
236 def test_cull_count(self):
236 def test_cull_count(self):
237 self.db = self.create_db() # skip the load-records init from setUp
237 self.db = self.create_db() # skip the load-records init from setUp
238 self.db.record_limit = 20
238 self.db.record_limit = 20
239 self.db.cull_fraction = 0.2
239 self.db.cull_fraction = 0.2
240 self.load_records(20)
240 self.load_records(20)
241 self.assertEqual(len(self.db.get_history()), 20)
241 self.assertEqual(len(self.db.get_history()), 20)
242 self.load_records(1)
242 self.load_records(1)
243 # 0.2 * 20 = 4, 21 - 4 = 17
243 # 0.2 * 20 = 4, 21 - 4 = 17
244 self.assertEqual(len(self.db.get_history()), 17)
244 self.assertEqual(len(self.db.get_history()), 17)
245 self.load_records(3)
245 self.load_records(3)
246 self.assertEqual(len(self.db.get_history()), 20)
246 self.assertEqual(len(self.db.get_history()), 20)
247 self.load_records(1)
247 self.load_records(1)
248 self.assertEqual(len(self.db.get_history()), 17)
248 self.assertEqual(len(self.db.get_history()), 17)
249
249
250 for i in range(100):
250 for i in range(25):
251 self.load_records(1)
251 self.load_records(1)
252 self.assertTrue(len(self.db.get_history()) >= 17)
252 self.assertTrue(len(self.db.get_history()) >= 17)
253 self.assertTrue(len(self.db.get_history()) <= 20)
253 self.assertTrue(len(self.db.get_history()) <= 20)
254
254
255 def test_cull_size(self):
255 def test_cull_size(self):
256 self.db = self.create_db() # skip the load-records init from setUp
256 self.db = self.create_db() # skip the load-records init from setUp
257 self.db.size_limit = 1000
257 self.db.size_limit = 1000
258 self.db.cull_fraction = 0.2
258 self.db.cull_fraction = 0.2
259 self.load_records(100, buffer_size=10)
259 self.load_records(100, buffer_size=10)
260 self.assertEqual(len(self.db.get_history()), 100)
260 self.assertEqual(len(self.db.get_history()), 100)
261 self.load_records(1, buffer_size=0)
261 self.load_records(1, buffer_size=0)
262 self.assertEqual(len(self.db.get_history()), 101)
262 self.assertEqual(len(self.db.get_history()), 101)
263 self.load_records(1, buffer_size=1)
263 self.load_records(1, buffer_size=1)
264 # 0.2 * 100 = 20, 101 - 20 = 81
264 # 0.2 * 100 = 20, 101 - 20 = 81
265 self.assertEqual(len(self.db.get_history()), 81)
265 self.assertEqual(len(self.db.get_history()), 81)
266
266
267 def test_cull_size_drop(self):
267 def test_cull_size_drop(self):
268 """dropping records updates tracked buffer size"""
268 """dropping records updates tracked buffer size"""
269 self.db = self.create_db() # skip the load-records init from setUp
269 self.db = self.create_db() # skip the load-records init from setUp
270 self.db.size_limit = 1000
270 self.db.size_limit = 1000
271 self.db.cull_fraction = 0.2
271 self.db.cull_fraction = 0.2
272 self.load_records(100, buffer_size=10)
272 self.load_records(100, buffer_size=10)
273 self.assertEqual(len(self.db.get_history()), 100)
273 self.assertEqual(len(self.db.get_history()), 100)
274 self.db.drop_record(self.db.get_history()[-1])
274 self.db.drop_record(self.db.get_history()[-1])
275 self.assertEqual(len(self.db.get_history()), 99)
275 self.assertEqual(len(self.db.get_history()), 99)
276 self.load_records(1, buffer_size=5)
276 self.load_records(1, buffer_size=5)
277 self.assertEqual(len(self.db.get_history()), 100)
277 self.assertEqual(len(self.db.get_history()), 100)
278 self.load_records(1, buffer_size=5)
278 self.load_records(1, buffer_size=5)
279 self.assertEqual(len(self.db.get_history()), 101)
279 self.assertEqual(len(self.db.get_history()), 101)
280 self.load_records(1, buffer_size=1)
280 self.load_records(1, buffer_size=1)
281 self.assertEqual(len(self.db.get_history()), 81)
281 self.assertEqual(len(self.db.get_history()), 81)
282
282
283 def test_cull_size_update(self):
283 def test_cull_size_update(self):
284 """updating records updates tracked buffer size"""
284 """updating records updates tracked buffer size"""
285 self.db = self.create_db() # skip the load-records init from setUp
285 self.db = self.create_db() # skip the load-records init from setUp
286 self.db.size_limit = 1000
286 self.db.size_limit = 1000
287 self.db.cull_fraction = 0.2
287 self.db.cull_fraction = 0.2
288 self.load_records(100, buffer_size=10)
288 self.load_records(100, buffer_size=10)
289 self.assertEqual(len(self.db.get_history()), 100)
289 self.assertEqual(len(self.db.get_history()), 100)
290 msg_id = self.db.get_history()[-1]
290 msg_id = self.db.get_history()[-1]
291 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
291 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
292 self.assertEqual(len(self.db.get_history()), 100)
292 self.assertEqual(len(self.db.get_history()), 100)
293 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
293 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
294 self.assertEqual(len(self.db.get_history()), 79)
294 self.assertEqual(len(self.db.get_history()), 79)
295
295
296 class TestSQLiteBackend(TaskDBTest, TestCase):
296 class TestSQLiteBackend(TaskDBTest, TestCase):
297
297
298 @dec.skip_without('sqlite3')
298 @dec.skip_without('sqlite3')
299 def create_db(self):
299 def create_db(self):
300 location, fname = os.path.split(temp_db)
300 location, fname = os.path.split(temp_db)
301 log = logging.getLogger('test')
301 log = logging.getLogger('test')
302 log.setLevel(logging.CRITICAL)
302 log.setLevel(logging.CRITICAL)
303 return SQLiteDB(location=location, fname=fname, log=log)
303 return SQLiteDB(location=location, fname=fname, log=log)
304
304
305 def tearDown(self):
305 def tearDown(self):
306 self.db._db.close()
306 self.db._db.close()
307
307
308
308
309 def teardown():
309 def teardown():
310 """cleanup task db file after all tests have run"""
310 """cleanup task db file after all tests have run"""
311 try:
311 try:
312 os.remove(temp_db)
312 os.remove(temp_db)
313 except:
313 except:
314 pass
314 pass
General Comments 0
You need to be logged in to leave comments. Login now