check type of date fields in DictDB
MinRK
@@ -1,317 +1,317 @@
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7
7
8
8
9 TaskRecords are dicts of the form::
9 TaskRecords are dicts of the form::
10
10
11 {
11 {
12 'msg_id' : str(uuid),
12 'msg_id' : str(uuid),
13 'client_uuid' : str(uuid),
13 'client_uuid' : str(uuid),
14 'engine_uuid' : str(uuid) or None,
14 'engine_uuid' : str(uuid) or None,
15 'header' : dict(header),
15 'header' : dict(header),
16 'content': dict(content),
16 'content': dict(content),
17 'buffers': list(buffers),
17 'buffers': list(buffers),
-        'submitted': datetime,
+        'submitted': datetime or None,
         'started': datetime or None,
         'completed': datetime or None,
-        'resubmitted': datetime or None,
+        'received': datetime or None,
+        'resubmitted': str(uuid) or None,
         'result_header' : dict(header) or None,
         'result_content' : dict(content) or None,
         'result_buffers' : list(buffers) or None,
     }
 
 With this info, many of the special categories of tasks can be defined by query,
 e.g.:
 
 * pending: completed is None
 * client's outstanding: client_uuid = uuid && completed is None
 * MIA: arrived is None (and completed is None)
 
-EngineRecords are dicts of the form::
-
-    {
-        'eid' : int(id),
-        'uuid': str(uuid)
-    }
-
-This may be extended, but is currently.
-
-We support a subset of mongodb operators::
+DictDB supports a subset of mongodb operators::
 
     $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
46 """
38 """
 #-----------------------------------------------------------------------------
 # Copyright (C) 2010-2011 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
 from copy import deepcopy as copy
 from datetime import datetime
 
 from IPython.config.configurable import LoggingConfigurable
 
 from IPython.utils.py3compat import iteritems, itervalues
 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
 
 filters = {
     '$lt' : lambda a,b: a < b,
     '$gt' : lambda a,b: a > b,
     '$eq' : lambda a,b: a == b,
     '$ne' : lambda a,b: a != b,
     '$lte': lambda a,b: a <= b,
     '$gte': lambda a,b: a >= b,
     '$in' : lambda a,b: a in b,
     '$nin': lambda a,b: a not in b,
     '$all': lambda a,b: all([ a in bb for bb in b ]),
     '$mod': lambda a,b: a%b[0] == b[1],
     '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
 }
 
 
 class CompositeFilter(object):
     """Composite filter for matching multiple properties."""
 
     def __init__(self, dikt):
         self.tests = []
         self.values = []
         for key, value in iteritems(dikt):
             self.tests.append(filters[key])
             self.values.append(value)
 
     def __call__(self, value):
         for test,check in zip(self.tests, self.values):
             if not test(value, check):
                 return False
         return True
 
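
CompositeFilter simply ANDs the operator tests applied to a single field, so a dict of operators behaves like a range query. A minimal sketch using the filters table above::

    # matches values in the half-open interval [3, 7)
    f = CompositeFilter({'$gte': 3, '$lt': 7})
    f(3), f(6), f(7)   # -> True, True, False
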
 class BaseDB(LoggingConfigurable):
     """Empty Parent class so traitlets work on DB."""
     # base configurable traits:
     session = Unicode("")
 
 class DictDB(BaseDB):
     """Basic in-memory dict-based object for saving Task Records.
 
     This is the first object to present the DB interface
     for logging tasks out of memory.
 
     The interface is based on MongoDB, so adding a MongoDB
     backend should be straightforward.
     """
 
     _records = Dict()
     _culled_ids = set() # set of ids which have been culled
     _buffer_bytes = Integer(0) # running total of the bytes in the DB
 
     size_limit = Integer(1024**3, config=True,
         help="""The maximum total size (in bytes) of the buffers stored in the db
 
         When the db exceeds this size, the oldest records will be culled until
         the total size is under size_limit * (1-cull_fraction).
         default: 1 GB
         """
     )
     record_limit = Integer(1024, config=True,
         help="""The maximum number of records in the db
 
         When the history exceeds this size, the first record_limit * cull_fraction
         records will be culled.
         """
     )
     cull_fraction = Float(0.1, config=True,
         help="""The fraction by which the db should be culled when one of the limits is exceeded
 
         In general, the db size will spend most of its time with a size in the range:
 
         [limit * (1-cull_fraction), limit]
 
         for each of size_limit and record_limit.
         """
     )
 
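
Because all three traits are declared with config=True, the limits should be tunable from a controller config file in the usual traitlets way (a sketch; file name and values are illustrative)::

    # ipcontroller_config.py
    c = get_config()
    c.DictDB.record_limit = 2048           # keep at most 2048 records
    c.DictDB.size_limit = 512 * 1024**2    # 512 MB of buffer data
    c.DictDB.cull_fraction = 0.2           # drop 20% when a limit is hit

With the defaults (record_limit=1024, cull_fraction=0.1), crossing the record limit culls the oldest int(0.1 * 1024) = 102 records.
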
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
         for key,test in iteritems(tests):
             if not test(rec.get(key, None)):
                 return False
         return True
 
     def _match(self, check):
         """Find all the matches for a check dict."""
         matches = []
         tests = {}
         for k,v in iteritems(check):
             if isinstance(v, dict):
                 tests[k] = CompositeFilter(v)
             else:
                 tests[k] = lambda o, v=v: o==v
 
         for rec in itervalues(self._records):
             if self._match_one(rec, tests):
                 matches.append(copy(rec))
         return matches
 
     def _extract_subdict(self, rec, keys):
         """extract subdict of keys"""
         d = {}
         d['msg_id'] = rec['msg_id']
         for key in keys:
             d[key] = rec[key]
         return copy(d)
 
     # methods for monitoring size / culling history
 
     def _add_bytes(self, rec):
         for key in ('buffers', 'result_buffers'):
             for buf in rec.get(key) or []:
                 self._buffer_bytes += len(buf)
 
         self._maybe_cull()
 
     def _drop_bytes(self, rec):
         for key in ('buffers', 'result_buffers'):
             for buf in rec.get(key) or []:
                 self._buffer_bytes -= len(buf)
 
     def _cull_oldest(self, n=1):
         """cull the oldest N records"""
         for msg_id in self.get_history()[:n]:
             self.log.debug("Culling record: %r", msg_id)
             self._culled_ids.add(msg_id)
             self.drop_record(msg_id)
 
     def _maybe_cull(self):
         # cull by count:
         if len(self._records) > self.record_limit:
             to_cull = int(self.cull_fraction * self.record_limit)
             self.log.info("%i records exceeds limit of %i, culling oldest %i",
                 len(self._records), self.record_limit, to_cull
             )
             self._cull_oldest(to_cull)
 
         # cull by size:
         if self._buffer_bytes > self.size_limit:
             limit = self.size_limit * (1 - self.cull_fraction)
 
             before = self._buffer_bytes
             before_count = len(self._records)
             culled = 0
             while self._buffer_bytes > limit:
                 self._cull_oldest(1)
                 culled += 1
 
             self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                 before_count, before, self.size_limit, culled
             )
 
+    def _check_dates(self, rec):
+        for key in ('submitted', 'started', 'completed'):
+            value = rec.get(key, None)
+            if value is not None and not isinstance(value, datetime):
+                raise ValueError("%s must be None or datetime, not %r" % (key, value))
+
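
This helper is the substance of the commit: date fields are now validated before a record is stored, so a non-datetime timestamp fails loudly instead of entering the DB. For example (db as in the earlier sketch)::

    try:
        db.add_record('bad', {'msg_id': 'bad', 'submitted': '2012-01-01'})
    except ValueError as e:
        print(e)   # submitted must be None or datetime, not '2012-01-01'
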
     # public API methods:
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if msg_id in self._records:
             raise KeyError("Already have msg_id %r"%(msg_id))
+        self._check_dates(rec)
         self._records[msg_id] = rec
         self._add_bytes(rec)
         self._maybe_cull()
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
         if msg_id in self._culled_ids:
             raise KeyError("Record %r has been culled for size" % msg_id)
         if not msg_id in self._records:
             raise KeyError("No such msg_id %r"%(msg_id))
         return copy(self._records[msg_id])
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         if msg_id in self._culled_ids:
             raise KeyError("Record %r has been culled for size" % msg_id)
+        self._check_dates(rec)
         _rec = self._records[msg_id]
         self._drop_bytes(_rec)
         _rec.update(rec)
         self._add_bytes(_rec)
 
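
A typical lifecycle pairs these calls: add_record when a task is submitted, update_record as its status changes, and get_record to read it back. Records are deep-copied on the way out, so callers cannot mutate the store. Continuing the earlier sketch::

    db.update_record('abc123', {'started': datetime.now(),
                                'completed': datetime.now()})
    rec = db.get_record('abc123')
    rec['completed'] is not None   # -> True
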
     def drop_matching_records(self, check):
         """Remove all records matching a query dict from the DB."""
         matches = self._match(check)
         for rec in matches:
             self._drop_bytes(rec)
             del self._records[rec['msg_id']]
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         rec = self._records[msg_id]
         self._drop_bytes(rec)
         del self._records[msg_id]
 
     def find_records(self, check, keys=None):
         """Find records matching a query dict, optionally extracting subset of keys.
 
         Returns list of matching records.
 
         Parameters
         ----------
 
         check: dict
             mongodb-style query argument
         keys: list of strs [optional]
             if specified, the subset of keys to extract. msg_id will *always* be
             included.
         """
         matches = self._match(check)
         if keys:
             return [ self._extract_subdict(rec, keys) for rec in matches ]
         else:
             return matches
 
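
The keys argument trims each match to a subdict; msg_id is always included because _extract_subdict seeds the result with it. Continuing the sketch once more::

    # fetch only the completion times of finished tasks
    done = db.find_records({'completed': {'$ne': None}}, keys=['completed'])
    # -> [{'msg_id': 'abc123', 'completed': datetime(...)}]
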
     def get_history(self):
         """get all msg_ids, ordered by time submitted."""
         msg_ids = self._records.keys()
         # Remove any that do not have a submitted timestamp.
         # This is extremely unlikely to happen,
         # but it seems to come up in some tests on VMs.
         msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
         return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
 
 
 NODATA = KeyError("NoDB backend doesn't store any data. "
     "Start the Controller with a DB backend to enable resubmission / result persistence."
 )
 
 
 class NoDB(BaseDB):
     """A blackhole db backend that actually stores no information.
 
     Provides the full DB interface, but raises KeyErrors on any
     method that tries to access the records. This can be used to
     minimize the memory footprint of the Hub when its record-keeping
     functionality is not required.
     """
 
     def add_record(self, msg_id, record):
         pass
 
     def get_record(self, msg_id):
         raise NODATA
 
     def update_record(self, msg_id, record):
         pass
 
     def drop_matching_records(self, check):
         pass
 
     def drop_record(self, msg_id):
         pass
 
     def find_records(self, check, keys=None):
         raise NODATA
 
     def get_history(self):
         raise NODATA
 
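
NoDB is the zero-overhead counterpart to DictDB: it accepts writes and raises NODATA on any read. Assuming the Hub's db_class trait accepts a dotted import string, as the IPython.parallel docs describe, selecting it should look roughly like::

    # ipcontroller_config.py
    c = get_config()
    c.HubFactory.db_class = 'IPython.parallel.controller.dictdb.NoDB'

Any client feature that needs stored results, such as resubmission, will then surface the NODATA KeyError above.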