add size-limiting to the DictDB backend
MinRK
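The commit makes three new traits on DictDB configurable: size_limit, record_limit, and cull_fraction. As a rough illustration only (not part of the commit), they could be tuned from a controller profile; the c.DictDB prefix and get_config() are the standard IPython traitlets-config idiom, assumed here to apply to the active backend.

# ipcontroller_config.py -- hypothetical tuning sketch; only the option names
# below come from the diff, the values are arbitrary examples.
c = get_config()  # provided by IPython when it executes a config file

c.DictDB.size_limit = 10 * 1024 * 1024  # allow 10 MiB of buffers before culling
c.DictDB.record_limit = 4096            # allow 4096 records before culling
c.DictDB.cull_fraction = 0.25           # each cull frees 25% of the limit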
@@ -51,7 +51,7 @@ from datetime import datetime
 
 from IPython.config.configurable import LoggingConfigurable
 
-from IPython.utils.traitlets import Dict, Unicode, Instance
+from IPython.utils.traitlets import Dict, Unicode, Integer, Float
 
 filters = {
  '$lt' : lambda a,b: a < b,
@@ -100,6 +100,33 @@ class DictDB(BaseDB):
     """
 
     _records = Dict()
+    _culled_ids = set() # set of ids which have been culled
+    _buffer_bytes = Integer(0) # running total of the bytes in the DB
+
+    size_limit = Integer(1024*1024, config=True,
+        help="""The maximum total size (in bytes) of the buffers stored in the db
+
+        When the db exceeds this size, the oldest records will be culled until
+        the total size is under size_limit * (1-cull_fraction).
+        """
+    )
+    record_limit = Integer(1024, config=True,
+        help="""The maximum number of records in the db
+
+        When the history exceeds this size, the first record_limit * cull_fraction
+        records will be culled.
+        """
+    )
+    cull_fraction = Float(0.1, config=True,
+        help="""The fraction by which the db should be culled when one of the limits is exceeded
+
+        In general, the db size will spend most of its time with a size in the range:
+
+        [limit * (1-cull_fraction), limit]
+
+        for each of size_limit and record_limit.
+        """
+    )
 
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
@@ -130,34 +157,92 @@ class DictDB(BaseDB):
         for key in keys:
             d[key] = rec[key]
         return copy(d)
+
+    # methods for monitoring size / culling history
+
+    def _add_bytes(self, rec):
+        for key in ('buffers', 'result_buffers'):
+            for buf in rec.get(key) or []:
+                self._buffer_bytes += len(buf)
+
+        self._maybe_cull()
+
+    def _drop_bytes(self, rec):
+        for key in ('buffers', 'result_buffers'):
+            for buf in rec.get(key) or []:
+                self._buffer_bytes -= len(buf)
+
+    def _cull_oldest(self, n=1):
+        """cull the oldest N records"""
+        for msg_id in self.get_history()[:n]:
+            self.log.debug("Culling record: %r", msg_id)
+            self._culled_ids.add(msg_id)
+            self.drop_record(msg_id)
+
+    def _maybe_cull(self):
+        # cull by count:
+        if len(self._records) > self.record_limit:
+            to_cull = int(self.cull_fraction * self.record_limit)
+            self.log.info("%i records exceeds limit of %i, culling oldest %i",
+                len(self._records), self.record_limit, to_cull
+            )
+            self._cull_oldest(to_cull)
+
+        # cull by size:
+        if self._buffer_bytes > self.size_limit:
+            limit = self.size_limit * (1 - self.cull_fraction)
+
+            before = self._buffer_bytes
+            before_count = len(self._records)
+            culled = 0
+            while self._buffer_bytes > limit:
+                self._cull_oldest(1)
+                culled += 1
+
+            self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
+                before_count, before, self.size_limit, culled
+            )
+
+    # public API methods:
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if self._records.has_key(msg_id):
             raise KeyError("Already have msg_id %r"%(msg_id))
         self._records[msg_id] = rec
+        self._add_bytes(rec)
+        self._maybe_cull()
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
+        if msg_id in self._culled_ids:
+            raise KeyError("Record %r has been culled for size" % msg_id)
         if not msg_id in self._records:
             raise KeyError("No such msg_id %r"%(msg_id))
         return copy(self._records[msg_id])
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
-        self._records[msg_id].update(rec)
+        if msg_id in self._culled_ids:
+            raise KeyError("Record %r has been culled for size" % msg_id)
+        _rec = self._records[msg_id]
+        self._drop_bytes(_rec)
+        _rec.update(rec)
+        self._add_bytes(_rec)
 
     def drop_matching_records(self, check):
         """Remove a record from the DB."""
         matches = self._match(check)
-        for m in matches:
-            del self._records[m['msg_id']]
+        for rec in matches:
+            self._drop_bytes(rec)
+            del self._records[rec['msg_id']]
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
+        rec = self._records[msg_id]
+        self._drop_bytes(rec)
         del self._records[msg_id]
 
-
     def find_records(self, check, keys=None):
         """Find records matching a query dict, optionally extracting subset of keys.
 
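_maybe_cull combines two policies: the count cull drops a fixed batch of int(cull_fraction * record_limit) records, while the size cull drops one record at a time until the byte total is under the post-cull floor. A self-contained sketch of the same policy on an OrderedDict, for illustration only (none of these names are IPython's):

from collections import OrderedDict

class TinyCullingStore(object):
    """Minimal re-implementation of the culling policy above (sketch only)."""
    def __init__(self, size_limit=1000, record_limit=10, cull_fraction=0.2):
        self.size_limit = size_limit
        self.record_limit = record_limit
        self.cull_fraction = cull_fraction
        self._records = OrderedDict()  # insertion order stands in for 'submitted'
        self._buffer_bytes = 0

    def add_record(self, msg_id, payload):
        self._records[msg_id] = payload
        self._buffer_bytes += len(payload)
        self._maybe_cull()

    def _cull_oldest(self, n=1):
        for _ in range(n):
            _, payload = self._records.popitem(last=False)  # FIFO: oldest first
            self._buffer_bytes -= len(payload)

    def _maybe_cull(self):
        # cull by count: drop a whole batch at once
        if len(self._records) > self.record_limit:
            self._cull_oldest(int(self.cull_fraction * self.record_limit))
        # cull by size: drop one at a time until under the post-cull floor
        if self._buffer_bytes > self.size_limit:
            floor = self.size_limit * (1 - self.cull_fraction)
            while self._buffer_bytes > floor:
                self._cull_oldest(1)

store = TinyCullingStore()
for i in range(25):
    store.add_record('msg-%i' % i, b'x' * 100)
print(len(store._records), store._buffer_bytes)  # stays inside the envelope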
@@ -178,17 +263,18 @@ class DictDB(BaseDB):
         else:
             return matches
 
-
     def get_history(self):
         """get all msg_ids, ordered by time submitted."""
         msg_ids = self._records.keys()
         return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
 
+
 NODATA = KeyError("NoDB backend doesn't store any data. "
 "Start the Controller with a DB backend to enable resubmission / result persistence."
 )
 
-class NoDB(DictDB):
+
+class NoDB(BaseDB):
     """A blackhole db backend that actually stores no information.
 
     Provides the full DB interface, but raises KeyErrors on any
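NoDB, per its docstring, keeps the full DB interface but stores nothing, and NODATA is the KeyError its data-returning methods raise. A hedged usage sketch, assuming writes are silently discarded and reads raise NODATA (which is what the error message describes):

nodb = NoDB()
nodb.add_record('some-msg-id', {})   # accepted, but nothing is stored
try:
    nodb.get_record('some-msg-id')
except KeyError as e:
    print(e)  # NoDB backend doesn't store any data. ...

The remaining hunks refactor the test suite to cover the new behavior.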
@@ -45,23 +45,23 @@ def setup():
     temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
 
 
-class TestDictBackend(TestCase):
+class TaskDBTest:
     def setUp(self):
         self.session = Session()
         self.db = self.create_db()
         self.load_records(16)
 
     def create_db(self):
-        return DictDB()
+        raise NotImplementedError
 
-    def load_records(self, n=1):
+    def load_records(self, n=1, buffer_size=100):
         """load n records for testing"""
         #sleep 1/10 s, to ensure timestamp is different to previous calls
         time.sleep(0.1)
         msg_ids = []
         for i in range(n):
             msg = self.session.msg('apply_request', content=dict(a=5))
-            msg['buffers'] = []
+            msg['buffers'] = [os.urandom(buffer_size)]
             rec = init_record(msg)
             msg_id = msg['header']['msg_id']
             msg_ids.append(msg_id)
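The new buffer_size parameter is what makes the culling tests deterministic: os.urandom(n) returns exactly n bytes, so every loaded record contributes a known amount to the tracked buffer total. For example:

import os

buf = os.urandom(100)   # exactly 100 random bytes
print(len(buf))         # 100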
@@ -228,7 +228,72 @@ class TestDictBackend(TestCase):
         self.assertEquals(rec2['header']['msg_id'], msg_id)
 
 
-class TestSQLiteBackend(TestDictBackend):
+class TestDictBackend(TaskDBTest, TestCase):
+
+    def create_db(self):
+        return DictDB()
+
+    def test_cull_count(self):
+        self.db = self.create_db() # skip the load-records init from setUp
+        self.db.record_limit = 20
+        self.db.cull_fraction = 0.2
+        self.load_records(20)
+        self.assertEquals(len(self.db.get_history()), 20)
+        self.load_records(1)
+        # 0.2 * 20 = 4, 21 - 4 = 17
+        self.assertEquals(len(self.db.get_history()), 17)
+        self.load_records(3)
+        self.assertEquals(len(self.db.get_history()), 20)
+        self.load_records(1)
+        self.assertEquals(len(self.db.get_history()), 17)
+
+        for i in range(100):
+            self.load_records(1)
+            self.assertTrue(len(self.db.get_history()) >= 17)
+            self.assertTrue(len(self.db.get_history()) <= 20)
+
+    def test_cull_size(self):
+        self.db = self.create_db() # skip the load-records init from setUp
+        self.db.size_limit = 1000
+        self.db.cull_fraction = 0.2
+        self.load_records(100, buffer_size=10)
+        self.assertEquals(len(self.db.get_history()), 100)
+        self.load_records(1, buffer_size=0)
+        self.assertEquals(len(self.db.get_history()), 101)
+        self.load_records(1, buffer_size=1)
+        # 0.2 * 100 = 20, 101 - 20 = 81
+        self.assertEquals(len(self.db.get_history()), 81)
+
+    def test_cull_size_drop(self):
+        """dropping records updates tracked buffer size"""
+        self.db = self.create_db() # skip the load-records init from setUp
+        self.db.size_limit = 1000
+        self.db.cull_fraction = 0.2
+        self.load_records(100, buffer_size=10)
+        self.assertEquals(len(self.db.get_history()), 100)
+        self.db.drop_record(self.db.get_history()[-1])
+        self.assertEquals(len(self.db.get_history()), 99)
+        self.load_records(1, buffer_size=5)
+        self.assertEquals(len(self.db.get_history()), 100)
+        self.load_records(1, buffer_size=5)
+        self.assertEquals(len(self.db.get_history()), 101)
+        self.load_records(1, buffer_size=1)
+        self.assertEquals(len(self.db.get_history()), 81)
+
+    def test_cull_size_update(self):
+        """updating records updates tracked buffer size"""
+        self.db = self.create_db() # skip the load-records init from setUp
+        self.db.size_limit = 1000
+        self.db.cull_fraction = 0.2
+        self.load_records(100, buffer_size=10)
+        self.assertEquals(len(self.db.get_history()), 100)
+        msg_id = self.db.get_history()[-1]
+        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
+        self.assertEquals(len(self.db.get_history()), 100)
+        self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
+        self.assertEquals(len(self.db.get_history()), 79)
+
+class TestSQLiteBackend(TaskDBTest, TestCase):
 
     @dec.skip_without('sqlite3')
     def create_db(self):
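The inline comments in test_cull_count follow directly from the policy: a count cull always removes int(cull_fraction * record_limit) records. Checking the asserted numbers with plain arithmetic (independent of the code under test):

record_limit, cull_fraction = 20, 0.2
to_cull = int(cull_fraction * record_limit)
print(to_cull)       # 4: each count cull drops 4 records
print(21 - to_cull)  # 17: loading a 21st record leaves 17 behind

# size cull in test_cull_size: at cull time the store holds 102 records with
# 1001 buffer bytes, one over the 1000-byte limit; ten-byte records are
# dropped oldest-first until the total is under the 800-byte floor, which
# takes 21 drops and leaves 81 records.
print(102 - 21)      # 81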
@@ -16,6 +16,8 @@ Authors:
 # Imports
 #-------------------------------------------------------------------------------
 
+from unittest import TestCase
+
 from nose import SkipTest
 
 from pymongo import Connection
@@ -28,7 +30,7 @@ try:
 except Exception:
     c=None
 
-class TestMongoBackend(test_db.TestDictBackend):
+class TestMongoBackend(test_db.TaskDBTest, TestCase):
     """MongoDB backend tests"""
 
     def create_db(self):
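The Mongo tests reuse the refactor from test_db.py: TaskDBTest holds the shared test methods, TestCase is mixed in so unittest actually collects them, and each concrete class only supplies create_db. A stripped-down sketch of the pattern with hypothetical names:

from unittest import TestCase

class BackendTestMixin(object):
    """Shared tests; not collected alone because it isn't a TestCase."""
    def create_db(self):
        raise NotImplementedError

    def test_starts_empty(self):
        self.assertEqual(len(self.create_db()), 0)

class TestDictBackendSketch(BackendTestMixin, TestCase):
    def create_db(self):
        return {}  # stand-in backend for illustration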