##// END OF EJS Templates
Merge pull request #1930 from minrk/limitdictdb...
Min RK -
r8018:e0803aa9 merge
parent child Browse files
Show More
@@ -51,7 +51,7 b' from datetime import datetime'
51 51
52 52 from IPython.config.configurable import LoggingConfigurable
53 53
54 from IPython.utils.traitlets import Dict, Unicode, Instance
54 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
55 55
56 56 filters = {
57 57 '$lt' : lambda a,b: a < b,
@@ -100,6 +100,33 b' class DictDB(BaseDB):'
100 100 """
101 101
102 102 _records = Dict()
103 _culled_ids = set() # set of ids which have been culled
104 _buffer_bytes = Integer(0) # running total of the bytes in the DB
105
106 size_limit = Integer(1024*1024, config=True,
107 help="""The maximum total size (in bytes) of the buffers stored in the db
108
109 When the db exceeds this size, the oldest records will be culled until
110 the total size is under size_limit * (1-cull_fraction).
111 """
112 )
113 record_limit = Integer(1024, config=True,
114 help="""The maximum number of records in the db
115
116 When the history exceeds this size, the first record_limit * cull_fraction
117 records will be culled.
118 """
119 )
120 cull_fraction = Float(0.1, config=True,
121 help="""The fraction by which the db should culled when one of the limits is exceeded
122
123 In general, the db size will spend most of its time with a size in the range:
124
125 [limit * (1-cull_fraction), limit]
126
127 for each of size_limit and record_limit.
128 """
129 )
103 130
104 131 def _match_one(self, rec, tests):
105 132 """Check if a specific record matches tests."""
@@ -130,34 +157,92 b' class DictDB(BaseDB):'
130 157 for key in keys:
131 158 d[key] = rec[key]
132 159 return copy(d)
160
161 # methods for monitoring size / culling history
162
163 def _add_bytes(self, rec):
164 for key in ('buffers', 'result_buffers'):
165 for buf in rec.get(key) or []:
166 self._buffer_bytes += len(buf)
167
168 self._maybe_cull()
169
170 def _drop_bytes(self, rec):
171 for key in ('buffers', 'result_buffers'):
172 for buf in rec.get(key) or []:
173 self._buffer_bytes -= len(buf)
174
175 def _cull_oldest(self, n=1):
176 """cull the oldest N records"""
177 for msg_id in self.get_history()[:n]:
178 self.log.debug("Culling record: %r", msg_id)
179 self._culled_ids.add(msg_id)
180 self.drop_record(msg_id)
181
182 def _maybe_cull(self):
183 # cull by count:
184 if len(self._records) > self.record_limit:
185 to_cull = int(self.cull_fraction * self.record_limit)
186 self.log.info("%i records exceeds limit of %i, culling oldest %i",
187 len(self._records), self.record_limit, to_cull
188 )
189 self._cull_oldest(to_cull)
190
191 # cull by size:
192 if self._buffer_bytes > self.size_limit:
193 limit = self.size_limit * (1 - self.cull_fraction)
194
195 before = self._buffer_bytes
196 before_count = len(self._records)
197 culled = 0
198 while self._buffer_bytes > limit:
199 self._cull_oldest(1)
200 culled += 1
201
202 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
203 before_count, before, self.size_limit, culled
204 )
205
206 # public API methods:
133 207
134 208 def add_record(self, msg_id, rec):
135 209 """Add a new Task Record, by msg_id."""
136 210 if msg_id in self._records:
137 211 raise KeyError("Already have msg_id %r"%(msg_id))
138 212 self._records[msg_id] = rec
213 self._add_bytes(rec)
214 self._maybe_cull()
139 215
140 216 def get_record(self, msg_id):
141 217 """Get a specific Task Record, by msg_id."""
218 if msg_id in self._culled_ids:
219 raise KeyError("Record %r has been culled for size" % msg_id)
142 220 if not msg_id in self._records:
143 221 raise KeyError("No such msg_id %r"%(msg_id))
144 222 return copy(self._records[msg_id])
145 223
146 224 def update_record(self, msg_id, rec):
147 225 """Update the data in an existing record."""
148 self._records[msg_id].update(rec)
226 if msg_id in self._culled_ids:
227 raise KeyError("Record %r has been culled for size" % msg_id)
228 _rec = self._records[msg_id]
229 self._drop_bytes(_rec)
230 _rec.update(rec)
231 self._add_bytes(_rec)
149 232
150 233 def drop_matching_records(self, check):
151 234 """Remove a record from the DB."""
152 235 matches = self._match(check)
153 for m in matches:
154 del self._records[m['msg_id']]
236 for rec in matches:
237 self._drop_bytes(rec)
238 del self._records[rec['msg_id']]
155 239
156 240 def drop_record(self, msg_id):
157 241 """Remove a record from the DB."""
242 rec = self._records[msg_id]
243 self._drop_bytes(rec)
158 244 del self._records[msg_id]
159 245
160
161 246 def find_records(self, check, keys=None):
162 247 """Find records matching a query dict, optionally extracting subset of keys.
163 248
@@ -178,17 +263,18 b' class DictDB(BaseDB):'
178 263 else:
179 264 return matches
180 265
181
182 266 def get_history(self):
183 267 """get all msg_ids, ordered by time submitted."""
184 268 msg_ids = self._records.keys()
185 269 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
186 270
271
187 272 NODATA = KeyError("NoDB backend doesn't store any data. "
188 273 "Start the Controller with a DB backend to enable resubmission / result persistence."
189 274 )
190 275
191 class NoDB(DictDB):
276
277 class NoDB(BaseDB):
192 278 """A blackhole db backend that actually stores no information.
193 279
194 280 Provides the full DB interface, but raises KeyErrors on any
@@ -45,23 +45,23 b' def setup():'
45 45 temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
46 46
47 47
48 class TestDictBackend(TestCase):
48 class TaskDBTest:
49 49 def setUp(self):
50 50 self.session = Session()
51 51 self.db = self.create_db()
52 52 self.load_records(16)
53 53
54 54 def create_db(self):
55 return DictDB()
55 raise NotImplementedError
56 56
57 def load_records(self, n=1):
57 def load_records(self, n=1, buffer_size=100):
58 58 """load n records for testing"""
59 59 #sleep 1/10 s, to ensure timestamp is different to previous calls
60 60 time.sleep(0.1)
61 61 msg_ids = []
62 62 for i in range(n):
63 63 msg = self.session.msg('apply_request', content=dict(a=5))
64 msg['buffers'] = []
64 msg['buffers'] = [os.urandom(buffer_size)]
65 65 rec = init_record(msg)
66 66 msg_id = msg['header']['msg_id']
67 67 msg_ids.append(msg_id)
@@ -228,7 +228,72 b' class TestDictBackend(TestCase):'
228 228 self.assertEqual(rec2['header']['msg_id'], msg_id)
229 229
230 230
231 class TestSQLiteBackend(TestDictBackend):
231 class TestDictBackend(TaskDBTest, TestCase):
232
233 def create_db(self):
234 return DictDB()
235
236 def test_cull_count(self):
237 self.db = self.create_db() # skip the load-records init from setUp
238 self.db.record_limit = 20
239 self.db.cull_fraction = 0.2
240 self.load_records(20)
241 self.assertEquals(len(self.db.get_history()), 20)
242 self.load_records(1)
243 # 0.2 * 20 = 4, 21 - 4 = 17
244 self.assertEquals(len(self.db.get_history()), 17)
245 self.load_records(3)
246 self.assertEquals(len(self.db.get_history()), 20)
247 self.load_records(1)
248 self.assertEquals(len(self.db.get_history()), 17)
249
250 for i in range(100):
251 self.load_records(1)
252 self.assertTrue(len(self.db.get_history()) >= 17)
253 self.assertTrue(len(self.db.get_history()) <= 20)
254
255 def test_cull_size(self):
256 self.db = self.create_db() # skip the load-records init from setUp
257 self.db.size_limit = 1000
258 self.db.cull_fraction = 0.2
259 self.load_records(100, buffer_size=10)
260 self.assertEquals(len(self.db.get_history()), 100)
261 self.load_records(1, buffer_size=0)
262 self.assertEquals(len(self.db.get_history()), 101)
263 self.load_records(1, buffer_size=1)
264 # 0.2 * 100 = 20, 101 - 20 = 81
265 self.assertEquals(len(self.db.get_history()), 81)
266
267 def test_cull_size_drop(self):
268 """dropping records updates tracked buffer size"""
269 self.db = self.create_db() # skip the load-records init from setUp
270 self.db.size_limit = 1000
271 self.db.cull_fraction = 0.2
272 self.load_records(100, buffer_size=10)
273 self.assertEquals(len(self.db.get_history()), 100)
274 self.db.drop_record(self.db.get_history()[-1])
275 self.assertEquals(len(self.db.get_history()), 99)
276 self.load_records(1, buffer_size=5)
277 self.assertEquals(len(self.db.get_history()), 100)
278 self.load_records(1, buffer_size=5)
279 self.assertEquals(len(self.db.get_history()), 101)
280 self.load_records(1, buffer_size=1)
281 self.assertEquals(len(self.db.get_history()), 81)
282
283 def test_cull_size_update(self):
284 """updating records updates tracked buffer size"""
285 self.db = self.create_db() # skip the load-records init from setUp
286 self.db.size_limit = 1000
287 self.db.cull_fraction = 0.2
288 self.load_records(100, buffer_size=10)
289 self.assertEquals(len(self.db.get_history()), 100)
290 msg_id = self.db.get_history()[-1]
291 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
292 self.assertEquals(len(self.db.get_history()), 100)
293 self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
294 self.assertEquals(len(self.db.get_history()), 79)
295
296 class TestSQLiteBackend(TaskDBTest, TestCase):
232 297
233 298 @dec.skip_without('sqlite3')
234 299 def create_db(self):
@@ -16,6 +16,8 b' Authors:'
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 from unittest import TestCase
20
19 21 from nose import SkipTest
20 22
21 23 from pymongo import Connection
@@ -28,7 +30,7 b' try:'
28 30 except Exception:
29 31 c=None
30 32
31 class TestMongoBackend(test_db.TestDictBackend):
33 class TestMongoBackend(test_db.TaskDBTest, TestCase):
32 34 """MongoDB backend tests"""
33 35
34 36 def create_db(self):
General Comments 0
You need to be logged in to leave comments. Login now