##// END OF EJS Templates
Add SQLite backend, DB backends are Configurable...
MinRK -
Show More
@@ -134,3 +134,47 b' c = get_config()'
134 # set the engine heartbeat ports to use:
134 # set the engine heartbeat ports to use:
135 # c.HubFactory.hb = (10303,10313)
135 # c.HubFactory.hb = (10303,10313)
136
136
137 #-----------------------------------------------------------------------------
138 # Configure the TaskRecord database backend
139 #-----------------------------------------------------------------------------
140
141 # For memory/persistance reasons, tasks can be stored out-of-memory in a database.
142 # Currently, only sqlite and mongodb are supported as backends, but the interface
143 # is fairly simple, so advanced developers could write their own backend.
144
145 # ----- in-memory configuration --------
146 # this line restores the default behavior: in-memory storage of all results.
147 # c.HubFactory.db_class = 'IPython.zmq.parallel.dictdb.DictDB'
148
149 # ----- sqlite configuration --------
150 # use this line to activate sqlite:
151 # c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
152
153 # You can specify the name of the db-file. By default, this will be located
154 # in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db
155 # c.SQLiteDB.filename = 'tasks.db'
156
157 # You can also specify the location of the db-file, if you want it to be somewhere
158 # other than the cluster_dir.
159 # c.SQLiteDB.location = '/scratch/'
160
161 # This will specify the name of the table for the controller to use. The default
162 # behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
163 # this will result in results persisting for multiple sessions.
164 # c.SQLiteDB.table = 'results'
165
166 # ----- mongodb configuration --------
167 # use this line to activate mongodb:
168 # c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
169
170 # You can specify the args and kwargs pymongo will use when creating the Connection.
171 # For more information on what these options might be, see pymongo documentation.
172 # c.MongoDB.connection_kwargs = {}
173 # c.MongoDB.connection_args = []
174
175 # This will specify the name of the mongo database for the controller to use. The default
176 # behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
177 # this will result in task results persisting through multiple sessions.
178 # c.MongoDB.database = 'ipythondb'
179
180
@@ -146,7 +146,7 b' class UnSerializeIt(UnSerialized):'
146 typeDescriptor = self.serialized.getTypeDescriptor()
146 typeDescriptor = self.serialized.getTypeDescriptor()
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
147 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
148 buf = self.serialized.getData()
148 buf = self.serialized.getData()
149 if isinstance(buf, buffer):
149 if isinstance(buf, (buffer,bytes)):
150 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
150 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
151 else:
151 else:
152 # memoryview
152 # memoryview
@@ -44,6 +44,10 b' We support a subset of mongodb operators:'
44
44
45 from datetime import datetime
45 from datetime import datetime
46
46
47 from IPython.config.configurable import Configurable
48
49 from IPython.utils.traitlets import Dict, CUnicode
50
47 filters = {
51 filters = {
48 '$lt' : lambda a,b: a < b,
52 '$lt' : lambda a,b: a < b,
49 '$gt' : lambda a,b: b > a,
53 '$gt' : lambda a,b: b > a,
@@ -75,9 +79,10 b' class CompositeFilter(object):'
75 return False
79 return False
76 return True
80 return True
77
81
78 class BaseDB(object):
82 class BaseDB(Configurable):
79 """Empty Parent class so traitlets work on DB."""
83 """Empty Parent class so traitlets work on DB."""
80 pass
84 # base configurable traits:
85 session = CUnicode("")
81
86
82 class DictDB(BaseDB):
87 class DictDB(BaseDB):
83 """Basic in-memory dict-based object for saving Task Records.
88 """Basic in-memory dict-based object for saving Task Records.
@@ -88,10 +93,8 b' class DictDB(BaseDB):'
88 The interface is based on MongoDB, so adding a MongoDB
93 The interface is based on MongoDB, so adding a MongoDB
89 backend should be straightforward.
94 backend should be straightforward.
90 """
95 """
91 _records = None
92
96
93 def __init__(self, *args, **kwargs):
97 _records = Dict()
94 self._records = dict()
95
98
96 def _match_one(self, rec, tests):
99 def _match_one(self, rec, tests):
97 """Check if a specific record matches tests."""
100 """Check if a specific record matches tests."""
@@ -34,13 +34,6 b' from . import error'
34 from .heartmonitor import HeartMonitor
34 from .heartmonitor import HeartMonitor
35 from .util import validate_url_container, ISO8601
35 from .util import validate_url_container, ISO8601
36
36
37 try:
38 from pymongo.binary import Binary
39 except ImportError:
40 MongoDB=None
41 else:
42 from mongodb import MongoDB
43
44 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
45 # Code
38 # Code
46 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
@@ -236,7 +229,10 b' class HubFactory(RegistrationFactory):'
236 sub = ZMQStream(sub, loop)
229 sub = ZMQStream(sub, loop)
237
230
238 # connect the db
231 # connect the db
239 self.db = import_item(self.db_class)(self.session.session)
232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
234 print (cdir)
235 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
240 time.sleep(.25)
236 time.sleep(.25)
241
237
242 # build connection dicts
238 # build connection dicts
@@ -257,8 +253,8 b' class HubFactory(RegistrationFactory):'
257 'iopub' : client_iface%self.iopub[0],
253 'iopub' : client_iface%self.iopub[0],
258 'notification': client_iface%self.notifier_port
254 'notification': client_iface%self.notifier_port
259 }
255 }
260 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
256 self.log.debug("Hub engine addrs: %s"%self.engine_info)
261 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
257 self.log.debug("Hub client addrs: %s"%self.client_info)
262 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
258 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
263 registrar=reg, clientele=c, notifier=n, db=self.db,
259 registrar=reg, clientele=c, notifier=n, db=self.db,
264 engine_info=self.engine_info, client_info=self.client_info,
260 engine_info=self.engine_info, client_info=self.client_info,
@@ -569,8 +565,7 b' class Hub(LoggingFactory):'
569 record['engine_uuid'] = queue_id
565 record['engine_uuid'] = queue_id
570 record['client_uuid'] = client_id
566 record['client_uuid'] = client_id
571 record['queue'] = 'mux'
567 record['queue'] = 'mux'
572 if MongoDB is not None and isinstance(self.db, MongoDB):
568
573 record['buffers'] = map(Binary, record['buffers'])
574 self.pending.add(msg_id)
569 self.pending.add(msg_id)
575 self.queues[eid].append(msg_id)
570 self.queues[eid].append(msg_id)
576 self.db.add_record(msg_id, record)
571 self.db.add_record(msg_id, record)
@@ -614,10 +609,8 b' class Hub(LoggingFactory):'
614 'started' : started,
609 'started' : started,
615 'completed' : completed
610 'completed' : completed
616 }
611 }
617 if MongoDB is not None and isinstance(self.db, MongoDB):
612
618 result['result_buffers'] = map(Binary, msg['buffers'])
613 result['result_buffers'] = msg['buffers']
619 else:
620 result['result_buffers'] = msg['buffers']
621 self.db.update_record(msg_id, result)
614 self.db.update_record(msg_id, result)
622 else:
615 else:
623 self.log.debug("queue:: unknown msg finished %s"%msg_id)
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
@@ -635,8 +628,7 b' class Hub(LoggingFactory):'
635 client_id, msg), exc_info=True)
628 client_id, msg), exc_info=True)
636 return
629 return
637 record = init_record(msg)
630 record = init_record(msg)
638 if MongoDB is not None and isinstance(self.db, MongoDB):
631
639 record['buffers'] = map(Binary, record['buffers'])
640 record['client_uuid'] = client_id
632 record['client_uuid'] = client_id
641 record['queue'] = 'task'
633 record['queue'] = 'task'
642 header = msg['header']
634 header = msg['header']
@@ -684,10 +676,8 b' class Hub(LoggingFactory):'
684 'completed' : completed,
676 'completed' : completed,
685 'engine_uuid': engine_uuid
677 'engine_uuid': engine_uuid
686 }
678 }
687 if MongoDB is not None and isinstance(self.db, MongoDB):
679
688 result['result_buffers'] = map(Binary, msg['buffers'])
680 result['result_buffers'] = msg['buffers']
689 else:
690 result['result_buffers'] = msg['buffers']
691 self.db.update_record(msg_id, result)
681 self.db.update_record(msg_id, result)
692
682
693 else:
683 else:
@@ -118,7 +118,11 b' class IPControllerAppConfigLoader(ClusterDirConfigLoader):'
118 paa('--mongodb',
118 paa('--mongodb',
119 dest='HubFactory.db_class', action='store_const',
119 dest='HubFactory.db_class', action='store_const',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
121 help='Use MongoDB task storage [default: in-memory]')
121 help='Use MongoDB for task storage [default: in-memory]')
122 paa('--sqlite',
123 dest='HubFactory.db_class', action='store_const',
124 const='IPython.zmq.parallel.sqlitedb.SQLiteDB',
125 help='Use SQLite3 for DB task storage [default: in-memory]')
122 paa('--hb',
126 paa('--hb',
123 type=int, dest='HubFactory.hb', nargs=2,
127 type=int, dest='HubFactory.hb', nargs=2,
124 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
128 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
@@ -9,6 +9,9 b''
9 from datetime import datetime
9 from datetime import datetime
10
10
11 from pymongo import Connection
11 from pymongo import Connection
12 from pymongo.binary import Binary
13
14 from IPython.utils.traitlets import Dict, List, CUnicode
12
15
13 from .dictdb import BaseDB
16 from .dictdb import BaseDB
14
17
@@ -18,15 +21,29 b' from .dictdb import BaseDB'
18
21
19 class MongoDB(BaseDB):
22 class MongoDB(BaseDB):
20 """MongoDB TaskRecord backend."""
23 """MongoDB TaskRecord backend."""
21 def __init__(self, session_uuid, *args, **kwargs):
24
22 self._connection = Connection(*args, **kwargs)
25 connection_args = List(config=True)
23 self._db = self._connection[session_uuid]
26 connection_kwargs = Dict(config=True)
27 database = CUnicode(config=True)
28 _table = Dict()
29
30 def __init__(self, **kwargs):
31 super(MongoDB, self).__init__(**kwargs)
32 self._connection = Connection(*self.connection_args, **self.connection_kwargs)
33 if not self.database:
34 self.database = self.session
35 self._db = self._connection[self.database]
24 self._records = self._db['task_records']
36 self._records = self._db['task_records']
25 self._table = {}
37
38 def _binary_buffers(self, rec):
39 for key in ('buffers', 'result_buffers'):
40 if key in rec:
41 rec[key] = map(Binary, rec[key])
26
42
27 def add_record(self, msg_id, rec):
43 def add_record(self, msg_id, rec):
28 """Add a new Task Record, by msg_id."""
44 """Add a new Task Record, by msg_id."""
29 # print rec
45 # print rec
46 rec = _binary_buffers(rec)
30 obj_id = self._records.insert(rec)
47 obj_id = self._records.insert(rec)
31 self._table[msg_id] = obj_id
48 self._table[msg_id] = obj_id
32
49
@@ -36,6 +53,7 b' class MongoDB(BaseDB):'
36
53
37 def update_record(self, msg_id, rec):
54 def update_record(self, msg_id, rec):
38 """Update the data in an existing record."""
55 """Update the data in an existing record."""
56 rec = _binary_buffers(rec)
39 obj_id = self._table[msg_id]
57 obj_id = self._table[msg_id]
40 self._records.update({'_id':obj_id}, {'$set': rec})
58 self._records.update({'_id':obj_id}, {'$set': rec})
41
59
General Comments 0
You need to be logged in to leave comments. Login now