dictdb.py
311 lines
| 9.7 KiB
| text/x-python
|
PythonLexer
Bernardo B. Marques
|
r4872 | """A Task logger that presents our DB interface, | ||
MinRK
|
r3579 | but exists entirely in memory and implemented with dicts. | ||
MinRK
|
r4018 | Authors: | ||
* Min RK | ||||
MinRK
|
r3579 | TaskRecords are dicts of the form: | ||
{ | ||||
'msg_id' : str(uuid), | ||||
'client_uuid' : str(uuid), | ||||
'engine_uuid' : str(uuid) or None, | ||||
'header' : dict(header), | ||||
'content': dict(content), | ||||
'buffers': list(buffers), | ||||
'submitted': datetime, | ||||
'started': datetime or None, | ||||
'completed': datetime or None, | ||||
'resubmitted': datetime or None, | ||||
'result_header' : dict(header) or None, | ||||
'result_content' : dict(content) or None, | ||||
'result_buffers' : list(buffers) or None, | ||||
} | ||||
With this info, many of the special categories of tasks can be defined by query: | ||||
pending: completed is None | ||||
client's outstanding: client_uuid = uuid && completed is None | ||||
MIA: arrived is None (and completed is None) | ||||
etc. | ||||
EngineRecords are dicts of the form: | ||||
{ | ||||
'eid' : int(id), | ||||
'uuid': str(uuid) | ||||
} | ||||
This may be extended, but is currently. | ||||
We support a subset of mongodb operators: | ||||
$lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3579 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r7261 | from copy import deepcopy as copy | ||
MinRK
|
r3579 | from datetime import datetime | ||
MinRK
|
r4016 | from IPython.config.configurable import LoggingConfigurable | ||
MinRK
|
r3646 | |||
MinRK
|
r7533 | from IPython.utils.traitlets import Dict, Unicode, Integer, Float | ||
MinRK
|
r3646 | |||
MinRK
|
r3579 | filters = { | ||
'$lt' : lambda a,b: a < b, | ||||
'$gt' : lambda a,b: b > a, | ||||
MinRK
|
r3644 | '$eq' : lambda a,b: a == b, | ||
'$ne' : lambda a,b: a != b, | ||||
MinRK
|
r3579 | '$lte': lambda a,b: a <= b, | ||
'$gte': lambda a,b: a >= b, | ||||
'$in' : lambda a,b: a in b, | ||||
'$nin': lambda a,b: a not in b, | ||||
MinRK
|
r3644 | '$all': lambda a,b: all([ a in bb for bb in b ]), | ||
MinRK
|
r3579 | '$mod': lambda a,b: a%b[0] == b[1], | ||
'$exists' : lambda a,b: (b and a is not None) or (a is None and not b) | ||||
} | ||||
class CompositeFilter(object): | ||||
"""Composite filter for matching multiple properties.""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def __init__(self, dikt): | ||
self.tests = [] | ||||
self.values = [] | ||||
for key, value in dikt.iteritems(): | ||||
self.tests.append(filters[key]) | ||||
self.values.append(value) | ||||
def __call__(self, value): | ||||
for test,check in zip(self.tests, self.values): | ||||
if not test(value, check): | ||||
return False | ||||
return True | ||||
MinRK
|
r4016 | class BaseDB(LoggingConfigurable): | ||
MinRK
|
r3604 | """Empty Parent class so traitlets work on DB.""" | ||
MinRK
|
r3646 | # base configurable traits: | ||
MinRK
|
r3988 | session = Unicode("") | ||
MinRK
|
r3604 | |||
class DictDB(BaseDB): | ||||
MinRK
|
r3579 | """Basic in-memory dict-based object for saving Task Records. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | This is the first object to present the DB interface | ||
for logging tasks out of memory. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | The interface is based on MongoDB, so adding a MongoDB | ||
backend should be straightforward. | ||||
""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3646 | _records = Dict() | ||
MinRK
|
r7533 | _culled_ids = set() # set of ids which have been culled | ||
_buffer_bytes = Integer(0) # running total of the bytes in the DB | ||||
MinRK
|
r9865 | size_limit = Integer(1024**3, config=True, | ||
MinRK
|
r7533 | help="""The maximum total size (in bytes) of the buffers stored in the db | ||
When the db exceeds this size, the oldest records will be culled until | ||||
the total size is under size_limit * (1-cull_fraction). | ||||
MinRK
|
r9865 | default: 1 GB | ||
MinRK
|
r7533 | """ | ||
) | ||||
record_limit = Integer(1024, config=True, | ||||
help="""The maximum number of records in the db | ||||
When the history exceeds this size, the first record_limit * cull_fraction | ||||
records will be culled. | ||||
""" | ||||
) | ||||
cull_fraction = Float(0.1, config=True, | ||||
help="""The fraction by which the db should culled when one of the limits is exceeded | ||||
In general, the db size will spend most of its time with a size in the range: | ||||
[limit * (1-cull_fraction), limit] | ||||
for each of size_limit and record_limit. | ||||
""" | ||||
) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def _match_one(self, rec, tests): | ||
"""Check if a specific record matches tests.""" | ||||
for key,test in tests.iteritems(): | ||||
if not test(rec.get(key, None)): | ||||
return False | ||||
return True | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def _match(self, check): | ||
MinRK
|
r3579 | """Find all the matches for a check dict.""" | ||
MinRK
|
r3780 | matches = [] | ||
MinRK
|
r3579 | tests = {} | ||
for k,v in check.iteritems(): | ||||
if isinstance(v, dict): | ||||
tests[k] = CompositeFilter(v) | ||||
else: | ||||
tests[k] = lambda o: o==v | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | for rec in self._records.itervalues(): | ||
MinRK
|
r3579 | if self._match_one(rec, tests): | ||
MinRK
|
r6323 | matches.append(copy(rec)) | ||
MinRK
|
r3780 | return matches | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def _extract_subdict(self, rec, keys): | ||
"""extract subdict of keys""" | ||||
d = {} | ||||
d['msg_id'] = rec['msg_id'] | ||||
for key in keys: | ||||
d[key] = rec[key] | ||||
MinRK
|
r7261 | return copy(d) | ||
MinRK
|
r7533 | |||
# methods for monitoring size / culling history | ||||
def _add_bytes(self, rec): | ||||
for key in ('buffers', 'result_buffers'): | ||||
for buf in rec.get(key) or []: | ||||
self._buffer_bytes += len(buf) | ||||
self._maybe_cull() | ||||
def _drop_bytes(self, rec): | ||||
for key in ('buffers', 'result_buffers'): | ||||
for buf in rec.get(key) or []: | ||||
self._buffer_bytes -= len(buf) | ||||
def _cull_oldest(self, n=1): | ||||
"""cull the oldest N records""" | ||||
for msg_id in self.get_history()[:n]: | ||||
self.log.debug("Culling record: %r", msg_id) | ||||
self._culled_ids.add(msg_id) | ||||
self.drop_record(msg_id) | ||||
def _maybe_cull(self): | ||||
# cull by count: | ||||
if len(self._records) > self.record_limit: | ||||
to_cull = int(self.cull_fraction * self.record_limit) | ||||
self.log.info("%i records exceeds limit of %i, culling oldest %i", | ||||
len(self._records), self.record_limit, to_cull | ||||
) | ||||
self._cull_oldest(to_cull) | ||||
# cull by size: | ||||
if self._buffer_bytes > self.size_limit: | ||||
limit = self.size_limit * (1 - self.cull_fraction) | ||||
before = self._buffer_bytes | ||||
before_count = len(self._records) | ||||
culled = 0 | ||||
while self._buffer_bytes > limit: | ||||
self._cull_oldest(1) | ||||
culled += 1 | ||||
self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.", | ||||
before_count, before, self.size_limit, culled | ||||
) | ||||
# public API methods: | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def add_record(self, msg_id, rec): | ||
"""Add a new Task Record, by msg_id.""" | ||||
Bradley M. Froehle
|
r7859 | if msg_id in self._records: | ||
MinRK
|
r3579 | raise KeyError("Already have msg_id %r"%(msg_id)) | ||
self._records[msg_id] = rec | ||||
MinRK
|
r7533 | self._add_bytes(rec) | ||
self._maybe_cull() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def get_record(self, msg_id): | ||
"""Get a specific Task Record, by msg_id.""" | ||||
MinRK
|
r7533 | if msg_id in self._culled_ids: | ||
raise KeyError("Record %r has been culled for size" % msg_id) | ||||
MinRK
|
r6323 | if not msg_id in self._records: | ||
MinRK
|
r3579 | raise KeyError("No such msg_id %r"%(msg_id)) | ||
MinRK
|
r6323 | return copy(self._records[msg_id]) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def update_record(self, msg_id, rec): | ||
"""Update the data in an existing record.""" | ||||
MinRK
|
r7533 | if msg_id in self._culled_ids: | ||
raise KeyError("Record %r has been culled for size" % msg_id) | ||||
_rec = self._records[msg_id] | ||||
self._drop_bytes(_rec) | ||||
_rec.update(rec) | ||||
self._add_bytes(_rec) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def drop_matching_records(self, check): | ||
"""Remove a record from the DB.""" | ||||
MinRK
|
r3780 | matches = self._match(check) | ||
MinRK
|
r7533 | for rec in matches: | ||
self._drop_bytes(rec) | ||||
del self._records[rec['msg_id']] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3579 | def drop_record(self, msg_id): | ||
"""Remove a record from the DB.""" | ||||
MinRK
|
r7533 | rec = self._records[msg_id] | ||
self._drop_bytes(rec) | ||||
MinRK
|
r3579 | del self._records[msg_id] | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def find_records(self, check, keys=None): | ||
"""Find records matching a query dict, optionally extracting subset of keys. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | Returns dict keyed by msg_id of matching records. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | check: dict | ||
mongodb-style query argument | ||||
keys: list of strs [optional] | ||||
if specified, the subset of keys to extract. msg_id will *always* be | ||||
included. | ||||
""" | ||||
matches = self._match(check) | ||||
if keys: | ||||
return [ self._extract_subdict(rec, keys) for rec in matches ] | ||||
else: | ||||
return matches | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def get_history(self): | ||
"""get all msg_ids, ordered by time submitted.""" | ||||
Thomas Kluyver
|
r9236 | msg_ids = self._records.keys() | ||
MinRK
|
r9223 | # Remove any that do not have a submitted timestamp. | ||
# This is extremely unlikely to happen, | ||||
# but it seems to come up in some tests on VMs. | ||||
msg_ids = [ m for m in msg_ids if self._records[m]['submitted'] is not None ] | ||||
MinRK
|
r3780 | return sorted(msg_ids, key=lambda m: self._records[m]['submitted']) | ||
MinRK
|
r5892 | |||
MinRK
|
r7533 | |||
MinRK
|
r7509 | NODATA = KeyError("NoDB backend doesn't store any data. " | ||
"Start the Controller with a DB backend to enable resubmission / result persistence." | ||||
) | ||||
MinRK
|
r7533 | |||
class NoDB(BaseDB): | ||||
MinRK
|
r5892 | """A blackhole db backend that actually stores no information. | ||
Provides the full DB interface, but raises KeyErrors on any | ||||
method that tries to access the records. This can be used to | ||||
minimize the memory footprint of the Hub when its record-keeping | ||||
functionality is not required. | ||||
""" | ||||
def add_record(self, msg_id, record): | ||||
pass | ||||
def get_record(self, msg_id): | ||||
MinRK
|
r7509 | raise NODATA | ||
MinRK
|
r5892 | |||
def update_record(self, msg_id, record): | ||||
pass | ||||
def drop_matching_records(self, check): | ||||
pass | ||||
def drop_record(self, msg_id): | ||||
pass | ||||
def find_records(self, check, keys=None): | ||||
MinRK
|
r7509 | raise NODATA | ||
MinRK
|
r5892 | |||
def get_history(self): | ||||
MinRK
|
r7509 | raise NODATA | ||
MinRK
|
r5892 | |||