mongodb.py
122 lines
| 4.2 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """A TaskRecord backend using mongodb | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3579 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3579 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
from pymongo import Connection | ||||
MinRK
|
r12179 | |||
# bson.Binary import moved | ||||
try: | ||||
from bson.binary import Binary | ||||
except ImportError: | ||||
from bson import Binary | ||||
MinRK
|
r3646 | |||
MinRK
|
r3988 | from IPython.utils.traitlets import Dict, List, Unicode, Instance | ||
MinRK
|
r3579 | |||
MinRK
|
r3642 | from .dictdb import BaseDB | ||
MinRK
|
r3631 | |||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3579 | # MongoDB class | ||
MinRK
|
r3631 | #----------------------------------------------------------------------------- | ||
MinRK
|
r3604 | class MongoDB(BaseDB): | ||
MinRK
|
r3579 | """MongoDB TaskRecord backend.""" | ||
MinRK
|
r3646 | |||
MinRK
|
r3985 | connection_args = List(config=True, | ||
help="""Positional arguments to be passed to pymongo.Connection. Only | ||||
necessary if the default mongodb configuration does not point to your | ||||
mongod instance.""") | ||||
connection_kwargs = Dict(config=True, | ||||
help="""Keyword arguments to be passed to pymongo.Connection. Only | ||||
necessary if the default mongodb configuration does not point to your | ||||
mongod instance.""" | ||||
) | ||||
MinRK
|
r3988 | database = Unicode(config=True, | ||
MinRK
|
r3985 | help="""The MongoDB database name to use for storing tasks for this session. If unspecified, | ||
a new database will be created with the Hub's IDENT. Specifying the database will result | ||||
in tasks from previous sessions being available via Clients' db_query and | ||||
get_result methods.""") | ||||
MinRK
|
r3875 | |||
_connection = Instance(Connection) # pymongo connection | ||||
MinRK
|
r3646 | |||
def __init__(self, **kwargs): | ||||
super(MongoDB, self).__init__(**kwargs) | ||||
MinRK
|
r3875 | if self._connection is None: | ||
self._connection = Connection(*self.connection_args, **self.connection_kwargs) | ||||
MinRK
|
r3646 | if not self.database: | ||
self.database = self.session | ||||
self._db = self._connection[self.database] | ||||
MinRK
|
r3579 | self._records = self._db['task_records'] | ||
MinRK
|
r3875 | self._records.ensure_index('msg_id', unique=True) | ||
self._records.ensure_index('submitted') # for sorting history | ||||
# for rec in self._records.find | ||||
MinRK
|
r3646 | |||
def _binary_buffers(self, rec): | ||||
for key in ('buffers', 'result_buffers'): | ||||
MinRK
|
r3780 | if rec.get(key, None): | ||
MinRK
|
r3646 | rec[key] = map(Binary, rec[key]) | ||
MinRK
|
r3780 | return rec | ||
MinRK
|
r3579 | |||
def add_record(self, msg_id, rec): | ||||
"""Add a new Task Record, by msg_id.""" | ||||
# print rec | ||||
MinRK
|
r3780 | rec = self._binary_buffers(rec) | ||
MinRK
|
r3875 | self._records.insert(rec) | ||
MinRK
|
r3579 | |||
def get_record(self, msg_id): | ||||
"""Get a specific Task Record, by msg_id.""" | ||||
MinRK
|
r3875 | r = self._records.find_one({'msg_id': msg_id}) | ||
if not r: | ||||
# r will be '' if nothing is found | ||||
raise KeyError(msg_id) | ||||
return r | ||||
MinRK
|
r3579 | |||
def update_record(self, msg_id, rec): | ||||
"""Update the data in an existing record.""" | ||||
MinRK
|
r3780 | rec = self._binary_buffers(rec) | ||
MinRK
|
r3875 | |||
self._records.update({'msg_id':msg_id}, {'$set': rec}) | ||||
MinRK
|
r3579 | |||
def drop_matching_records(self, check): | ||||
"""Remove a record from the DB.""" | ||||
self._records.remove(check) | ||||
def drop_record(self, msg_id): | ||||
"""Remove a record from the DB.""" | ||||
MinRK
|
r3875 | self._records.remove({'msg_id':msg_id}) | ||
MinRK
|
r3579 | |||
MinRK
|
r3780 | def find_records(self, check, keys=None): | ||
"""Find records matching a query dict, optionally extracting subset of keys. | ||||
Returns list of matching records. | ||||
Parameters | ||||
---------- | ||||
check: dict | ||||
mongodb-style query argument | ||||
keys: list of strs [optional] | ||||
if specified, the subset of keys to extract. msg_id will *always* be | ||||
included. | ||||
""" | ||||
if keys and 'msg_id' not in keys: | ||||
keys.append('msg_id') | ||||
matches = list(self._records.find(check,keys)) | ||||
for rec in matches: | ||||
rec.pop('_id') | ||||
return matches | ||||
def get_history(self): | ||||
"""get all msg_ids, ordered by time submitted.""" | ||||
cursor = self._records.find({},{'msg_id':1}).sort('submitted') | ||||
return [ rec['msg_id'] for rec in cursor ] | ||||
MinRK
|
r3579 | |||