fix typo in DictDB.size_limit...
MinRK
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7
8 8
9 9 TaskRecords are dicts of the form:
10 10 {
11 11 'msg_id' : str(uuid),
12 12 'client_uuid' : str(uuid),
13 13 'engine_uuid' : str(uuid) or None,
14 14 'header' : dict(header),
15 15 'content': dict(content),
16 16 'buffers': list(buffers),
17 17 'submitted': datetime,
18 18 'started': datetime or None,
19 19 'completed': datetime or None,
20 20 'resubmitted': datetime or None,
21 21 'result_header' : dict(header) or None,
22 22 'result_content' : dict(content) or None,
23 23 'result_buffers' : list(buffers) or None,
24 24 }
25 25 With this info, many of the special categories of tasks can be defined by query:
26 26
27 27 pending: completed is None
28 28 client's outstanding: client_uuid = uuid && completed is None
29 29 MIA: arrived is None (and completed is None)
30 30 etc.
31 31
32 32 EngineRecords are dicts of the form:
33 33 {
34 34 'eid' : int(id),
35 35 'uuid': str(uuid)
36 36 }
This may be extended, but this is the complete set for now.

We support a subset of mongodb operators:
    $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
"""
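# For illustration, a query for a client's outstanding tasks could combine
# plain equality tests with these operators (db, uuid, and cutoff are
# hypothetical names):
#
#   db.find_records({'client_uuid': uuid,
#                    'completed': None,
#                    'submitted': {'$gte': cutoff}})
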
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

from copy import deepcopy as copy
from datetime import datetime

from IPython.config.configurable import LoggingConfigurable

from IPython.utils.traitlets import Dict, Unicode, Integer, Float

filters = {
    '$lt' : lambda a,b: a < b,
    '$gt' : lambda a,b: a > b,
    '$eq' : lambda a,b: a == b,
    '$ne' : lambda a,b: a != b,
    '$lte': lambda a,b: a <= b,
    '$gte': lambda a,b: a >= b,
    '$in' : lambda a,b: a in b,
    '$nin': lambda a,b: a not in b,
    '$all': lambda a,b: all([ a in bb for bb in b ]),
    '$mod': lambda a,b: a%b[0] == b[1],
    '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
}
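
# For example, filters['$mod'](10, [3, 1]) is True because 10 % 3 == 1,
# and filters['$all']('a', ['ab', 'ba']) is True because 'a' appears in
# every element of the list.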


class CompositeFilter(object):
    """Composite filter for matching multiple properties."""

    def __init__(self, dikt):
        self.tests = []
        self.values = []
        for key, value in dikt.iteritems():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        for test,check in zip(self.tests, self.values):
            if not test(value, check):
                return False
        return True

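# Illustrative use: CompositeFilter({'$gte': 0, '$lt': 10}) builds a callable
# that ANDs the two operator tests, so it returns True for values in [0, 10)
# and False otherwise.
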
class BaseDB(LoggingConfigurable):
    """Empty Parent class so traitlets work on DB."""
    # base configurable traits:
    session = Unicode("")

class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    _records = Dict()
    _culled_ids = set() # set of ids which have been culled
    _buffer_bytes = Integer(0) # running total of the bytes in the DB

    size_limit = Integer(1024**3, config=True,
        help="""The maximum total size (in bytes) of the buffers stored in the db

        When the db exceeds this size, the oldest records will be culled until
        the total size is under size_limit * (1-cull_fraction).
        default: 1 GB
        """
    )
    record_limit = Integer(1024, config=True,
        help="""The maximum number of records in the db

        When the history exceeds this size, the first record_limit * cull_fraction
        records will be culled.
        """
    )
    cull_fraction = Float(0.1, config=True,
        help="""The fraction by which the db should be culled when one of the limits is exceeded

        In general, the db size will spend most of its time with a size in the range:

        [limit * (1-cull_fraction), limit]

        for each of size_limit and record_limit.
        """
    )
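
    # These three limits are configurable traits, so they could be set from an
    # IPython config file; a hypothetical ipcontroller_config.py snippet:
    #
    #   c.DictDB.size_limit = 512 * 1024**2   # cull above 512 MB of buffers
    #   c.DictDB.record_limit = 2048          # ...or above 2048 records
    #   c.DictDB.cull_fraction = 0.2          # cull 20% at a time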

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests."""
        for key,test in tests.iteritems():
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check):
        """Find all the matches for a check dict."""
        matches = []
        tests = {}
        for k,v in check.iteritems():
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # bind v now; a late-binding `lambda o: o==v` would compare
                # every plain key against the loop's last value
                tests[k] = lambda o, v=v: o == v

        for rec in self._records.itervalues():
            if self._match_one(rec, tests):
                matches.append(copy(rec))
        return matches

    def _extract_subdict(self, rec, keys):
        """extract subdict of keys"""
        d = {}
        d['msg_id'] = rec['msg_id']
        for key in keys:
            d[key] = rec[key]
        return copy(d)

    # methods for monitoring size / culling history

    def _add_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes += len(buf)

        self._maybe_cull()

    def _drop_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes -= len(buf)

    def _cull_oldest(self, n=1):
        """cull the oldest N records"""
        for msg_id in self.get_history()[:n]:
            self.log.debug("Culling record: %r", msg_id)
            self._culled_ids.add(msg_id)
            self.drop_record(msg_id)

    def _maybe_cull(self):
        # cull by count:
        if len(self._records) > self.record_limit:
            to_cull = int(self.cull_fraction * self.record_limit)
            self.log.info("%i records exceeds limit of %i, culling oldest %i",
                len(self._records), self.record_limit, to_cull
            )
            self._cull_oldest(to_cull)

        # cull by size:
        if self._buffer_bytes > self.size_limit:
            limit = self.size_limit * (1 - self.cull_fraction)

            before = self._buffer_bytes
            before_count = len(self._records)
            culled = 0
            while self._buffer_bytes > limit:
                self._cull_oldest(1)
                culled += 1

            self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                before_count, before, self.size_limit, culled
            )
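
    # Worked example with the defaults: size_limit = 1024**3 and
    # cull_fraction = 0.1, so once _buffer_bytes exceeds 2**30 the loop above
    # culls oldest records until the total drops below 0.9 * 2**30 bytes.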

    # public API methods:

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._records[msg_id] = rec
        self._add_bytes(rec)
        self._maybe_cull()

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return copy(self._records[msg_id])

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        _rec = self._records[msg_id]
        self._drop_bytes(_rec)
        _rec.update(rec)
        self._add_bytes(_rec)

    def drop_matching_records(self, check):
        """Remove all records matching a check dict from the DB."""
        matches = self._match(check)
        for rec in matches:
            self._drop_bytes(rec)
            del self._records[rec['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        rec = self._records[msg_id]
        self._drop_bytes(rec)
        del self._records[msg_id]

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting a subset of keys.

        Returns a list of matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract. msg_id will *always* be
            included.
        """
        matches = self._match(check)
        if keys:
            return [ self._extract_subdict(rec, keys) for rec in matches ]
        else:
            return matches
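
    # Illustrative usage, assuming db is a DictDB instance:
    #
    #   pending  = db.find_records({'completed': None})
    #   finished = db.find_records({'completed': {'$ne': None}},
    #                              keys=['completed', 'engine_uuid'])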

    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        msg_ids = self._records.keys()
        # Remove any that do not have a submitted timestamp.
        # This is extremely unlikely to happen,
        # but it seems to come up in some tests on VMs.
        msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
        return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])


NODATA = KeyError("NoDB backend doesn't store any data. "
    "Start the Controller with a DB backend to enable resubmission / result persistence."
)


class NoDB(BaseDB):
    """A blackhole db backend that actually stores no information.

    Provides the full DB interface, but raises KeyErrors on any
    method that tries to access the records. This can be used to
    minimize the memory footprint of the Hub when its record-keeping
    functionality is not required.
    """

    def add_record(self, msg_id, record):
        pass

    def get_record(self, msg_id):
        raise NODATA

    def update_record(self, msg_id, record):
        pass

    def drop_matching_records(self, check):
        pass

    def drop_record(self, msg_id):
        pass

    def find_records(self, check, keys=None):
        raise NODATA

    def get_history(self):
        raise NODATA

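# To select this backend, the controller's db_class can point here; the exact
# config key below is an assumption based on this module's location:
#
#   c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.NoDB'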