##// END OF EJS Templates
Add SQLite backend, DB backends are Configurable...
MinRK -
Show More
@@ -134,3 +134,47 b' c = get_config()'
134 134 # set the engine heartbeat ports to use:
135 135 # c.HubFactory.hb = (10303,10313)
136 136
137 #-----------------------------------------------------------------------------
138 # Configure the TaskRecord database backend
139 #-----------------------------------------------------------------------------
140
141 # For memory/persistance reasons, tasks can be stored out-of-memory in a database.
142 # Currently, only sqlite and mongodb are supported as backends, but the interface
143 # is fairly simple, so advanced developers could write their own backend.
144
145 # ----- in-memory configuration --------
146 # this line restores the default behavior: in-memory storage of all results.
147 # c.HubFactory.db_class = 'IPython.zmq.parallel.dictdb.DictDB'
148
149 # ----- sqlite configuration --------
150 # use this line to activate sqlite:
151 # c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
152
153 # You can specify the name of the db-file. By default, this will be located
154 # in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db
155 # c.SQLiteDB.filename = 'tasks.db'
156
157 # You can also specify the location of the db-file, if you want it to be somewhere
158 # other than the cluster_dir.
159 # c.SQLiteDB.location = '/scratch/'
160
161 # This will specify the name of the table for the controller to use. The default
162 # behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
163 # this will result in results persisting for multiple sessions.
164 # c.SQLiteDB.table = 'results'
165
166 # ----- mongodb configuration --------
167 # use this line to activate mongodb:
168 # c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
169
170 # You can specify the args and kwargs pymongo will use when creating the Connection.
171 # For more information on what these options might be, see pymongo documentation.
172 # c.MongoDB.connection_kwargs = {}
173 # c.MongoDB.connection_args = []
174
175 # This will specify the name of the mongo database for the controller to use. The default
176 # behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
177 # this will result in task results persisting through multiple sessions.
178 # c.MongoDB.database = 'ipythondb'
179
180
@@ -146,7 +146,7 b' class UnSerializeIt(UnSerialized):'
146 146 typeDescriptor = self.serialized.getTypeDescriptor()
147 147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
148 148 buf = self.serialized.getData()
149 if isinstance(buf, buffer):
149 if isinstance(buf, (buffer,bytes)):
150 150 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
151 151 else:
152 152 # memoryview
@@ -44,6 +44,10 b' We support a subset of mongodb operators:'
44 44
45 45 from datetime import datetime
46 46
47 from IPython.config.configurable import Configurable
48
49 from IPython.utils.traitlets import Dict, CUnicode
50
47 51 filters = {
48 52 '$lt' : lambda a,b: a < b,
49 53 '$gt' : lambda a,b: b > a,
@@ -75,9 +79,10 b' class CompositeFilter(object):'
75 79 return False
76 80 return True
77 81
78 class BaseDB(object):
82 class BaseDB(Configurable):
79 83 """Empty Parent class so traitlets work on DB."""
80 pass
84 # base configurable traits:
85 session = CUnicode("")
81 86
82 87 class DictDB(BaseDB):
83 88 """Basic in-memory dict-based object for saving Task Records.
@@ -88,10 +93,8 b' class DictDB(BaseDB):'
88 93 The interface is based on MongoDB, so adding a MongoDB
89 94 backend should be straightforward.
90 95 """
91 _records = None
92 96
93 def __init__(self, *args, **kwargs):
94 self._records = dict()
97 _records = Dict()
95 98
96 99 def _match_one(self, rec, tests):
97 100 """Check if a specific record matches tests."""
@@ -34,13 +34,6 b' from . import error'
34 34 from .heartmonitor import HeartMonitor
35 35 from .util import validate_url_container, ISO8601
36 36
37 try:
38 from pymongo.binary import Binary
39 except ImportError:
40 MongoDB=None
41 else:
42 from mongodb import MongoDB
43
44 37 #-----------------------------------------------------------------------------
45 38 # Code
46 39 #-----------------------------------------------------------------------------
@@ -236,7 +229,10 b' class HubFactory(RegistrationFactory):'
236 229 sub = ZMQStream(sub, loop)
237 230
238 231 # connect the db
239 self.db = import_item(self.db_class)(self.session.session)
232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
234 print (cdir)
235 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
240 236 time.sleep(.25)
241 237
242 238 # build connection dicts
@@ -257,8 +253,8 b' class HubFactory(RegistrationFactory):'
257 253 'iopub' : client_iface%self.iopub[0],
258 254 'notification': client_iface%self.notifier_port
259 255 }
260 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
261 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
256 self.log.debug("Hub engine addrs: %s"%self.engine_info)
257 self.log.debug("Hub client addrs: %s"%self.client_info)
262 258 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
263 259 registrar=reg, clientele=c, notifier=n, db=self.db,
264 260 engine_info=self.engine_info, client_info=self.client_info,
@@ -569,8 +565,7 b' class Hub(LoggingFactory):'
569 565 record['engine_uuid'] = queue_id
570 566 record['client_uuid'] = client_id
571 567 record['queue'] = 'mux'
572 if MongoDB is not None and isinstance(self.db, MongoDB):
573 record['buffers'] = map(Binary, record['buffers'])
568
574 569 self.pending.add(msg_id)
575 570 self.queues[eid].append(msg_id)
576 571 self.db.add_record(msg_id, record)
@@ -614,10 +609,8 b' class Hub(LoggingFactory):'
614 609 'started' : started,
615 610 'completed' : completed
616 611 }
617 if MongoDB is not None and isinstance(self.db, MongoDB):
618 result['result_buffers'] = map(Binary, msg['buffers'])
619 else:
620 result['result_buffers'] = msg['buffers']
612
613 result['result_buffers'] = msg['buffers']
621 614 self.db.update_record(msg_id, result)
622 615 else:
623 616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
@@ -635,8 +628,7 b' class Hub(LoggingFactory):'
635 628 client_id, msg), exc_info=True)
636 629 return
637 630 record = init_record(msg)
638 if MongoDB is not None and isinstance(self.db, MongoDB):
639 record['buffers'] = map(Binary, record['buffers'])
631
640 632 record['client_uuid'] = client_id
641 633 record['queue'] = 'task'
642 634 header = msg['header']
@@ -684,10 +676,8 b' class Hub(LoggingFactory):'
684 676 'completed' : completed,
685 677 'engine_uuid': engine_uuid
686 678 }
687 if MongoDB is not None and isinstance(self.db, MongoDB):
688 result['result_buffers'] = map(Binary, msg['buffers'])
689 else:
690 result['result_buffers'] = msg['buffers']
679
680 result['result_buffers'] = msg['buffers']
691 681 self.db.update_record(msg_id, result)
692 682
693 683 else:
@@ -118,7 +118,11 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
118 118 paa('--mongodb',
119 119 dest='HubFactory.db_class', action='store_const',
120 120 const='IPython.zmq.parallel.mongodb.MongoDB',
121 help='Use MongoDB task storage [default: in-memory]')
121 help='Use MongoDB for task storage [default: in-memory]')
122 paa('--sqlite',
123 dest='HubFactory.db_class', action='store_const',
124 const='IPython.zmq.parallel.sqlitedb.SQLiteDB',
125 help='Use SQLite3 for DB task storage [default: in-memory]')
122 126 paa('--hb',
123 127 type=int, dest='HubFactory.hb', nargs=2,
124 128 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
@@ -9,6 +9,9 b''
9 9 from datetime import datetime
10 10
11 11 from pymongo import Connection
12 from pymongo.binary import Binary
13
14 from IPython.utils.traitlets import Dict, List, CUnicode
12 15
13 16 from .dictdb import BaseDB
14 17
@@ -18,15 +21,29 b' from .dictdb import BaseDB'
18 21
19 22 class MongoDB(BaseDB):
20 23 """MongoDB TaskRecord backend."""
21 def __init__(self, session_uuid, *args, **kwargs):
22 self._connection = Connection(*args, **kwargs)
23 self._db = self._connection[session_uuid]
24
25 connection_args = List(config=True)
26 connection_kwargs = Dict(config=True)
27 database = CUnicode(config=True)
28 _table = Dict()
29
30 def __init__(self, **kwargs):
31 super(MongoDB, self).__init__(**kwargs)
32 self._connection = Connection(*self.connection_args, **self.connection_kwargs)
33 if not self.database:
34 self.database = self.session
35 self._db = self._connection[self.database]
24 36 self._records = self._db['task_records']
25 self._table = {}
37
38 def _binary_buffers(self, rec):
39 for key in ('buffers', 'result_buffers'):
40 if key in rec:
41 rec[key] = map(Binary, rec[key])
26 42
27 43 def add_record(self, msg_id, rec):
28 44 """Add a new Task Record, by msg_id."""
29 45 # print rec
46 rec = _binary_buffers(rec)
30 47 obj_id = self._records.insert(rec)
31 48 self._table[msg_id] = obj_id
32 49
@@ -36,6 +53,7 b' class MongoDB(BaseDB):'
36 53
37 54 def update_record(self, msg_id, rec):
38 55 """Update the data in an existing record."""
56 rec = _binary_buffers(rec)
39 57 obj_id = self._table[msg_id]
40 58 self._records.update({'_id':obj_id}, {'$set': rec})
41 59
General Comments 0
You need to be logged in to leave comments. Login now