fix typo in DictDB.size_limit...
MinRK
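The "typo": the old default of 1024*1024 bytes is 1 MiB, while the intended
default, per the "default: 1 GB" help-text note this commit also adds, is
1024**3 bytes. A quick check (illustrative arithmetic, not part of the commit):

    >>> 1024 * 1024     # old default: 1 MiB
    1048576
    >>> 1024 ** 3       # new default: 1 GiB
    1073741824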
@@ -1,310 +1,311 @@
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7
7
8
8
9 TaskRecords are dicts of the form:
9 TaskRecords are dicts of the form:
10 {
10 {
11 'msg_id' : str(uuid),
11 'msg_id' : str(uuid),
12 'client_uuid' : str(uuid),
12 'client_uuid' : str(uuid),
13 'engine_uuid' : str(uuid) or None,
13 'engine_uuid' : str(uuid) or None,
14 'header' : dict(header),
14 'header' : dict(header),
15 'content': dict(content),
15 'content': dict(content),
16 'buffers': list(buffers),
16 'buffers': list(buffers),
17 'submitted': datetime,
17 'submitted': datetime,
18 'started': datetime or None,
18 'started': datetime or None,
19 'completed': datetime or None,
19 'completed': datetime or None,
20 'resubmitted': datetime or None,
20 'resubmitted': datetime or None,
21 'result_header' : dict(header) or None,
21 'result_header' : dict(header) or None,
22 'result_content' : dict(content) or None,
22 'result_content' : dict(content) or None,
23 'result_buffers' : list(buffers) or None,
23 'result_buffers' : list(buffers) or None,
24 }
24 }
25 With this info, many of the special categories of tasks can be defined by query:
25 With this info, many of the special categories of tasks can be defined by query:
26
26
27 pending: completed is None
27 pending: completed is None
28 client's outstanding: client_uuid = uuid && completed is None
28 client's outstanding: client_uuid = uuid && completed is None
29 MIA: arrived is None (and completed is None)
29 MIA: arrived is None (and completed is None)
30 etc.
30 etc.
31
31
32 EngineRecords are dicts of the form:
32 EngineRecords are dicts of the form:
33 {
33 {
34 'eid' : int(id),
34 'eid' : int(id),
35 'uuid': str(uuid)
35 'uuid': str(uuid)
36 }
36 }
37 This may be extended, but is currently.
37 This may be extended, but is currently.
38
38
39 We support a subset of mongodb operators:
39 We support a subset of mongodb operators:
40 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
40 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
41 """
41 """
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Copyright (C) 2010-2011 The IPython Development Team
43 # Copyright (C) 2010-2011 The IPython Development Team
44 #
44 #
45 # Distributed under the terms of the BSD License. The full license is in
45 # Distributed under the terms of the BSD License. The full license is in
46 # the file COPYING, distributed as part of this software.
46 # the file COPYING, distributed as part of this software.
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 from copy import deepcopy as copy
49 from copy import deepcopy as copy
50 from datetime import datetime
50 from datetime import datetime
51
51
52 from IPython.config.configurable import LoggingConfigurable
52 from IPython.config.configurable import LoggingConfigurable
53
53
54 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
54 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
55
55
56 filters = {
56 filters = {
57 '$lt' : lambda a,b: a < b,
57 '$lt' : lambda a,b: a < b,
58 '$gt' : lambda a,b: b > a,
58 '$gt' : lambda a,b: b > a,
59 '$eq' : lambda a,b: a == b,
59 '$eq' : lambda a,b: a == b,
60 '$ne' : lambda a,b: a != b,
60 '$ne' : lambda a,b: a != b,
61 '$lte': lambda a,b: a <= b,
61 '$lte': lambda a,b: a <= b,
62 '$gte': lambda a,b: a >= b,
62 '$gte': lambda a,b: a >= b,
63 '$in' : lambda a,b: a in b,
63 '$in' : lambda a,b: a in b,
64 '$nin': lambda a,b: a not in b,
64 '$nin': lambda a,b: a not in b,
65 '$all': lambda a,b: all([ a in bb for bb in b ]),
65 '$all': lambda a,b: all([ a in bb for bb in b ]),
66 '$mod': lambda a,b: a%b[0] == b[1],
66 '$mod': lambda a,b: a%b[0] == b[1],
67 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
67 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
68 }
68 }
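
# A few doctest-style spot checks of the operator table (illustrative, not
# part of the original module; arguments are (record_value, query_value)):
#
#     >>> filters['$lt'](3, 5)            # 3 < 5
#     True
#     >>> filters['$gt'](5, 3)            # 5 > 3
#     True
#     >>> filters['$mod'](10, [3, 1])     # 10 % 3 == 1
#     True
#     >>> filters['$exists'](None, False)
#     True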


class CompositeFilter(object):
    """Composite filter for matching multiple properties."""

    def __init__(self, dikt):
        self.tests = []
        self.values = []
        for key, value in dikt.iteritems():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        for test,check in zip(self.tests, self.values):
            if not test(value, check):
                return False
        return True
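
# Example: a CompositeFilter built from an operator dict ANDs all of its
# tests together (a sketch; the values are made up):
#
#     >>> in_range = CompositeFilter({'$gte': 10, '$lt': 20})
#     >>> in_range(15)
#     True
#     >>> in_range(20)
#     False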

class BaseDB(LoggingConfigurable):
    """Empty Parent class so traitlets work on DB."""
    # base configurable traits:
    session = Unicode("")

class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    _records = Dict()
    _culled_ids = set() # set of ids which have been culled
    _buffer_bytes = Integer(0) # running total of the bytes in the DB

-   size_limit = Integer(1024*1024, config=True,
+   size_limit = Integer(1024**3, config=True,
        help="""The maximum total size (in bytes) of the buffers stored in the db

        When the db exceeds this size, the oldest records will be culled until
        the total size is under size_limit * (1-cull_fraction).
+       default: 1 GB
        """
    )
    record_limit = Integer(1024, config=True,
        help="""The maximum number of records in the db

        When the history exceeds this size, the first record_limit * cull_fraction
        records will be culled.
        """
    )
    cull_fraction = Float(0.1, config=True,
        help="""The fraction by which the db should be culled when one of the limits is exceeded

        In general, the db size will spend most of its time with a size in the range:

        [limit * (1-cull_fraction), limit]

        for each of size_limit and record_limit.
        """
    )
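
    # These three limits are config=True, so they can be set from the command
    # line or a controller config file.  A sketch, assuming the standard
    # IPython config mechanism (the values are made up):
    #
    #     c.DictDB.size_limit = 2 * 1024**3    # allow 2 GiB of buffers
    #     c.DictDB.record_limit = 2048         # keep up to 2048 records
    #     c.DictDB.cull_fraction = 0.25        # cull 25% when a limit is hit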

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests."""
        for key,test in tests.iteritems():
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check):
        """Find all the matches for a check dict."""
        matches = []
        tests = {}
        for k,v in check.iteritems():
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # bind v per-iteration; a bare `lambda o: o == v` would close
                # over the loop variable and compare against the last value only
                tests[k] = lambda o, v=v: o == v

        for rec in self._records.itervalues():
            if self._match_one(rec, tests):
                matches.append(copy(rec))
        return matches
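
    # For example, a check dict mixing a literal match with an operator test
    # (made-up engine uuid):
    #
    #     {'engine_uuid': 'abc123', 'completed': {'$ne': None}}
    #
    # matches completed tasks that ran on engine 'abc123'.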

    def _extract_subdict(self, rec, keys):
        """extract subdict of keys"""
        d = {}
        d['msg_id'] = rec['msg_id']
        for key in keys:
            d[key] = rec[key]
        return copy(d)
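
    # Doctest-style sketch (field values made up); msg_id always rides along:
    #
    #     >>> db._extract_subdict({'msg_id': 'm1', 'completed': None, 'extra': 1},
    #     ...                     ['completed'])
    #     {'msg_id': 'm1', 'completed': None}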

    # methods for monitoring size / culling history

    def _add_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes += len(buf)

        self._maybe_cull()

    def _drop_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes -= len(buf)

    def _cull_oldest(self, n=1):
        """cull the oldest N records"""
        for msg_id in self.get_history()[:n]:
            self.log.debug("Culling record: %r", msg_id)
            self._culled_ids.add(msg_id)
            self.drop_record(msg_id)

    def _maybe_cull(self):
        # cull by count:
        if len(self._records) > self.record_limit:
            to_cull = int(self.cull_fraction * self.record_limit)
            self.log.info("%i records exceeds limit of %i, culling oldest %i",
                len(self._records), self.record_limit, to_cull
            )
            self._cull_oldest(to_cull)

        # cull by size:
        if self._buffer_bytes > self.size_limit:
            limit = self.size_limit * (1 - self.cull_fraction)

            before = self._buffer_bytes
            before_count = len(self._records)
            culled = 0
            while self._buffer_bytes > limit:
                self._cull_oldest(1)
                culled += 1

            self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                before_count, before, self.size_limit, culled
            )
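
    # With the defaults above: exceeding 1024 records culls the oldest
    # int(0.1 * 1024) == 102 of them; exceeding 1024**3 buffer bytes culls
    # one record at a time until the total is below
    # 1024**3 * 0.9 == 966367641.6 bytes (~0.9 GiB).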

    # public API methods:

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._records[msg_id] = rec
        self._add_bytes(rec)
        self._maybe_cull()

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return copy(self._records[msg_id])
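
    # Round-trip sketch (made-up msg_id; real ones are uuid strings).
    # get_record returns a deepcopy, so mutating the result never touches
    # the stored record:
    #
    #     >>> db = DictDB()
    #     >>> db.add_record('m1', {'msg_id': 'm1', 'submitted': datetime.now()})
    #     >>> rec = db.get_record('m1')
    #     >>> rec['engine_uuid'] = 'nope'    # changes the copy, not the DB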

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        _rec = self._records[msg_id]
        self._drop_bytes(_rec)
        _rec.update(rec)
        self._add_bytes(_rec)

    def drop_matching_records(self, check):
        """Remove all records matching a check dict from the DB."""
        matches = self._match(check)
        for rec in matches:
            self._drop_bytes(rec)
            del self._records[rec['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        rec = self._records[msg_id]
        self._drop_bytes(rec)
        del self._records[msg_id]

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting subset of keys.

        Returns list of matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract.  msg_id will *always* be
            included.
        """
        matches = self._match(check)
        if keys:
            return [ self._extract_subdict(rec, keys) for rec in matches ]
        else:
            return matches
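
    # Usage sketch, using the "pending" query from the module docstring:
    #
    #     >>> db.find_records({'completed': None}, keys=['submitted'])
    #
    # returns a list with one {'msg_id': ..., 'submitted': ...} dict per
    # task that has not yet completed.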

    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        msg_ids = self._records.keys()
        # Remove any that do not have a submitted timestamp.
        # This is extremely unlikely to happen,
        # but it seems to come up in some tests on VMs.
        msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
        return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])


NODATA = KeyError("NoDB backend doesn't store any data. "
    "Start the Controller with a DB backend to enable resubmission / result persistence."
)


class NoDB(BaseDB):
    """A blackhole db backend that actually stores no information.

    Provides the full DB interface, but raises KeyErrors on any
    method that tries to access the records.  This can be used to
    minimize the memory footprint of the Hub when its record-keeping
    functionality is not required.
    """

    def add_record(self, msg_id, record):
        pass

    def get_record(self, msg_id):
        raise NODATA

    def update_record(self, msg_id, record):
        pass

    def drop_matching_records(self, check):
        pass

    def drop_record(self, msg_id):
        pass

    def find_records(self, check, keys=None):
        raise NODATA

    def get_history(self):
        raise NODATA
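
# To run a Hub with no task persistence at all, point the controller at this
# backend (a sketch; assumes the db_class trait used by IPython.parallel to
# select a backend):
#
#     c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.NoDB'
#
# after which get_record / find_records / get_history raise the NODATA
# KeyError above.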