define deepcopy for memoryview...
Min RK
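
This change registers a deepcopy handler for memoryview, which copy.deepcopy cannot handle on its own (it falls back to the pickle protocol and raises TypeError). A minimal standalone sketch of the failure and of the workaround the patch adds; the sample record and names are illustrative only:

    import copy
    from copy import deepcopy

    # Unpatched, deepcopying a memoryview fails, e.g. on Python 3:
    #     deepcopy(memoryview(b"data"))
    #     TypeError: cannot pickle 'memoryview' object

    # The patch's workaround: a fresh memoryview over the same
    # underlying buffer, which is "deep enough" for task records.
    copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)

    rec = {'msg_id': 'abc', 'buffers': [memoryview(b"payload")]}
    clone = deepcopy(rec)
    assert bytes(clone['buffers'][0]) == b"payload"
    assert clone['buffers'][0] is not rec['buffers'][0]

Both views still share the same memory, so this is not a true deep copy; per the comment in the patch, "creating another memoryview works for us".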
@@ -1,319 +1,318 @@
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 Authors:
5
6 * Min RK
7
8 4
9 5 TaskRecords are dicts of the form::
10 6
11 7 {
12 8 'msg_id' : str(uuid),
13 9 'client_uuid' : str(uuid),
14 10 'engine_uuid' : str(uuid) or None,
15 11 'header' : dict(header),
16 12 'content': dict(content),
17 13 'buffers': list(buffers),
18 14 'submitted': datetime or None,
19 15 'started': datetime or None,
20 16 'completed': datetime or None,
21 17 'received': datetime or None,
22 18 'resubmitted': str(uuid) or None,
23 19 'result_header' : dict(header) or None,
24 20 'result_content' : dict(content) or None,
25 21 'result_buffers' : list(buffers) or None,
26 22 }
27 23
28 24 With this info, many of the special categories of tasks can be defined by query,
29 25 e.g.:
30 26
31 27 * pending: completed is None
32 28 * client's outstanding: client_uuid = uuid && completed is None
33 29 * MIA: received is None (and completed is None)
34 30
35 31 DictDB supports a subset of mongodb operators::
36 32
37 33 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
38 34 """
39 #-----------------------------------------------------------------------------
40 # Copyright (C) 2010-2011 The IPython Development Team
41 #
42 # Distributed under the terms of the BSD License. The full license is in
43 # the file COPYING, distributed as part of this software.
44 #-----------------------------------------------------------------------------
45
46 from copy import deepcopy as copy
35
36 # Copyright (c) IPython Development Team.
37 # Distributed under the terms of the Modified BSD License.
38
39 import copy
40 from copy import deepcopy
41
42 # Python can't copy memoryviews, but creating another memoryview works for us
43 copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)
44
45
47 46 from datetime import datetime
48 47
49 48 from IPython.config.configurable import LoggingConfigurable
50 49
51 50 from IPython.utils.py3compat import iteritems, itervalues
52 51 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
53 52
54 53 filters = {
55 54 '$lt' : lambda a,b: a < b,
56 55 '$gt' : lambda a,b: a > b,
57 56 '$eq' : lambda a,b: a == b,
58 57 '$ne' : lambda a,b: a != b,
59 58 '$lte': lambda a,b: a <= b,
60 59 '$gte': lambda a,b: a >= b,
61 60 '$in' : lambda a,b: a in b,
62 61 '$nin': lambda a,b: a not in b,
63 62 '$all': lambda a,b: all([ a in bb for bb in b ]),
64 63 '$mod': lambda a,b: a%b[0] == b[1],
65 64 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
66 65 }
67 66
68 67
69 68 class CompositeFilter(object):
70 69 """Composite filter for matching multiple properties."""
71 70
72 71 def __init__(self, dikt):
73 72 self.tests = []
74 73 self.values = []
75 74 for key, value in iteritems(dikt):
76 75 self.tests.append(filters[key])
77 76 self.values.append(value)
78 77
79 78 def __call__(self, value):
80 79 for test,check in zip(self.tests, self.values):
81 80 if not test(value, check):
82 81 return False
83 82 return True
84 83
85 84 class BaseDB(LoggingConfigurable):
86 85 """Empty Parent class so traitlets work on DB."""
87 86 # base configurable traits:
88 87 session = Unicode("")
89 88
90 89 class DictDB(BaseDB):
91 90 """Basic in-memory dict-based object for saving Task Records.
92 91
93 92 This is the first object to present the DB interface
94 93 for logging tasks out of memory.
95 94
96 95 The interface is based on MongoDB, so adding a MongoDB
97 96 backend should be straightforward.
98 97 """
99 98
100 99 _records = Dict()
101 100 _culled_ids = set() # set of ids which have been culled
102 101 _buffer_bytes = Integer(0) # running total of the bytes in the DB
103 102
104 103 size_limit = Integer(1024**3, config=True,
105 104 help="""The maximum total size (in bytes) of the buffers stored in the db
106 105
107 106 When the db exceeds this size, the oldest records will be culled until
108 107 the total size is under size_limit * (1-cull_fraction).
109 108 default: 1 GB
110 109 """
111 110 )
112 111 record_limit = Integer(1024, config=True,
113 112 help="""The maximum number of records in the db
114 113
115 114 When the history exceeds this size, the first record_limit * cull_fraction
116 115 records will be culled.
117 116 """
118 117 )
119 118 cull_fraction = Float(0.1, config=True,
120 119 help="""The fraction by which the db should be culled when one of the limits is exceeded
121 120
122 121 In general, the db size will spend most of its time with a size in the range:
123 122
124 123 [limit * (1-cull_fraction), limit]
125 124
126 125 for each of size_limit and record_limit.
127 126 """
128 127 )
129 128
130 129 def _match_one(self, rec, tests):
131 130 """Check if a specific record matches tests."""
132 131 for key,test in iteritems(tests):
133 132 if not test(rec.get(key, None)):
134 133 return False
135 134 return True
136 135
137 136 def _match(self, check):
138 137 """Find all the matches for a check dict."""
139 138 matches = []
140 139 tests = {}
141 140 for k,v in iteritems(check):
142 141 if isinstance(v, dict):
143 142 tests[k] = CompositeFilter(v)
144 143 else:
145 144 tests[k] = lambda o, v=v: o==v  # bind v per-key; a bare closure would capture the loop's last value
146 145
147 146 for rec in itervalues(self._records):
148 147 if self._match_one(rec, tests):
149 matches.append(copy(rec))
148 matches.append(deepcopy(rec))
150 149 return matches
151 150
152 151 def _extract_subdict(self, rec, keys):
153 152 """extract subdict of keys"""
154 153 d = {}
155 154 d['msg_id'] = rec['msg_id']
156 155 for key in keys:
157 156 d[key] = rec[key]
158 return copy(d)
157 return deepcopy(d)
159 158
160 159 # methods for monitoring size / culling history
161 160
162 161 def _add_bytes(self, rec):
163 162 for key in ('buffers', 'result_buffers'):
164 163 for buf in rec.get(key) or []:
165 164 self._buffer_bytes += len(buf)
166 165
167 166 self._maybe_cull()
168 167
169 168 def _drop_bytes(self, rec):
170 169 for key in ('buffers', 'result_buffers'):
171 170 for buf in rec.get(key) or []:
172 171 self._buffer_bytes -= len(buf)
173 172
174 173 def _cull_oldest(self, n=1):
175 174 """cull the oldest N records"""
176 175 for msg_id in self.get_history()[:n]:
177 176 self.log.debug("Culling record: %r", msg_id)
178 177 self._culled_ids.add(msg_id)
179 178 self.drop_record(msg_id)
180 179
181 180 def _maybe_cull(self):
182 181 # cull by count:
183 182 if len(self._records) > self.record_limit:
184 183 to_cull = int(self.cull_fraction * self.record_limit)
185 184 self.log.info("%i records exceeds limit of %i, culling oldest %i",
186 185 len(self._records), self.record_limit, to_cull
187 186 )
188 187 self._cull_oldest(to_cull)
189 188
190 189 # cull by size:
191 190 if self._buffer_bytes > self.size_limit:
192 191 limit = self.size_limit * (1 - self.cull_fraction)
193 192
194 193 before = self._buffer_bytes
195 194 before_count = len(self._records)
196 195 culled = 0
197 196 while self._buffer_bytes > limit:
198 197 self._cull_oldest(1)
199 198 culled += 1
200 199
201 200 self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
202 201 before_count, before, self.size_limit, culled
203 202 )
204 203
205 204 def _check_dates(self, rec):
206 205 for key in ('submitted', 'started', 'completed'):
207 206 value = rec.get(key, None)
208 207 if value is not None and not isinstance(value, datetime):
209 208 raise ValueError("%s must be None or datetime, not %r" % (key, value))
210 209
211 210 # public API methods:
212 211
213 212 def add_record(self, msg_id, rec):
214 213 """Add a new Task Record, by msg_id."""
215 214 if msg_id in self._records:
216 215 raise KeyError("Already have msg_id %r"%(msg_id))
217 216 self._check_dates(rec)
218 217 self._records[msg_id] = rec
219 218 self._add_bytes(rec)
220 219 self._maybe_cull()
221 220
222 221 def get_record(self, msg_id):
223 222 """Get a specific Task Record, by msg_id."""
224 223 if msg_id in self._culled_ids:
225 224 raise KeyError("Record %r has been culled for size" % msg_id)
226 225 if msg_id not in self._records:
227 226 raise KeyError("No such msg_id %r"%(msg_id))
228 return copy(self._records[msg_id])
227 return deepcopy(self._records[msg_id])
229 228
230 229 def update_record(self, msg_id, rec):
231 230 """Update the data in an existing record."""
232 231 if msg_id in self._culled_ids:
233 232 raise KeyError("Record %r has been culled for size" % msg_id)
234 233 self._check_dates(rec)
235 234 _rec = self._records[msg_id]
236 235 self._drop_bytes(_rec)
237 236 _rec.update(rec)
238 237 self._add_bytes(_rec)
239 238
240 239 def drop_matching_records(self, check):
241 240 """Remove a record from the DB."""
242 241 matches = self._match(check)
243 242 for rec in matches:
244 243 self._drop_bytes(rec)
245 244 del self._records[rec['msg_id']]
246 245
247 246 def drop_record(self, msg_id):
248 247 """Remove a record from the DB."""
249 248 rec = self._records[msg_id]
250 249 self._drop_bytes(rec)
251 250 del self._records[msg_id]
252 251
253 252 def find_records(self, check, keys=None):
254 253 """Find records matching a query dict, optionally extracting subset of keys.
255 254
256 255 Returns a list of matching records (or of subdicts, if keys is given).
257 256
258 257 Parameters
259 258 ----------
260 259
261 260 check: dict
262 261 mongodb-style query argument
263 262 keys: list of strs [optional]
264 263 if specified, the subset of keys to extract. msg_id will *always* be
265 264 included.
266 265 """
267 266 matches = self._match(check)
268 267 if keys:
269 268 return [ self._extract_subdict(rec, keys) for rec in matches ]
270 269 else:
271 270 return matches
272 271
273 272 def get_history(self):
274 273 """get all msg_ids, ordered by time submitted."""
275 274 msg_ids = self._records.keys()
276 275 # Remove any that do not have a submitted timestamp.
277 276 # This is extremely unlikely to happen,
278 277 # but it seems to come up in some tests on VMs.
279 278 msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
280 279 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
281 280
282 281
283 282 class NoData(KeyError):
284 283 """Special KeyError to raise when requesting data from NoDB"""
285 284 def __str__(self):
286 285 return ("NoDB backend doesn't store any data. "
287 286 "Start the Controller with a DB backend to enable resubmission / result persistence.")
288 287
289 288
290 289 class NoDB(BaseDB):
291 290 """A blackhole db backend that actually stores no information.
292 291
293 292 Provides the full DB interface, but raises KeyErrors on any
294 293 method that tries to access the records. This can be used to
295 294 minimize the memory footprint of the Hub when its record-keeping
296 295 functionality is not required.
297 296 """
298 297
299 298 def add_record(self, msg_id, record):
300 299 pass
301 300
302 301 def get_record(self, msg_id):
303 302 raise NoData()
304 303
305 304 def update_record(self, msg_id, record):
306 305 pass
307 306
308 307 def drop_matching_records(self, check):
309 308 pass
310 309
311 310 def drop_record(self, msg_id):
312 311 pass
313 312
314 313 def find_records(self, check, keys=None):
315 314 raise NoData()
316 315
317 316 def get_history(self):
318 317 raise NoData()
319 318
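
The module docstring lists the supported mongodb-style operators; the sketch below exercises them through DictDB.find_records. It assumes an IPython 2.x-era install where this module is importable as IPython.parallel.controller.dictdb, and the records are trimmed to the minimum fields the methods touch:

    from datetime import datetime, timedelta
    from IPython.parallel.controller.dictdb import DictDB

    db = DictDB()
    t0 = datetime.now()
    for i in range(3):
        db.add_record('msg-%i' % i, {
            'msg_id': 'msg-%i' % i,
            'submitted': t0 + timedelta(seconds=i),
            'completed': (t0 + timedelta(seconds=i + 1)) if i else None,
        })

    # "pending" as defined in the docstring: completed is None
    # (a non-dict value is matched by plain equality)
    pending = db.find_records({'completed': None})
    assert [r['msg_id'] for r in pending] == ['msg-0']

    # a dict value builds a CompositeFilter from the operator table;
    # this matches the two records submitted strictly after t0
    later = db.find_records({'submitted': {'$gt': t0}}, keys=['submitted'])
    assert len(later) == 2 and all('msg_id' in r for r in later)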
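
The three culling traits are declared with config=True, so the limits can be tuned from a controller configuration file. A hypothetical ipcontroller_config.py fragment (values chosen only for illustration):

    c = get_config()

    # After a cull, the DB settles near limit * (1 - cull_fraction),
    # i.e. ~90% of each limit with the default cull_fraction of 0.1.
    c.DictDB.size_limit = 256 * 1024**2   # 256 MiB of message buffers
    c.DictDB.record_limit = 4096          # at most 4096 task records
    c.DictDB.cull_fraction = 0.1          # drop the oldest 10% on overflow

To disable record-keeping altogether, the Hub can instead be pointed at the NoDB backend defined above (in IPython.parallel this was selectable via the controller's db_class setting; check the exact option name against the IPython version in use).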