##// END OF EJS Templates
shim out IPython.parallel into ipython_parallel
shim out IPython.parallel into ipython_parallel

File last commit:

r20858:330bfc56
r20858:330bfc56
Show More
dictdb.py
318 lines | 10.1 KiB | text/x-python | PythonLexer
"""A Task logger that presents our DB interface,
but exists entirely in memory and implemented with dicts.

TaskRecords are dicts of the form::

    {
        'msg_id' : str(uuid),
        'client_uuid' : str(uuid),
        'engine_uuid' : str(uuid) or None,
        'header' : dict(header),
        'content': dict(content),
        'buffers': list(buffers),
        'submitted': datetime or None,
        'started': datetime or None,
        'completed': datetime or None,
        'received': datetime or None,
        'resubmitted': str(uuid) or None,
        'result_header' : dict(header) or None,
        'result_content' : dict(content) or None,
        'result_buffers' : list(buffers) or None,
    }

With this info, many of the special categories of tasks can be defined by query,
e.g.:

* pending: completed is None
* client's outstanding: client_uuid = uuid && completed is None
* MIA: arrived is None (and completed is None)

DictDB supports a subset of mongodb operators::

    $lt,$gt,$lte,$gte,$eq,$ne,$in,$nin,$all,$mod,$exists
"""
Min RK
define deepcopy for memoryview...
r20774
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import copy
from copy import deepcopy
# Python can't copy memoryviews, but creating another memoryview works for us.
# This registers a handler in the copy module's (underscore-private) dispatch
# table so that deepcopy() of a record containing memoryview buffers yields a
# new memoryview over the same underlying buffer instead of failing.
copy._deepcopy_dispatch[memoryview] = lambda x, memo: memoryview(x)
MinRK
Started DB backend with mongoDB support.
r3579 from datetime import datetime
MinRK
add LoggingConfigurable base class
r4016 from IPython.config.configurable import LoggingConfigurable
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
Thomas Kluyver
Fix references to dict.iteritems and dict.itervalues
r13361 from IPython.utils.py3compat import iteritems, itervalues
MinRK
add size-limiting to the DictDB backend
r7533 from IPython.utils.traitlets import Dict, Unicode, Integer, Float
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
# Query operators understood by DictDB, keyed by their MongoDB-style name.
# Every test is called as ``test(a, b)`` where ``a`` is the value stored in the
# record (None when the key is absent) and ``b`` is the operand from the query.
filters = {
    '$lt': lambda a, b: a < b,
    # Previously written as ``b > a``, which made $gt behave exactly like $lt.
    '$gt': lambda a, b: a > b,
    '$eq': lambda a, b: a == b,
    '$ne': lambda a, b: a != b,
    '$lte': lambda a, b: a <= b,
    '$gte': lambda a, b: a >= b,
    '$in': lambda a, b: a in b,
    '$nin': lambda a, b: a not in b,
    # MongoDB semantics: the stored sequence must contain every element of the
    # operand.  A missing value (None) can never satisfy the test.
    '$all': lambda a, b: a is not None and all(bb in a for bb in b),
    '$mod': lambda a, b: a % b[0] == b[1],
    '$exists': lambda a, b: (b and a is not None) or (a is None and not b),
}
class CompositeFilter(object):
    """Composite filter for matching multiple properties.

    Built from a dict that maps operator names (keys of the module-level
    ``filters`` table, e.g. ``'$lt'``) to their operands.  Calling the
    instance with a value returns True only when the value passes every
    operator test; an empty filter matches everything.
    """

    def __init__(self, dikt):
        """Resolve each operator in `dikt` to its test function.

        Raises
        ------
        KeyError
            If `dikt` names an operator that is not in ``filters``.
        """
        self.tests = []
        self.values = []
        for key, value in iteritems(dikt):
            try:
                test = filters[key]
            except KeyError:
                # same exception type as a plain lookup, but says what went wrong
                raise KeyError("Unsupported query operator: %r" % (key,))
            self.tests.append(test)
            self.values.append(value)

    def __call__(self, value):
        """Return True if `value` satisfies all of the configured tests."""
        return all(
            test(value, check) for test, check in zip(self.tests, self.values)
        )
