Fix SyntaxError
Thomas Kluyver
@@ -1,310 +1,310 @@
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7
8 8
9 9 TaskRecords are dicts of the form:
10 10 {
11 11 'msg_id' : str(uuid),
12 12 'client_uuid' : str(uuid),
13 13 'engine_uuid' : str(uuid) or None,
14 14 'header' : dict(header),
15 15 'content': dict(content),
16 16 'buffers': list(buffers),
17 17 'submitted': datetime,
18 18 'started': datetime or None,
19 19 'completed': datetime or None,
20 20 'resubmitted': datetime or None,
21 21 'result_header' : dict(header) or None,
22 22 'result_content' : dict(content) or None,
23 23 'result_buffers' : list(buffers) or None,
24 24 }
25 25 With this info, many of the special categories of tasks can be defined by query:
26 26
27 27 pending: completed is None
28 28 client's outstanding: client_uuid = uuid && completed is None
29 29 MIA: arrived is None (and completed is None)
30 30 etc.
31 31
32 32 EngineRecords are dicts of the form:
33 33 {
34 34 'eid' : int(id),
35 35 'uuid': str(uuid)
36 36 }
This may be extended, but this is currently the complete set of fields.

We support a subset of mongodb operators:
    $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

from copy import deepcopy as copy
from datetime import datetime

from IPython.config.configurable import LoggingConfigurable

from IPython.utils.traitlets import Dict, Unicode, Integer, Float

filters = {
    '$lt' : lambda a,b: a < b,
    '$gt' : lambda a,b: a > b,
    '$eq' : lambda a,b: a == b,
    '$ne' : lambda a,b: a != b,
    '$lte': lambda a,b: a <= b,
    '$gte': lambda a,b: a >= b,
    '$in' : lambda a,b: a in b,
    '$nin': lambda a,b: a not in b,
    '$all': lambda a,b: all([ a in bb for bb in b ]),
    '$mod': lambda a,b: a%b[0] == b[1],
    '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
}
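
# A few illustrative checks of these operators (my examples, not from the
# original source); each filter takes (record value, query argument):
#
#     filters['$lt'](3, 5)             # True:  3 < 5
#     filters['$in']('a', 'abc')       # True:  'a' in 'abc'
#     filters['$mod'](10, [3, 1])      # True:  10 % 3 == 1
#     filters['$exists'](None, False)  # True:  value absent and not required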


class CompositeFilter(object):
    """Composite filter for matching multiple properties."""

    def __init__(self, dikt):
        self.tests = []
        self.values = []
        for key, value in dikt.iteritems():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        for test,check in zip(self.tests, self.values):
            if not test(value, check):
                return False
        return True
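
# Illustrative use (my example, not in the original): one record field is
# matched against several operators at once, e.g. a half-open time window:
#
#     in_window = CompositeFilter({'$gte': datetime(2011, 1, 1),
#                                  '$lt' : datetime(2011, 2, 1)})
#     in_window(datetime(2011, 1, 15))   # True
#     in_window(datetime(2011, 3, 1))    # False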

class BaseDB(LoggingConfigurable):
    """Empty parent class so traitlets work on DB."""
    # base configurable traits:
    session = Unicode("")

class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    _records = Dict()
    _culled_ids = set() # set of ids which have been culled
    _buffer_bytes = Integer(0) # running total of the bytes in the DB

    size_limit = Integer(1024*1024, config=True,
        help="""The maximum total size (in bytes) of the buffers stored in the db

        When the db exceeds this size, the oldest records will be culled until
        the total size is under size_limit * (1-cull_fraction).
        """
    )
    record_limit = Integer(1024, config=True,
        help="""The maximum number of records in the db

        When the history exceeds this size, the first record_limit * cull_fraction
        records will be culled.
        """
    )
    cull_fraction = Float(0.1, config=True,
        help="""The fraction by which the db should be culled when one of the limits is exceeded

        In general, the db will spend most of its time with a size in the range:

        [limit * (1-cull_fraction), limit]

        for each of size_limit and record_limit.
        """
    )

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests."""
        for key,test in tests.iteritems():
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check):
        """Find all the matches for a check dict."""
        matches = []
        tests = {}
        for k,v in check.iteritems():
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # bind v as a default argument; a plain closure would see only
                # the final loop value of v
                tests[k] = lambda o, v=v: o == v

        for rec in self._records.itervalues():
            if self._match_one(rec, tests):
                matches.append(copy(rec))
        return matches

    def _extract_subdict(self, rec, keys):
        """extract subdict of keys"""
        d = {}
        d['msg_id'] = rec['msg_id']
        for key in keys:
            d[key] = rec[key]
        return copy(d)

    # methods for monitoring size / culling history

    def _add_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes += len(buf)

        self._maybe_cull()

    def _drop_bytes(self, rec):
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes -= len(buf)

    def _cull_oldest(self, n=1):
        """cull the oldest N records"""
        for msg_id in self.get_history()[:n]:
            self.log.debug("Culling record: %r", msg_id)
            self._culled_ids.add(msg_id)
            self.drop_record(msg_id)

    def _maybe_cull(self):
        # cull by count:
        if len(self._records) > self.record_limit:
            to_cull = int(self.cull_fraction * self.record_limit)
            self.log.info("%i records exceeds limit of %i, culling oldest %i",
                len(self._records), self.record_limit, to_cull
            )
            self._cull_oldest(to_cull)

        # cull by size:
        if self._buffer_bytes > self.size_limit:
            limit = self.size_limit * (1 - self.cull_fraction)

            before = self._buffer_bytes
            before_count = len(self._records)
            culled = 0
            while self._buffer_bytes > limit:
                self._cull_oldest(1)
                culled += 1

            self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                before_count, before, self.size_limit, culled
            )

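    # Worked example of the culling thresholds (my arithmetic, using the
    # defaults above): with record_limit=1024 and cull_fraction=0.1, adding
    # the 1025th record culls the oldest int(0.1 * 1024) = 102 records; with
    # size_limit=1MB, exceeding 1MB of buffers culls one record at a time
    # until the total drops below 1MB * 0.9.
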
    # public API methods:

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r" % (msg_id))
        self._records[msg_id] = rec
        self._add_bytes(rec)
        self._maybe_cull()

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r" % (msg_id))
        return copy(self._records[msg_id])

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        _rec = self._records[msg_id]
        self._drop_bytes(_rec)
        _rec.update(rec)
        self._add_bytes(_rec)

    def drop_matching_records(self, check):
        """Remove all records matching a query dict from the DB."""
        matches = self._match(check)
        for rec in matches:
            self._drop_bytes(rec)
            del self._records[rec['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        rec = self._records[msg_id]
        self._drop_bytes(rec)
        del self._records[msg_id]

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting a subset of keys.

        Returns a list of matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract. msg_id will *always* be
            included.
        """
        matches = self._match(check)
        if keys:
            return [ self._extract_subdict(rec, keys) for rec in matches ]
        else:
            return matches

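    # Illustrative queries (my examples; `cutoff` stands for some datetime):
    #
    #     db.find_records({'completed': None})               # all pending tasks
    #     db.find_records({'submitted': {'$gte': cutoff}},   # recent tasks, trimmed
    #                     keys=['engine_uuid'])              # to msg_id + engine_uuid
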
    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
-        msg_ids = self._records.keys():
+        msg_ids = self._records.keys()
        # Remove any that do not have a submitted timestamp.
        # This is extremely unlikely to happen,
        # but it seems to come up in some tests on VMs.
        msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
        return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])


NODATA = KeyError("NoDB backend doesn't store any data. "
    "Start the Controller with a DB backend to enable resubmission / result persistence."
)


class NoDB(BaseDB):
    """A blackhole db backend that actually stores no information.

    Provides the full DB interface, but raises KeyErrors on any
    method that tries to access the records. This can be used to
    minimize the memory footprint of the Hub when its record-keeping
    functionality is not required.
    """

    def add_record(self, msg_id, record):
        pass

    def get_record(self, msg_id):
        raise NODATA

    def update_record(self, msg_id, record):
        pass

    def drop_matching_records(self, check):
        pass

    def drop_record(self, msg_id):
        pass

    def find_records(self, check, keys=None):
        raise NODATA

    def get_history(self):
        raise NODATA

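A minimal end-to-end sketch of the DictDB interface above (my own example,
not part of the commit; a real TaskRecord carries the full field set from
the module docstring, while this record holds only what the demo touches):

    from datetime import datetime, timedelta

    db = DictDB()
    for i in range(3):
        db.add_record(str(i), {
            'msg_id': str(i),
            'submitted': datetime(2011, 1, 1) + timedelta(minutes=i),
            'completed': datetime(2011, 1, 1, 1) if i < 2 else None,
            'buffers': [],
            'result_buffers': [],
        })

    db.get_history()                           # ['0', '1', '2'], ordered by 'submitted'
    len(db.find_records({'completed': None}))  # 1 -- only task '2' is still pending
    db.update_record('2', {'completed': datetime(2011, 1, 2)})
    db.drop_matching_records({'completed': {'$ne': None}})
    len(db._records)                           # 0 -- everything matched and was dropped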