define deepcopy for memoryview...
Min RK
@@ -1,47 +1,46 @@
 """A Task logger that presents our DB interface,
 but exists entirely in memory and implemented with dicts.
 
-Authors:
-
-* Min RK
-
-
+
 TaskRecords are dicts of the form::
 
     {
        'msg_id' : str(uuid),
        'client_uuid' : str(uuid),
        'engine_uuid' : str(uuid) or None,
        'header' : dict(header),
        'content': dict(content),
        'buffers': list(buffers),
        'submitted': datetime or None,
        'started': datetime or None,
        'completed': datetime or None,
        'received': datetime or None,
        'resubmitted': str(uuid) or None,
        'result_header' : dict(header) or None,
        'result_content' : dict(content) or None,
        'result_buffers' : list(buffers) or None,
    }
 
 With this info, many of the special categories of tasks can be defined by query,
 e.g.:
 
 * pending: completed is None
 * client's outstanding: client_uuid = uuid && completed is None
 * MIA: arrived is None (and completed is None)
 
 DictDB supports a subset of mongodb operators::
 
     $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
 """
-#-----------------------------------------------------------------------------
-# Copyright (C) 2010-2011 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-from copy import deepcopy as copy
+
+# Copyright (c) IPython Development Team.
+# Distributed under the terms of the Modified BSD License.
+
+import copy
+from copy import deepcopy
+
+# Python can't copy memoryviews, but creating another memoryview works for us
+copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)
+
+
 from datetime import datetime
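This import block is the substance of the commit: deepcopy() has no handler
for memoryview, so it falls back to pickle and raises a TypeError. The fix
registers a copier in the copy module's private _deepcopy_dispatch table; a
fresh view over the same buffer is enough here, as the comment in the diff
notes. A minimal sketch of the effect, using the same private-table trick
the commit itself relies on:

    import copy
    from copy import deepcopy

    buf = memoryview(b"task result frame")

    try:
        deepcopy(buf)  # no dispatch entry: pickle fallback raises
    except TypeError as e:
        print("deepcopy failed:", e)

    # register a copier: a new memoryview over the same underlying bytes
    copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)

    clone = deepcopy(buf)
    print(clone == buf)  # True: same contents, distinct view object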
@@ -48,19 +47,19 @@
 
 from IPython.config.configurable import LoggingConfigurable
 
 from IPython.utils.py3compat import iteritems, itervalues
 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
 
 filters = {
     '$lt' : lambda a,b: a < b,
     '$gt' : lambda a,b: a > b,
     '$eq' : lambda a,b: a == b,
     '$ne' : lambda a,b: a != b,
     '$lte': lambda a,b: a <= b,
     '$gte': lambda a,b: a >= b,
     '$in' : lambda a,b: a in b,
     '$nin': lambda a,b: a not in b,
     '$all': lambda a,b: all([ a in bb for bb in b ]),
     '$mod': lambda a,b: a%b[0] == b[1],
     '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
 }
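For reference, each entry above is a two-argument predicate: the first
argument is the value stored in the record, the second is the operand from
the query. Assuming the filters dict above is in scope:

    print(filters['$lt'](3, 5))                # True: 3 < 5
    print(filters['$in']('ok', ['ok', 'err'])) # True: membership test
    print(filters['$mod'](10, [4, 2]))         # True: 10 % 4 == 2
    print(filters['$exists']('x', True))       # True: value is not None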
@@ -67,17 +66,17 @@
 
 
 class CompositeFilter(object):
     """Composite filter for matching multiple properties."""
 
     def __init__(self, dikt):
         self.tests = []
         self.values = []
         for key, value in iteritems(dikt):
             self.tests.append(filters[key])
             self.values.append(value)
 
     def __call__(self, value):
         for test,check in zip(self.tests, self.values):
             if not test(value, check):
                 return False
         return True
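CompositeFilter ANDs several operators over a single field: a query value
such as {'$gte': t0, '$lt': t1} compiles one test per operator, and
__call__ passes only when every test does. For instance:

    from datetime import datetime, timedelta

    t0 = datetime(2014, 1, 1)
    t1 = t0 + timedelta(days=1)

    in_window = CompositeFilter({'$gte': t0, '$lt': t1})
    print(in_window(t0 + timedelta(hours=3)))  # True: inside [t0, t1)
    print(in_window(t1))                       # False: the $lt test fails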
@@ -84,45 +83,45 @@
 
 class BaseDB(LoggingConfigurable):
     """Empty Parent class so traitlets work on DB."""
     # base configurable traits:
     session = Unicode("")
 
 class DictDB(BaseDB):
     """Basic in-memory dict-based object for saving Task Records.
 
     This is the first object to present the DB interface
     for logging tasks out of memory.
 
     The interface is based on MongoDB, so adding a MongoDB
     backend should be straightforward.
     """
 
     _records = Dict()
     _culled_ids = set() # set of ids which have been culled
     _buffer_bytes = Integer(0) # running total of the bytes in the DB
 
     size_limit = Integer(1024**3, config=True,
         help="""The maximum total size (in bytes) of the buffers stored in the db
 
         When the db exceeds this size, the oldest records will be culled until
         the total size is under size_limit * (1-cull_fraction).
         default: 1 GB
         """
     )
     record_limit = Integer(1024, config=True,
         help="""The maximum number of records in the db
 
         When the history exceeds this size, the first record_limit * cull_fraction
         records will be culled.
         """
     )
     cull_fraction = Float(0.1, config=True,
         help="""The fraction by which the db should be culled when one of the limits is exceeded
 
         In general, the db size will spend most of its time with a size in the range:
 
             [limit * (1-cull_fraction), limit]
 
         for each of size_limit and record_limit.
         """
     )
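All three traits are declared with config=True, so they can be tuned from a
controller profile instead of in code. A sketch of the relevant lines
(ipcontroller_config.py is the conventional file name; adjust for your
profile):

    # ipcontroller_config.py
    c = get_config()

    c.DictDB.size_limit = 2 * 1024**3  # allow 2 GB of buffers (default 1 GB)
    c.DictDB.record_limit = 4096       # keep up to 4096 records (default 1024)
    c.DictDB.cull_fraction = 0.25      # each cull frees 25% of the limit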
@@ -129,143 +128,143 @@
 
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
         for key,test in iteritems(tests):
             if not test(rec.get(key, None)):
                 return False
         return True
 
     def _match(self, check):
         """Find all the matches for a check dict."""
         matches = []
         tests = {}
         for k,v in iteritems(check):
             if isinstance(v, dict):
                 tests[k] = CompositeFilter(v)
             else:
                 tests[k] = lambda o, v=v: o==v  # bind v now, not at call time
 
         for rec in itervalues(self._records):
             if self._match_one(rec, tests):
-                matches.append(copy(rec))
+                matches.append(deepcopy(rec))
         return matches
 
     def _extract_subdict(self, rec, keys):
         """extract subdict of keys"""
         d = {}
         d['msg_id'] = rec['msg_id']
         for key in keys:
             d[key] = rec[key]
-        return copy(d)
+        return deepcopy(d)
 
     # methods for monitoring size / culling history
 
     def _add_bytes(self, rec):
         for key in ('buffers', 'result_buffers'):
             for buf in rec.get(key) or []:
                 self._buffer_bytes += len(buf)
 
         self._maybe_cull()
 
     def _drop_bytes(self, rec):
         for key in ('buffers', 'result_buffers'):
             for buf in rec.get(key) or []:
                 self._buffer_bytes -= len(buf)
 
     def _cull_oldest(self, n=1):
         """cull the oldest N records"""
         for msg_id in self.get_history()[:n]:
             self.log.debug("Culling record: %r", msg_id)
             self._culled_ids.add(msg_id)
             self.drop_record(msg_id)
 
     def _maybe_cull(self):
         # cull by count:
         if len(self._records) > self.record_limit:
             to_cull = int(self.cull_fraction * self.record_limit)
             self.log.info("%i records exceeds limit of %i, culling oldest %i",
                 len(self._records), self.record_limit, to_cull
             )
             self._cull_oldest(to_cull)
 
         # cull by size:
         if self._buffer_bytes > self.size_limit:
             limit = self.size_limit * (1 - self.cull_fraction)
 
             before = self._buffer_bytes
             before_count = len(self._records)
             culled = 0
             while self._buffer_bytes > limit:
                 self._cull_oldest(1)
                 culled += 1
 
             self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                 before_count, before, self.size_limit, culled
             )
 
     def _check_dates(self, rec):
         for key in ('submitted', 'started', 'completed'):
             value = rec.get(key, None)
             if value is not None and not isinstance(value, datetime):
                 raise ValueError("%s must be None or datetime, not %r" % (key, value))
 
     # public API methods:
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if msg_id in self._records:
             raise KeyError("Already have msg_id %r"%(msg_id))
         self._check_dates(rec)
         self._records[msg_id] = rec
         self._add_bytes(rec)
         self._maybe_cull()
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
         if msg_id in self._culled_ids:
             raise KeyError("Record %r has been culled for size" % msg_id)
         if msg_id not in self._records:
             raise KeyError("No such msg_id %r"%(msg_id))
-        return copy(self._records[msg_id])
+        return deepcopy(self._records[msg_id])
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         if msg_id in self._culled_ids:
             raise KeyError("Record %r has been culled for size" % msg_id)
         self._check_dates(rec)
         _rec = self._records[msg_id]
         self._drop_bytes(_rec)
         _rec.update(rec)
         self._add_bytes(_rec)
 
     def drop_matching_records(self, check):
         """Remove all records matching a query dict from the DB."""
         matches = self._match(check)
         for rec in matches:
             self._drop_bytes(rec)
             del self._records[rec['msg_id']]
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         rec = self._records[msg_id]
         self._drop_bytes(rec)
         del self._records[msg_id]
 
     def find_records(self, check, keys=None):
         """Find records matching a query dict, optionally extracting a subset of keys.
 
         Returns a list of matching records.
 
         Parameters
         ----------
 
         check: dict
             mongodb-style query argument
         keys: list of strs [optional]
             if specified, the subset of keys to extract. msg_id will *always* be
             included.
         """
         matches = self._match(check)
         if keys:
             return [ self._extract_subdict(rec, keys) for rec in matches ]
         else:
             return matches
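Together with _match above, this is the entire query path. A short sketch of
typical calls (the client uuid is a made-up placeholder):

    db = DictDB()

    # a client's outstanding tasks: submitted but not completed
    outstanding = db.find_records({
        'client_uuid': 'hypothetical-client-uuid',
        'completed': None,
    })

    # timing fields only, for every completed task
    times = db.find_records({'completed': {'$exists': True}},
                            keys=['submitted', 'started', 'completed'])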
@@ -272,48 +271,48 @@
 
     def get_history(self):
         """get all msg_ids, ordered by time submitted."""
         msg_ids = self._records.keys()
         # Remove any that do not have a submitted timestamp.
         # This is extremely unlikely to happen,
         # but it seems to come up in some tests on VMs.
         msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ]
         return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
 
 
 class NoData(KeyError):
     """Special KeyError to raise when requesting data from NoDB"""
     def __str__(self):
         return ("NoDB backend doesn't store any data. "
             "Start the Controller with a DB backend to enable resubmission / result persistence.")
 
 
 class NoDB(BaseDB):
     """A blackhole db backend that actually stores no information.
 
     Provides the full DB interface, but raises KeyErrors on any
     method that tries to access the records. This can be used to
     minimize the memory footprint of the Hub when its record-keeping
     functionality is not required.
     """
 
     def add_record(self, msg_id, record):
         pass
 
     def get_record(self, msg_id):
         raise NoData()
 
     def update_record(self, msg_id, record):
         pass
 
     def drop_matching_records(self, check):
         pass
 
     def drop_record(self, msg_id):
         pass
 
     def find_records(self, check, keys=None):
         raise NoData()
 
     def get_history(self):
         raise NoData()
 
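Which backend the Hub uses is itself configurable: per the IPython.parallel
docs, the class is selected with the HubFactory.db_class trait, so enabling
this blackhole backend is one config line:

    # ipcontroller_config.py
    c = get_config()

    # keep no task history: smallest Hub footprint, but resubmission and
    # result retrieval from the Hub are disabled
    c.HubFactory.db_class = 'NoDB'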