class BaseDB(LoggingConfigurable):
    """Empty Parent class so traitlets work on DB."""

    # base configurable traits:
    # unicode trait shared by every DB backend; defaults to the empty string
    session = Unicode("")
class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    _records = Dict()
    _culled_ids = set()  # set of ids which have been culled
    _buffer_bytes = Integer(0)  # running total of the bytes in the DB

    size_limit = Integer(1024**3, config=True,
        help="""The maximum total size (in bytes) of the buffers stored in the db
        When the db exceeds this size, the oldest records will be culled until
        the total size is under size_limit * (1-cull_fraction).
        default: 1 GB
        """
    )
    record_limit = Integer(1024, config=True,
        help="""The maximum number of records in the db
        When the history exceeds this size, the first record_limit * cull_fraction
        records will be culled.
        """
    )
    cull_fraction = Float(0.1, config=True,
        help="""The fraction by which the db should culled when one of the limits is exceeded
        In general, the db size will spend most of its time with a size in the range:
        [limit * (1-cull_fraction), limit]
        for each of size_limit and record_limit.
        """
    )

    def __init__(self, *args, **kwargs):
        super(DictDB, self).__init__(*args, **kwargs)
        # Give every instance its own set: the class-level default above would
        # otherwise be shared, and mutated, by all DictDB instances.
        self._culled_ids = set()

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests.

        `tests` maps record keys to one-argument callables; a key missing
        from `rec` is tested as None.
        """
        for key, test in iteritems(tests):
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check):
        """Find all the matches for a check dict.

        Dict values in `check` are treated as operator queries
        (CompositeFilter); any other value is matched by equality.
        Returns a list of deep copies of the matching records.
        """
        matches = []
        tests = {}
        for k, v in iteritems(check):
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # Bind v as a default argument: a plain closure would look v
                # up when called, so every equality test would compare
                # against the *last* value of this loop.
                tests[k] = lambda o, v=v: o == v

        for rec in itervalues(self._records):
            if self._match_one(rec, tests):
                matches.append(deepcopy(rec))
        return matches

    def _extract_subdict(self, rec, keys):
        """Return a deep copy of `rec` restricted to `keys` (plus 'msg_id')."""
        d = {}
        d['msg_id'] = rec['msg_id']
        for key in keys:
            d[key] = rec[key]
        return deepcopy(d)

    # methods for monitoring size / culling history

    def _add_bytes(self, rec):
        """Add the size of a record's buffers to the running total, then cull if needed."""
        for key in ('buffers', 'result_buffers'):
            # the key may be absent or None; treat both as "no buffers"
            for buf in rec.get(key) or []:
                self._buffer_bytes += len(buf)

        self._maybe_cull()

    def _drop_bytes(self, rec):
        """Subtract the size of a record's buffers from the running total."""
        for key in ('buffers', 'result_buffers'):
            for buf in rec.get(key) or []:
                self._buffer_bytes -= len(buf)

    def _cull_oldest(self, n=1):
        """cull the oldest N records"""
        for msg_id in self.get_history()[:n]:
            self.log.debug("Culling record: %r", msg_id)
            self._culled_ids.add(msg_id)
            self.drop_record(msg_id)

    def _maybe_cull(self):
        """Cull the oldest records if record_limit or size_limit is exceeded."""
        # cull by count:
        if len(self._records) > self.record_limit:
            to_cull = int(self.cull_fraction * self.record_limit)
            self.log.info("%i records exceeds limit of %i, culling oldest %i",
                len(self._records), self.record_limit, to_cull
            )
            self._cull_oldest(to_cull)

        # cull by size:
        if self._buffer_bytes > self.size_limit:
            limit = self.size_limit * (1 - self.cull_fraction)

            before = self._buffer_bytes
            before_count = len(self._records)
            culled = 0
            while self._buffer_bytes > limit:
                remaining = len(self._records)
                self._cull_oldest(1)
                if len(self._records) == remaining:
                    # Nothing could be culled (the history is empty, e.g. the
                    # remaining records have no submitted timestamp); stop
                    # rather than spin forever.
                    break
                culled += 1

            self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
                before_count, before, self.size_limit, culled
            )

    def _check_dates(self, rec):
        """Raise ValueError if a date field of `rec` is neither None nor a datetime."""
        for key in ('submitted', 'started', 'completed'):
            value = rec.get(key, None)
            if value is not None and not isinstance(value, datetime):
                raise ValueError("%s must be None or datetime, not %r" % (key, value))

    # public API methods:

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id.

        Raises KeyError if `msg_id` is already stored, ValueError if a date
        field has the wrong type.
        """
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._check_dates(rec)
        self._records[msg_id] = rec
        self._add_bytes(rec)
        self._maybe_cull()

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id.

        Returns a deep copy of the stored record.  Raises KeyError if the
        record was culled or never existed.
        """
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return deepcopy(self._records[msg_id])

    def update_record(self, msg_id, rec):
        """Update the data in an existing record.

        Raises KeyError if the record was culled or does not exist,
        ValueError if a date field of `rec` has the wrong type.
        """
        if msg_id in self._culled_ids:
            raise KeyError("Record %r has been culled for size" % msg_id)
        self._check_dates(rec)
        if msg_id not in self._records:
            # same message as get_record instead of a bare KeyError(msg_id)
            raise KeyError("No such msg_id %r"%(msg_id))
        _rec = self._records[msg_id]
        # re-count the buffers, since the update may replace them
        self._drop_bytes(_rec)
        _rec.update(rec)
        self._add_bytes(_rec)

    def drop_matching_records(self, check):
        """Remove all records matching the query dict `check` from the DB."""
        matches = self._match(check)
        for rec in matches:
            self._drop_bytes(rec)
            del self._records[rec['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB.

        Raises KeyError if there is no record with this msg_id.
        """
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        rec = self._records[msg_id]
        self._drop_bytes(rec)
        del self._records[msg_id]

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting subset of keys.

        Returns a list of (copies of) the matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract.  msg_id will *always* be
            included.
        """
        matches = self._match(check)
        if keys:
            return [ self._extract_subdict(rec, keys) for rec in matches ]
        else:
            return matches

    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        records = self._records
        # Remove any that do not have a submitted timestamp.
        # This is extremely unlikely to happen,
        # but it seems to come up in some tests on VMs.
        # .get() also tolerates records with no 'submitted' key at all.
        msg_ids = [ m for m in records if records[m].get('submitted') is not None ]
        return sorted(msg_ids, key=lambda m: records[m]['submitted'])
class NoData(KeyError):
    """Special KeyError to raise when requesting data from NoDB"""

    def __str__(self):
        # Parenthesised so the two literals are concatenated into one message.
        # Without the parentheses the second literal was a separate,
        # unreachable statement and the hint was never shown.
        return (
            "NoDB backend doesn't store any data. "
            "Start the Controller with a DB backend to enable resubmission / result persistence."
        )
class NoDB(BaseDB):
    """A blackhole db backend that actually stores no information.

    Provides the full DB interface, but raises KeyErrors on any
    method that tries to access the records.  This can be used to
    minimize the memory footprint of the Hub when its record-keeping
    functionality is not required.
    """

    def add_record(self, msg_id, record):
        """Accept and discard the record."""
        pass

    def get_record(self, msg_id):
        """Always raises NoData (a KeyError): nothing is ever stored."""
        raise NoData()

    def update_record(self, msg_id, record):
        """Accept and discard the update."""
        pass

    def drop_matching_records(self, check):
        """Nothing to drop; does nothing."""
        pass

    def drop_record(self, msg_id):
        """Nothing to drop; does nothing."""
        pass

    def find_records(self, check, keys=None):
        """Always raises NoData (a KeyError): there are no records to search."""
        raise NoData()

    def get_history(self):
        """Always raises NoData (a KeyError): no history is kept."""
        raise NoData()