Show More
@@ -51,7 +51,7 b' from datetime import datetime' | |||||
51 |
|
51 | |||
52 | from IPython.config.configurable import LoggingConfigurable |
|
52 | from IPython.config.configurable import LoggingConfigurable | |
53 |
|
53 | |||
54 |
from IPython.utils.traitlets import Dict, Unicode, In |
|
54 | from IPython.utils.traitlets import Dict, Unicode, Integer, Float | |
55 |
|
55 | |||
56 | filters = { |
|
56 | filters = { | |
57 | '$lt' : lambda a,b: a < b, |
|
57 | '$lt' : lambda a,b: a < b, | |
@@ -100,6 +100,33 b' class DictDB(BaseDB):' | |||||
100 | """ |
|
100 | """ | |
101 |
|
101 | |||
102 | _records = Dict() |
|
102 | _records = Dict() | |
|
103 | _culled_ids = set() # set of ids which have been culled | |||
|
104 | _buffer_bytes = Integer(0) # running total of the bytes in the DB | |||
|
105 | ||||
|
106 | size_limit = Integer(1024*1024, config=True, | |||
|
107 | help="""The maximum total size (in bytes) of the buffers stored in the db | |||
|
108 | ||||
|
109 | When the db exceeds this size, the oldest records will be culled until | |||
|
110 | the total size is under size_limit * (1-cull_fraction). | |||
|
111 | """ | |||
|
112 | ) | |||
|
113 | record_limit = Integer(1024, config=True, | |||
|
114 | help="""The maximum number of records in the db | |||
|
115 | ||||
|
116 | When the history exceeds this size, the first record_limit * cull_fraction | |||
|
117 | records will be culled. | |||
|
118 | """ | |||
|
119 | ) | |||
|
120 | cull_fraction = Float(0.1, config=True, | |||
|
121 | help="""The fraction by which the db should culled when one of the limits is exceeded | |||
|
122 | ||||
|
123 | In general, the db size will spend most of its time with a size in the range: | |||
|
124 | ||||
|
125 | [limit * (1-cull_fraction), limit] | |||
|
126 | ||||
|
127 | for each of size_limit and record_limit. | |||
|
128 | """ | |||
|
129 | ) | |||
103 |
|
130 | |||
104 | def _match_one(self, rec, tests): |
|
131 | def _match_one(self, rec, tests): | |
105 | """Check if a specific record matches tests.""" |
|
132 | """Check if a specific record matches tests.""" | |
@@ -130,34 +157,92 b' class DictDB(BaseDB):' | |||||
130 | for key in keys: |
|
157 | for key in keys: | |
131 | d[key] = rec[key] |
|
158 | d[key] = rec[key] | |
132 | return copy(d) |
|
159 | return copy(d) | |
|
160 | ||||
|
161 | # methods for monitoring size / culling history | |||
|
162 | ||||
|
163 | def _add_bytes(self, rec): | |||
|
164 | for key in ('buffers', 'result_buffers'): | |||
|
165 | for buf in rec.get(key) or []: | |||
|
166 | self._buffer_bytes += len(buf) | |||
|
167 | ||||
|
168 | self._maybe_cull() | |||
|
169 | ||||
|
170 | def _drop_bytes(self, rec): | |||
|
171 | for key in ('buffers', 'result_buffers'): | |||
|
172 | for buf in rec.get(key) or []: | |||
|
173 | self._buffer_bytes -= len(buf) | |||
|
174 | ||||
|
175 | def _cull_oldest(self, n=1): | |||
|
176 | """cull the oldest N records""" | |||
|
177 | for msg_id in self.get_history()[:n]: | |||
|
178 | self.log.debug("Culling record: %r", msg_id) | |||
|
179 | self._culled_ids.add(msg_id) | |||
|
180 | self.drop_record(msg_id) | |||
|
181 | ||||
|
182 | def _maybe_cull(self): | |||
|
183 | # cull by count: | |||
|
184 | if len(self._records) > self.record_limit: | |||
|
185 | to_cull = int(self.cull_fraction * self.record_limit) | |||
|
186 | self.log.info("%i records exceeds limit of %i, culling oldest %i", | |||
|
187 | len(self._records), self.record_limit, to_cull | |||
|
188 | ) | |||
|
189 | self._cull_oldest(to_cull) | |||
|
190 | ||||
|
191 | # cull by size: | |||
|
192 | if self._buffer_bytes > self.size_limit: | |||
|
193 | limit = self.size_limit * (1 - self.cull_fraction) | |||
|
194 | ||||
|
195 | before = self._buffer_bytes | |||
|
196 | before_count = len(self._records) | |||
|
197 | culled = 0 | |||
|
198 | while self._buffer_bytes > limit: | |||
|
199 | self._cull_oldest(1) | |||
|
200 | culled += 1 | |||
|
201 | ||||
|
202 | self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.", | |||
|
203 | before_count, before, self.size_limit, culled | |||
|
204 | ) | |||
|
205 | ||||
|
206 | # public API methods: | |||
133 |
|
207 | |||
134 | def add_record(self, msg_id, rec): |
|
208 | def add_record(self, msg_id, rec): | |
135 | """Add a new Task Record, by msg_id.""" |
|
209 | """Add a new Task Record, by msg_id.""" | |
136 | if msg_id in self._records: |
|
210 | if msg_id in self._records: | |
137 | raise KeyError("Already have msg_id %r"%(msg_id)) |
|
211 | raise KeyError("Already have msg_id %r"%(msg_id)) | |
138 | self._records[msg_id] = rec |
|
212 | self._records[msg_id] = rec | |
|
213 | self._add_bytes(rec) | |||
|
214 | self._maybe_cull() | |||
139 |
|
215 | |||
140 | def get_record(self, msg_id): |
|
216 | def get_record(self, msg_id): | |
141 | """Get a specific Task Record, by msg_id.""" |
|
217 | """Get a specific Task Record, by msg_id.""" | |
|
218 | if msg_id in self._culled_ids: | |||
|
219 | raise KeyError("Record %r has been culled for size" % msg_id) | |||
142 | if not msg_id in self._records: |
|
220 | if not msg_id in self._records: | |
143 | raise KeyError("No such msg_id %r"%(msg_id)) |
|
221 | raise KeyError("No such msg_id %r"%(msg_id)) | |
144 | return copy(self._records[msg_id]) |
|
222 | return copy(self._records[msg_id]) | |
145 |
|
223 | |||
146 | def update_record(self, msg_id, rec): |
|
224 | def update_record(self, msg_id, rec): | |
147 | """Update the data in an existing record.""" |
|
225 | """Update the data in an existing record.""" | |
148 | self._records[msg_id].update(rec) |
|
226 | if msg_id in self._culled_ids: | |
|
227 | raise KeyError("Record %r has been culled for size" % msg_id) | |||
|
228 | _rec = self._records[msg_id] | |||
|
229 | self._drop_bytes(_rec) | |||
|
230 | _rec.update(rec) | |||
|
231 | self._add_bytes(_rec) | |||
149 |
|
232 | |||
150 | def drop_matching_records(self, check): |
|
233 | def drop_matching_records(self, check): | |
151 | """Remove a record from the DB.""" |
|
234 | """Remove a record from the DB.""" | |
152 | matches = self._match(check) |
|
235 | matches = self._match(check) | |
153 |
for |
|
236 | for rec in matches: | |
154 | del self._records[m['msg_id']] |
|
237 | self._drop_bytes(rec) | |
|
238 | del self._records[rec['msg_id']] | |||
155 |
|
239 | |||
156 | def drop_record(self, msg_id): |
|
240 | def drop_record(self, msg_id): | |
157 | """Remove a record from the DB.""" |
|
241 | """Remove a record from the DB.""" | |
|
242 | rec = self._records[msg_id] | |||
|
243 | self._drop_bytes(rec) | |||
158 | del self._records[msg_id] |
|
244 | del self._records[msg_id] | |
159 |
|
245 | |||
160 |
|
||||
161 | def find_records(self, check, keys=None): |
|
246 | def find_records(self, check, keys=None): | |
162 | """Find records matching a query dict, optionally extracting subset of keys. |
|
247 | """Find records matching a query dict, optionally extracting subset of keys. | |
163 |
|
248 | |||
@@ -178,17 +263,18 b' class DictDB(BaseDB):' | |||||
178 | else: |
|
263 | else: | |
179 | return matches |
|
264 | return matches | |
180 |
|
265 | |||
181 |
|
||||
182 | def get_history(self): |
|
266 | def get_history(self): | |
183 | """get all msg_ids, ordered by time submitted.""" |
|
267 | """get all msg_ids, ordered by time submitted.""" | |
184 | msg_ids = self._records.keys() |
|
268 | msg_ids = self._records.keys() | |
185 | return sorted(msg_ids, key=lambda m: self._records[m]['submitted']) |
|
269 | return sorted(msg_ids, key=lambda m: self._records[m]['submitted']) | |
186 |
|
270 | |||
|
271 | ||||
187 | NODATA = KeyError("NoDB backend doesn't store any data. " |
|
272 | NODATA = KeyError("NoDB backend doesn't store any data. " | |
188 | "Start the Controller with a DB backend to enable resubmission / result persistence." |
|
273 | "Start the Controller with a DB backend to enable resubmission / result persistence." | |
189 | ) |
|
274 | ) | |
190 |
|
275 | |||
191 | class NoDB(DictDB): |
|
276 | ||
|
277 | class NoDB(BaseDB): | |||
192 | """A blackhole db backend that actually stores no information. |
|
278 | """A blackhole db backend that actually stores no information. | |
193 |
|
279 | |||
194 | Provides the full DB interface, but raises KeyErrors on any |
|
280 | Provides the full DB interface, but raises KeyErrors on any |
@@ -45,23 +45,23 b' def setup():' | |||||
45 | temp_db = tempfile.NamedTemporaryFile(suffix='.db').name |
|
45 | temp_db = tempfile.NamedTemporaryFile(suffix='.db').name | |
46 |
|
46 | |||
47 |
|
47 | |||
48 |
class T |
|
48 | class TaskDBTest: | |
49 | def setUp(self): |
|
49 | def setUp(self): | |
50 | self.session = Session() |
|
50 | self.session = Session() | |
51 | self.db = self.create_db() |
|
51 | self.db = self.create_db() | |
52 | self.load_records(16) |
|
52 | self.load_records(16) | |
53 |
|
53 | |||
54 | def create_db(self): |
|
54 | def create_db(self): | |
55 | return DictDB() |
|
55 | raise NotImplementedError | |
56 |
|
56 | |||
57 | def load_records(self, n=1): |
|
57 | def load_records(self, n=1, buffer_size=100): | |
58 | """load n records for testing""" |
|
58 | """load n records for testing""" | |
59 | #sleep 1/10 s, to ensure timestamp is different to previous calls |
|
59 | #sleep 1/10 s, to ensure timestamp is different to previous calls | |
60 | time.sleep(0.1) |
|
60 | time.sleep(0.1) | |
61 | msg_ids = [] |
|
61 | msg_ids = [] | |
62 | for i in range(n): |
|
62 | for i in range(n): | |
63 | msg = self.session.msg('apply_request', content=dict(a=5)) |
|
63 | msg = self.session.msg('apply_request', content=dict(a=5)) | |
64 | msg['buffers'] = [] |
|
64 | msg['buffers'] = [os.urandom(buffer_size)] | |
65 | rec = init_record(msg) |
|
65 | rec = init_record(msg) | |
66 | msg_id = msg['header']['msg_id'] |
|
66 | msg_id = msg['header']['msg_id'] | |
67 | msg_ids.append(msg_id) |
|
67 | msg_ids.append(msg_id) | |
@@ -228,7 +228,72 b' class TestDictBackend(TestCase):' | |||||
228 | self.assertEqual(rec2['header']['msg_id'], msg_id) |
|
228 | self.assertEqual(rec2['header']['msg_id'], msg_id) | |
229 |
|
229 | |||
230 |
|
230 | |||
231 | class TestSQLiteBackend(TestDictBackend): |
|
231 | class TestDictBackend(TaskDBTest, TestCase): | |
|
232 | ||||
|
233 | def create_db(self): | |||
|
234 | return DictDB() | |||
|
235 | ||||
|
236 | def test_cull_count(self): | |||
|
237 | self.db = self.create_db() # skip the load-records init from setUp | |||
|
238 | self.db.record_limit = 20 | |||
|
239 | self.db.cull_fraction = 0.2 | |||
|
240 | self.load_records(20) | |||
|
241 | self.assertEquals(len(self.db.get_history()), 20) | |||
|
242 | self.load_records(1) | |||
|
243 | # 0.2 * 20 = 4, 21 - 4 = 17 | |||
|
244 | self.assertEquals(len(self.db.get_history()), 17) | |||
|
245 | self.load_records(3) | |||
|
246 | self.assertEquals(len(self.db.get_history()), 20) | |||
|
247 | self.load_records(1) | |||
|
248 | self.assertEquals(len(self.db.get_history()), 17) | |||
|
249 | ||||
|
250 | for i in range(100): | |||
|
251 | self.load_records(1) | |||
|
252 | self.assertTrue(len(self.db.get_history()) >= 17) | |||
|
253 | self.assertTrue(len(self.db.get_history()) <= 20) | |||
|
254 | ||||
|
255 | def test_cull_size(self): | |||
|
256 | self.db = self.create_db() # skip the load-records init from setUp | |||
|
257 | self.db.size_limit = 1000 | |||
|
258 | self.db.cull_fraction = 0.2 | |||
|
259 | self.load_records(100, buffer_size=10) | |||
|
260 | self.assertEquals(len(self.db.get_history()), 100) | |||
|
261 | self.load_records(1, buffer_size=0) | |||
|
262 | self.assertEquals(len(self.db.get_history()), 101) | |||
|
263 | self.load_records(1, buffer_size=1) | |||
|
264 | # 0.2 * 100 = 20, 101 - 20 = 81 | |||
|
265 | self.assertEquals(len(self.db.get_history()), 81) | |||
|
266 | ||||
|
267 | def test_cull_size_drop(self): | |||
|
268 | """dropping records updates tracked buffer size""" | |||
|
269 | self.db = self.create_db() # skip the load-records init from setUp | |||
|
270 | self.db.size_limit = 1000 | |||
|
271 | self.db.cull_fraction = 0.2 | |||
|
272 | self.load_records(100, buffer_size=10) | |||
|
273 | self.assertEquals(len(self.db.get_history()), 100) | |||
|
274 | self.db.drop_record(self.db.get_history()[-1]) | |||
|
275 | self.assertEquals(len(self.db.get_history()), 99) | |||
|
276 | self.load_records(1, buffer_size=5) | |||
|
277 | self.assertEquals(len(self.db.get_history()), 100) | |||
|
278 | self.load_records(1, buffer_size=5) | |||
|
279 | self.assertEquals(len(self.db.get_history()), 101) | |||
|
280 | self.load_records(1, buffer_size=1) | |||
|
281 | self.assertEquals(len(self.db.get_history()), 81) | |||
|
282 | ||||
|
283 | def test_cull_size_update(self): | |||
|
284 | """updating records updates tracked buffer size""" | |||
|
285 | self.db = self.create_db() # skip the load-records init from setUp | |||
|
286 | self.db.size_limit = 1000 | |||
|
287 | self.db.cull_fraction = 0.2 | |||
|
288 | self.load_records(100, buffer_size=10) | |||
|
289 | self.assertEquals(len(self.db.get_history()), 100) | |||
|
290 | msg_id = self.db.get_history()[-1] | |||
|
291 | self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[])) | |||
|
292 | self.assertEquals(len(self.db.get_history()), 100) | |||
|
293 | self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[])) | |||
|
294 | self.assertEquals(len(self.db.get_history()), 79) | |||
|
295 | ||||
|
296 | class TestSQLiteBackend(TaskDBTest, TestCase): | |||
232 |
|
297 | |||
233 | @dec.skip_without('sqlite3') |
|
298 | @dec.skip_without('sqlite3') | |
234 | def create_db(self): |
|
299 | def create_db(self): |
@@ -16,6 +16,8 b' Authors:' | |||||
16 | # Imports |
|
16 | # Imports | |
17 | #------------------------------------------------------------------------------- |
|
17 | #------------------------------------------------------------------------------- | |
18 |
|
18 | |||
|
19 | from unittest import TestCase | |||
|
20 | ||||
19 | from nose import SkipTest |
|
21 | from nose import SkipTest | |
20 |
|
22 | |||
21 | from pymongo import Connection |
|
23 | from pymongo import Connection | |
@@ -28,7 +30,7 b' try:' | |||||
28 | except Exception: |
|
30 | except Exception: | |
29 | c=None |
|
31 | c=None | |
30 |
|
32 | |||
31 |
class TestMongoBackend(test_db.T |
|
33 | class TestMongoBackend(test_db.TaskDBTest, TestCase): | |
32 | """MongoDB backend tests""" |
|
34 | """MongoDB backend tests""" | |
33 |
|
35 | |||
34 | def create_db(self): |
|
36 | def create_db(self): |
General Comments 0
You need to be logged in to leave comments.
Login now