check type of date fields in DictDB
MinRK
@@ -1,317 +1,317 @@
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7
8 8
9 9 TaskRecords are dicts of the form::
10 10
11 11 {
12 12 'msg_id' : str(uuid),
13 13 'client_uuid' : str(uuid),
14 14 'engine_uuid' : str(uuid) or None,
15 15 'header' : dict(header),
16 16 'content': dict(content),
17 17 'buffers': list(buffers),
18 'submitted': datetime,
18 'submitted': datetime or None,
19 19 'started': datetime or None,
20 20 'completed': datetime or None,
21 'resubmitted': datetime or None,
21 'received': datetime or None,
22 'resubmitted': str(uuid) or None,
22 23 'result_header' : dict(header) or None,
23 24 'result_content' : dict(content) or None,
24 25 'result_buffers' : list(buffers) or None,
25 26 }
26 27
27 28 With this info, many of the special categories of tasks can be defined by query,
28 29 e.g.:
29 30
30 31 * pending: completed is None
31 32 * client's outstanding: client_uuid = uuid && completed is None
32 33 * MIA: received is None (and completed is None)
33 34
34 EngineRecords are dicts of the form::
35
36 {
37 'eid' : int(id),
38 'uuid': str(uuid)
39 }
40
41 This may be extended, but is currently minimal.
42
43 We support a subset of mongodb operators::
35 DictDB supports a subset of mongodb operators::
44 36
45 37 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
46 38 """
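For reference, a check dict mixes plain values (equality tests) with nested operator dicts. A hypothetical query for one client's recently finished tasks might look like the following sketch, where the uuid is a placeholder::

    from datetime import datetime, timedelta

    yesterday = datetime.now() - timedelta(days=1)

    check = {
        'client_uuid': 'abc123',           # plain value: equality test
        'completed': {'$ne': None},        # operator dict: finished tasks only
        'submitted': {'$gte': yesterday},  # submitted within the last day
    }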
47 39 #-----------------------------------------------------------------------------
48 40 # Copyright (C) 2010-2011 The IPython Development Team
49 41 #
50 42 # Distributed under the terms of the BSD License. The full license is in
51 43 # the file COPYING, distributed as part of this software.
52 44 #-----------------------------------------------------------------------------
53 45
54 46 from copy import deepcopy as copy
55 47 from datetime import datetime
56 48
57 49 from IPython.config.configurable import LoggingConfigurable
58 50
59 51 from IPython.utils.py3compat import iteritems, itervalues
60 52 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
61 53
62 54 filters = {
63 55 '$lt' : lambda a,b: a < b,
64 56 '$gt' : lambda a,b: a > b,
65 57 '$eq' : lambda a,b: a == b,
66 58 '$ne' : lambda a,b: a != b,
67 59 '$lte': lambda a,b: a <= b,
68 60 '$gte': lambda a,b: a >= b,
69 61 '$in' : lambda a,b: a in b,
70 62 '$nin': lambda a,b: a not in b,
71 63 '$all': lambda a,b: all([ a in bb for bb in b ]),
72 64 '$mod': lambda a,b: a%b[0] == b[1],
73 65 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
74 66 }
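Each entry in this table is a two-argument predicate, called with the record's value first and the query's comparison value second. A few illustrative calls against the table above::

    assert filters['$lt'](3, 5)             # 3 < 5
    assert filters['$in']('a', ['a', 'b'])  # membership test
    assert filters['$mod'](10, [4, 2])      # 10 % 4 == 2
    assert filters['$exists'](None, False)  # absent field, {'$exists': False}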
75 67
76 68
77 69 class CompositeFilter(object):
78 70 """Composite filter for matching multiple properties."""
79 71
80 72 def __init__(self, dikt):
81 73 self.tests = []
82 74 self.values = []
83 75 for key, value in iteritems(dikt):
84 76 self.tests.append(filters[key])
85 77 self.values.append(value)
86 78
87 79 def __call__(self, value):
88 80 for test,check in zip(self.tests, self.values):
89 81 if not test(value, check):
90 82 return False
91 83 return True
92 84
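A short usage sketch: a CompositeFilter ANDs several operator tests on a single field's value::

    in_range = CompositeFilter({'$gte': 2, '$lt': 10})
    assert in_range(5)       # 2 <= 5 and 5 < 10
    assert not in_range(10)  # fails the $lt test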
93 85 class BaseDB(LoggingConfigurable):
94 86 """Empty Parent class so traitlets work on DB."""
95 87 # base configurable traits:
96 88 session = Unicode("")
97 89
98 90 class DictDB(BaseDB):
99 91 """Basic in-memory dict-based object for saving Task Records.
100 92
101 93 This is the first object to present the DB interface
102 94 for logging tasks out of memory.
103 95
104 96 The interface is based on MongoDB, so adding a MongoDB
105 97 backend should be straightforward.
106 98 """
107 99
108 100 _records = Dict()
109 101 _culled_ids = set() # set of ids which have been culled
110 102 _buffer_bytes = Integer(0) # running total of the bytes in the DB
111 103
112 104 size_limit = Integer(1024**3, config=True,
113 105 help="""The maximum total size (in bytes) of the buffers stored in the db
114 106
115 107 When the db exceeds this size, the oldest records will be culled until
116 108 the total size is under size_limit * (1-cull_fraction).
117 109 default: 1 GB
118 110 """
119 111 )
120 112 record_limit = Integer(1024, config=True,
121 113 help="""The maximum number of records in the db
122 114
123 115 When the history exceeds this size, the first record_limit * cull_fraction
124 116 records will be culled.
125 117 """
126 118 )
127 119 cull_fraction = Float(0.1, config=True,
128 120 help="""The fraction by which the db should be culled when one of the limits is exceeded
129 121
130 122 In general, the db will spend most of its time with a size in the range:
131 123
132 124 [limit * (1-cull_fraction), limit]
133 125
134 126 for each of size_limit and record_limit.
135 127 """
136 128 )
137 129
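Since these traits are declared with config=True, they can be set from an ipcontroller config file. The values below are arbitrary examples, not recommendations::

    # ipcontroller_config.py
    c = get_config()

    c.DictDB.size_limit = 512 * 1024**2  # cull when buffers exceed 512 MB
    c.DictDB.record_limit = 2048         # or when records exceed 2048
    c.DictDB.cull_fraction = 0.2         # drop the oldest 20% per cull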
138 130 def _match_one(self, rec, tests):
139 131 """Check if a specific record matches tests."""
140 132 for key,test in iteritems(tests):
141 133 if not test(rec.get(key, None)):
142 134 return False
143 135 return True
144 136
145 137 def _match(self, check):
146 138 """Find all the matches for a check dict."""
147 139 matches = []
148 140 tests = {}
149 141 for k,v in iteritems(check):
150 142 if isinstance(v, dict):
151 143 tests[k] = CompositeFilter(v)
152 144 else:
153 145 tests[k] = lambda o, v=v: o == v  # bind v per-iteration to avoid the late-binding closure bug
154 146
155 147 for rec in itervalues(self._records):
156 148 if self._match_one(rec, tests):
157 149 matches.append(copy(rec))
158 150 return matches
159 151
160 152 def _extract_subdict(self, rec, keys):
161 153 """extract subdict of keys"""
162 154 d = {}
163 155 d['msg_id'] = rec['msg_id']
164 156 for key in keys:
165 157 d[key] = rec[key]
166 158 return copy(d)
167 159
168 160 # methods for monitoring size / culling history
169 161
170 162 def _add_bytes(self, rec):
171 163 for key in ('buffers', 'result_buffers'):
172 164 for buf in rec.get(key) or []:
173 165 self._buffer_bytes += len(buf)
174 166
175 167 self._maybe_cull()
176 168
177 169 def _drop_bytes(self, rec):
178 170 for key in ('buffers', 'result_buffers'):
179 171 for buf in rec.get(key) or []:
180 172 self._buffer_bytes -= len(buf)
181 173
182 174 def _cull_oldest(self, n=1):
183 175 """cull the oldest N records"""
184 176 for msg_id in self.get_history()[:n]:
185 177 self.log.debug("Culling record: %r", msg_id)
186 178 self._culled_ids.add(msg_id)
187 179 self.drop_record(msg_id)
188 180
189 181 def _maybe_cull(self):
190 182 # cull by count:
191 183 if len(self._records) > self.record_limit:
192 184 to_cull = int(self.cull_fraction * self.record_limit)
193 185 self.log.info("%i records exceeds limit of %i, culling oldest %i",
194 186 len(self._records), self.record_limit, to_cull
195 187 )
196 188 self._cull_oldest(to_cull)
197 189
198 190 # cull by size:
199 191 if self._buffer_bytes > self.size_limit:
200 192 limit = self.size_limit * (1 - self.cull_fraction)
201 193
202 194 before = self._buffer_bytes
203 195 before_count = len(self._records)
204 196 culled = 0
205 197 while self._buffer_bytes > limit:
206 198 self._cull_oldest(1)
207 199 culled += 1
208 200
209 201 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
210 202 before_count, before, self.size_limit, culled
211 203 )
212 204
205 def _check_dates(self, rec):
206 for key in ('submitted', 'started', 'completed'):
207 value = rec.get(key, None)
208 if value is not None and not isinstance(value, datetime):
209 raise ValueError("%s must be None or datetime, not %r" % (key, value))
210
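This is the check the commit adds: date fields must be real datetime objects (or None) before a record is stored or updated. A quick illustration with placeholder ids, where a string that merely looks like a date is rejected::

    from datetime import datetime

    db = DictDB()
    db.add_record('id-1', {'msg_id': 'id-1', 'submitted': datetime.now()})
    try:
        db.update_record('id-1', {'completed': '2011-10-01 12:00:00'})
    except ValueError as e:
        print(e)  # completed must be None or datetime, not '2011-10-01 12:00:00'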
213 211 # public API methods:
214 212
215 213 def add_record(self, msg_id, rec):
216 214 """Add a new Task Record, by msg_id."""
217 215 if msg_id in self._records:
218 216 raise KeyError("Already have msg_id %r"%(msg_id))
217 self._check_dates(rec)
219 218 self._records[msg_id] = rec
220 219 self._add_bytes(rec)
221 220 self._maybe_cull()
222 221
223 222 def get_record(self, msg_id):
224 223 """Get a specific Task Record, by msg_id."""
225 224 if msg_id in self._culled_ids:
226 225 raise KeyError("Record %r has been culled for size" % msg_id)
227 226 if msg_id not in self._records:
228 227 raise KeyError("No such msg_id %r"%(msg_id))
229 228 return copy(self._records[msg_id])
230 229
231 230 def update_record(self, msg_id, rec):
232 231 """Update the data in an existing record."""
233 232 if msg_id in self._culled_ids:
234 233 raise KeyError("Record %r has been culled for size" % msg_id)
234 self._check_dates(rec)
235 235 _rec = self._records[msg_id]
236 236 self._drop_bytes(_rec)
237 237 _rec.update(rec)
238 238 self._add_bytes(_rec)
239 239
240 240 def drop_matching_records(self, check):
241 241 """Remove a record from the DB."""
242 242 matches = self._match(check)
243 243 for rec in matches:
244 244 self._drop_bytes(rec)
245 245 del self._records[rec['msg_id']]
246 246
247 247 def drop_record(self, msg_id):
248 248 """Remove a record from the DB."""
249 249 rec = self._records[msg_id]
250 250 self._drop_bytes(rec)
251 251 del self._records[msg_id]
252 252
253 253 def find_records(self, check, keys=None):
254 254 """Find records matching a query dict, optionally extracting subset of keys.
255 255
256 256 Returns a list of matching record dicts.
257 257
258 258 Parameters
259 259 ----------
260 260
261 261 check: dict
262 262 mongodb-style query argument
263 263 keys: list of strs [optional]
264 264 if specified, the subset of keys to extract. msg_id will *always* be
265 265 included.
266 266 """
267 267 matches = self._match(check)
268 268 if keys:
269 269 return [ self._extract_subdict(rec, keys) for rec in matches ]
270 270 else:
271 271 return matches
272 272
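For example, to fetch timing fields for every finished task (assuming db holds full TaskRecords, so each match carries the requested keys)::

    finished = db.find_records({'completed': {'$ne': None}},
                               keys=['started', 'completed'])
    # each element is a dict; 'msg_id' is always included alongside the keys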
273 273 def get_history(self):
274 274 """get all msg_ids, ordered by time submitted."""
275 275 msg_ids = self._records.keys()
276 276 # Remove any that do not have a submitted timestamp.
277 277 # This is extremely unlikely to happen,
278 278 # but it seems to come up in some tests on VMs.
279 279 msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
280 280 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
281 281
282 282
283 283 NODATA = KeyError("NoDB backend doesn't store any data. "
284 284 "Start the Controller with a DB backend to enable resubmission / result persistence."
285 285 )
286 286
287 287
288 288 class NoDB(BaseDB):
289 289 """A blackhole db backend that actually stores no information.
290 290
291 291 Provides the full DB interface, but raises KeyErrors on any
292 292 method that tries to access the records. This can be used to
293 293 minimize the memory footprint of the Hub when its record-keeping
294 294 functionality is not required.
295 295 """
296 296
297 297 def add_record(self, msg_id, record):
298 298 pass
299 299
300 300 def get_record(self, msg_id):
301 301 raise NODATA
302 302
303 303 def update_record(self, msg_id, record):
304 304 pass
305 305
306 306 def drop_matching_records(self, check):
307 307 pass
308 308
309 309 def drop_record(self, msg_id):
310 310 pass
311 311
312 312 def find_records(self, check, keys=None):
313 313 raise NODATA
314 314
315 315 def get_history(self):
316 316 raise NODATA
317 317
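To actually use the blackhole backend, the Hub is pointed at this class from an ipcontroller config file. The dotted path below reflects this module's location in parallel-era IPython and is shown as an assumption::

    # ipcontroller_config.py
    c = get_config()

    # Store nothing: minimal Hub memory use, but no resubmission
    # or result persistence.
    c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.NoDB'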