##// END OF EJS Templates
add db,resubmit/retries docs
add db,resubmit/retries docs

File last commit:

r3875:ffe043d4
r3876:4bb2eb4d
Show More
dictdb.py
180 lines | 5.5 KiB | text/x-python | PythonLexer
MinRK
Started DB backend with mongoDB support.
r3579 """A Task logger that presents our DB interface,
but exists entirely in memory and implemented with dicts.
TaskRecords are dicts of the form:
{
'msg_id' : str(uuid),
'client_uuid' : str(uuid),
'engine_uuid' : str(uuid) or None,
'header' : dict(header),
'content': dict(content),
'buffers': list(buffers),
'submitted': datetime,
'started': datetime or None,
'completed': datetime or None,
'resubmitted': datetime or None,
'result_header' : dict(header) or None,
'result_content' : dict(content) or None,
'result_buffers' : list(buffers) or None,
}
With this info, many of the special categories of tasks can be defined by query:
pending: completed is None
client's outstanding: client_uuid = uuid && completed is None
MIA: arrived is None (and completed is None)
etc.
EngineRecords are dicts of the form:
{
'eid' : int(id),
'uuid': str(uuid)
}
This may be extended, but is currently limited to these two keys.
We support a subset of mongodb operators:
$lt,$gt,$lte,$gte,$eq,$ne,$in,$nin,$all,$mod,$exists
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
from datetime import datetime
MinRK
Add SQLite backend, DB backends are Configurable...
r3646 from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Dict, CUnicode
MinRK
Started DB backend with mongoDB support.
# Table of the supported MongoDB-style query operators.
#
# Each entry maps an operator name to a two-argument predicate
# ``test(a, b)`` where ``a`` is the value found in the record and ``b`` is
# the operand given in the query, e.g. ``{'completed': {'$gt': b}}``.
filters = {
    '$lt': lambda a, b: a < b,
    # was ``b > a``, which made $gt behave exactly like $lt
    '$gt': lambda a, b: a > b,
    '$eq': lambda a, b: a == b,
    '$ne': lambda a, b: a != b,
    '$lte': lambda a, b: a <= b,
    '$gte': lambda a, b: a >= b,
    '$in': lambda a, b: a in b,
    '$nin': lambda a, b: a not in b,
    # NOTE(review): this checks that the record value is contained in *every*
    # element of the operand.  MongoDB's $all instead checks that the record's
    # array contains every operand element -- confirm which one is intended
    # before relying on it.  Behavior is kept as-is here.
    '$all': lambda a, b: all(a in bb for bb in b),
    # operand is a ``[divisor, remainder]`` pair
    '$mod': lambda a, b: a % b[0] == b[1],
    # a value of None counts as "does not exist"
    '$exists': lambda a, b: bool(b) == (a is not None),
}
class CompositeFilter(object):
    """Composite filter for matching multiple properties.

    Built from a dict of ``{operator: operand}`` pairs (for example
    ``{'$gte': 1, '$lt': 10}``).  Calling the instance with a value returns
    True only if the value passes every operator test.

    Parameters
    ----------
    dikt : dict
        Maps operator names (keys of the module-level ``filters`` table)
        to the operand each operator is checked against.

    Raises
    ------
    KeyError
        If an operator name is not present in ``filters``.
    """

    def __init__(self, dikt):
        # parallel lists: tests[i] is checked against values[i]
        self.tests = []
        self.values = []
        # items() rather than iteritems(): same result here, and it is not
        # tied to the Python 2 dict API
        for key, value in dikt.items():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        """Return True if `value` satisfies all of the stored tests."""
        return all(
            test(value, check)
            for test, check in zip(self.tests, self.values)
        )
MinRK
Add SQLite backend, DB backends are Configurable...
class BaseDB(Configurable):
    """Common parent of the DB backends, so that traitlets work on them."""

    # Base configurable traits, shared by every subclass.
    # NOTE(review): presumably identifies the session the DB belongs to --
    # confirm against the code that instantiates the backends.
    session = CUnicode("")
MinRK
Refactor newparallel to use Config system...
r3604
class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    # the whole "database": maps msg_id -> task record dict
    _records = Dict()

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests.

        Parameters
        ----------
        rec : dict
            The record to check.
        tests : dict
            Maps record keys to one-argument predicates.  A key missing
            from `rec` is tested as None.

        Returns
        -------
        bool
            True if every predicate accepts its value.
        """
        return all(test(rec.get(key, None)) for key, test in tests.items())

    def _match(self, check):
        """Find all the matches for a check dict.

        Parameters
        ----------
        check : dict
            mongodb-style query.  A dict value is interpreted as a set of
            operators (see CompositeFilter); any other value is compared
            for equality.

        Returns
        -------
        list
            The stored record dicts (not copies) that satisfy every test.
        """
        tests = {}
        for key, value in check.items():
            if isinstance(value, dict):
                tests[key] = CompositeFilter(value)
            else:
                # Bind the current value through a default argument.  A plain
                # closure would look `value` up when called, so every
                # equality test would compare against the last value seen
                # in this loop.
                tests[key] = lambda o, v=value: o == v
        return [
            rec for rec in self._records.values()
            if self._match_one(rec, tests)
        ]

    def _extract_subdict(self, rec, keys):
        """Extract the subdict of `keys` from `rec`; msg_id is always included.

        Raises KeyError if `rec` lacks 'msg_id' or any of the requested keys.
        """
        d = {'msg_id': rec['msg_id']}
        for key in keys:
            d[key] = rec[key]
        return d

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id.

        Raises KeyError if a record with this msg_id is already stored.
        """
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._records[msg_id] = rec

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id.

        Raises KeyError if no record with this msg_id is stored.
        """
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return self._records[msg_id]

    def update_record(self, msg_id, rec):
        """Update the data in an existing record.

        The keys of `rec` are merged into the stored record.
        Raises KeyError if no record with this msg_id is stored.
        """
        self._records[msg_id].update(rec)

    def drop_matching_records(self, check):
        """Remove all records matching the mongodb-style query `check`."""
        # _match returns a fully built list, so deleting from _records
        # while walking it is safe
        for rec in self._match(check):
            del self._records[rec['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB.

        Raises KeyError if no record with this msg_id is stored.
        """
        del self._records[msg_id]

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting subset of keys.

        Returns a list of the matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract.  msg_id will *always* be
            included.  Without keys, the stored record dicts themselves
            (not copies) are returned.
        """
        matches = self._match(check)
        if keys:
            return [self._extract_subdict(rec, keys) for rec in matches]
        return matches

    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        records = self._records
        return sorted(records, key=lambda m: records[m]['submitted'])