diff --git a/IPython/config/default/ipcontrollerz_config.py b/IPython/config/default/ipcontrollerz_config.py index 58a10a8..3cf437b 100644 --- a/IPython/config/default/ipcontrollerz_config.py +++ b/IPython/config/default/ipcontrollerz_config.py @@ -134,3 +134,47 @@ c = get_config() # set the engine heartbeat ports to use: # c.HubFactory.hb = (10303,10313) +#----------------------------------------------------------------------------- +# Configure the TaskRecord database backend +#----------------------------------------------------------------------------- + +# For memory/persistence reasons, tasks can be stored out-of-memory in a database. +# Currently, only sqlite and mongodb are supported as backends, but the interface +# is fairly simple, so advanced developers could write their own backend. + +# ----- in-memory configuration -------- +# this line restores the default behavior: in-memory storage of all results. +# c.HubFactory.db_class = 'IPython.zmq.parallel.dictdb.DictDB' + +# ----- sqlite configuration -------- +# use this line to activate sqlite: +# c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB' + +# You can specify the name of the db-file. By default, this will be located +# in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db +# c.SQLiteDB.filename = 'tasks.db' + +# You can also specify the location of the db-file, if you want it to be somewhere +# other than the cluster_dir. +# c.SQLiteDB.location = '/scratch/' + +# This will specify the name of the table for the controller to use. The default +# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding +# this will result in results persisting for multiple sessions. +# c.SQLiteDB.table = 'results' + +# ----- mongodb configuration -------- +# use this line to activate mongodb: +# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB' + +# You can specify the args and kwargs pymongo will use when creating the Connection. 
+# For more information on what these options might be, see pymongo documentation. +# c.MongoDB.connection_kwargs = {} +# c.MongoDB.connection_args = [] + +# This will specify the name of the mongo database for the controller to use. The default +# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding +# this will result in task results persisting through multiple sessions. +# c.MongoDB.database = 'ipythondb' + + diff --git a/IPython/utils/newserialized.py b/IPython/utils/newserialized.py index 689c507..b75430e 100644 --- a/IPython/utils/newserialized.py +++ b/IPython/utils/newserialized.py @@ -146,7 +146,7 @@ class UnSerializeIt(UnSerialized): typeDescriptor = self.serialized.getTypeDescriptor() if globals().has_key('numpy') and typeDescriptor == 'ndarray': buf = self.serialized.getData() - if isinstance(buf, buffer): + if isinstance(buf, (buffer,bytes)): result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype']) else: # memoryview diff --git a/IPython/zmq/parallel/dictdb.py b/IPython/zmq/parallel/dictdb.py index cf13975..9b7e48d 100644 --- a/IPython/zmq/parallel/dictdb.py +++ b/IPython/zmq/parallel/dictdb.py @@ -44,6 +44,10 @@ We support a subset of mongodb operators: from datetime import datetime +from IPython.config.configurable import Configurable + +from IPython.utils.traitlets import Dict, CUnicode + filters = { '$lt' : lambda a,b: a < b, '$gt' : lambda a,b: b > a, @@ -75,9 +79,10 @@ class CompositeFilter(object): return False return True -class BaseDB(object): +class BaseDB(Configurable): """Empty Parent class so traitlets work on DB.""" - pass + # base configurable traits: + session = CUnicode("") class DictDB(BaseDB): """Basic in-memory dict-based object for saving Task Records. @@ -88,10 +93,8 @@ class DictDB(BaseDB): The interface is based on MongoDB, so adding a MongoDB backend should be straightforward. 
""" -    _records = None -    def __init__(self, *args, **kwargs): -        self._records = dict() +    _records = Dict() def _match_one(self, rec, tests): """Check if a specific record matches tests.""" diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 8e3418d..df30f53 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -34,13 +34,6 @@ from . import error from .heartmonitor import HeartMonitor from .util import validate_url_container, ISO8601 -try: -    from pymongo.binary import Binary -except ImportError: -    MongoDB=None -else: -    from mongodb import MongoDB - #----------------------------------------------------------------------------- # Code #----------------------------------------------------------------------------- @@ -236,7 +229,8 @@ class HubFactory(RegistrationFactory): sub = ZMQStream(sub, loop) # connect the db -        self.db = import_item(self.db_class)(self.session.session) +        self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1])) +        self.db = import_item(self.db_class)(session=self.session.session, config=self.config) time.sleep(.25) # build connection dicts @@ -257,8 +253,8 @@ class HubFactory(RegistrationFactory): 'iopub' : client_iface%self.iopub[0], 'notification': client_iface%self.notifier_port } -        self.log.debug("hub::Hub engine addrs: %s"%self.engine_info) -        self.log.debug("hub::Hub client addrs: %s"%self.client_info) +        self.log.debug("Hub engine addrs: %s"%self.engine_info) +        self.log.debug("Hub client addrs: %s"%self.client_info) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, registrar=reg, clientele=c, notifier=n, db=self.db, engine_info=self.engine_info, client_info=self.client_info, @@ -569,8 +565,7 @@ class Hub(LoggingFactory): record['engine_uuid'] = queue_id record['client_uuid'] = client_id record['queue'] = 'mux' -        if MongoDB is not None and isinstance(self.db, MongoDB): -            record['buffers'] 
= map(Binary, record['buffers']) + self.pending.add(msg_id) self.queues[eid].append(msg_id) self.db.add_record(msg_id, record) @@ -614,10 +609,8 @@ class Hub(LoggingFactory): 'started' : started, 'completed' : completed } - if MongoDB is not None and isinstance(self.db, MongoDB): - result['result_buffers'] = map(Binary, msg['buffers']) - else: - result['result_buffers'] = msg['buffers'] + + result['result_buffers'] = msg['buffers'] self.db.update_record(msg_id, result) else: self.log.debug("queue:: unknown msg finished %s"%msg_id) @@ -635,8 +628,7 @@ class Hub(LoggingFactory): client_id, msg), exc_info=True) return record = init_record(msg) - if MongoDB is not None and isinstance(self.db, MongoDB): - record['buffers'] = map(Binary, record['buffers']) + record['client_uuid'] = client_id record['queue'] = 'task' header = msg['header'] @@ -684,10 +676,8 @@ class Hub(LoggingFactory): 'completed' : completed, 'engine_uuid': engine_uuid } - if MongoDB is not None and isinstance(self.db, MongoDB): - result['result_buffers'] = map(Binary, msg['buffers']) - else: - result['result_buffers'] = msg['buffers'] + + result['result_buffers'] = msg['buffers'] self.db.update_record(msg_id, result) else: diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py index b75afa1..897c838 100755 --- a/IPython/zmq/parallel/ipcontrollerapp.py +++ b/IPython/zmq/parallel/ipcontrollerapp.py @@ -118,7 +118,11 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader): paa('--mongodb', dest='HubFactory.db_class', action='store_const', const='IPython.zmq.parallel.mongodb.MongoDB', - help='Use MongoDB task storage [default: in-memory]') + help='Use MongoDB for task storage [default: in-memory]') + paa('--sqlite', + dest='HubFactory.db_class', action='store_const', + const='IPython.zmq.parallel.sqlitedb.SQLiteDB', + help='Use SQLite3 for DB task storage [default: in-memory]') paa('--hb', type=int, dest='HubFactory.hb', nargs=2, help='The (2) ports the Hub\'s 
Heartmonitor will use for the heartbeat ' diff --git a/IPython/zmq/parallel/mongodb.py b/IPython/zmq/parallel/mongodb.py index 81c347f..d2c4080 100644 --- a/IPython/zmq/parallel/mongodb.py +++ b/IPython/zmq/parallel/mongodb.py @@ -9,6 +9,9 @@ from datetime import datetime from pymongo import Connection +from pymongo.binary import Binary + +from IPython.utils.traitlets import Dict, List, CUnicode from .dictdb import BaseDB @@ -18,15 +21,29 @@ from .dictdb import BaseDB class MongoDB(BaseDB): """MongoDB TaskRecord backend.""" - def __init__(self, session_uuid, *args, **kwargs): - self._connection = Connection(*args, **kwargs) - self._db = self._connection[session_uuid] + + connection_args = List(config=True) + connection_kwargs = Dict(config=True) + database = CUnicode(config=True) + _table = Dict() + + def __init__(self, **kwargs): + super(MongoDB, self).__init__(**kwargs) + self._connection = Connection(*self.connection_args, **self.connection_kwargs) + if not self.database: + self.database = self.session + self._db = self._connection[self.database] self._records = self._db['task_records'] - self._table = {} + + def _binary_buffers(self, rec): + for key in ('buffers', 'result_buffers'): + if key in rec: + rec[key] = map(Binary, rec[key]) def add_record(self, msg_id, rec): """Add a new Task Record, by msg_id.""" # print rec + self._binary_buffers(rec) obj_id = self._records.insert(rec) self._table[msg_id] = obj_id @@ -36,6 +53,7 @@ class MongoDB(BaseDB): def update_record(self, msg_id, rec): """Update the data in an existing record.""" + self._binary_buffers(rec) obj_id = self._table[msg_id] self._records.update({'_id':obj_id}, {'$set': rec})