@@ -134,3 +134,47 @@ c = get_config()
 # set the engine heartbeat ports to use:
 # c.HubFactory.hb = (10303,10313)
 
+#-----------------------------------------------------------------------------
+# Configure the TaskRecord database backend
+#-----------------------------------------------------------------------------
+
+# For memory/persistence reasons, tasks can be stored out-of-memory in a database.
+# Currently, only sqlite and mongodb are supported as backends, but the interface
+# is fairly simple, so advanced developers could write their own backend.
+
+# ----- in-memory configuration --------
+# this line restores the default behavior: in-memory storage of all results.
+# c.HubFactory.db_class = 'IPython.zmq.parallel.dictdb.DictDB'
+
+# ----- sqlite configuration --------
+# use this line to activate sqlite:
+# c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
+
+# You can specify the name of the db-file. By default, this will be located
+# in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db
+# c.SQLiteDB.filename = 'tasks.db'
+
+# You can also specify the location of the db-file, if you want it to be somewhere
+# other than the cluster_dir.
+# c.SQLiteDB.location = '/scratch/'
+
+# This will specify the name of the table for the controller to use. The default
+# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
+# this will result in results persisting for multiple sessions.
+# c.SQLiteDB.table = 'results'
+
+# ----- mongodb configuration --------
+# use this line to activate mongodb:
+# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
+
+# You can specify the args and kwargs pymongo will use when creating the Connection.
+# For more information on what these options might be, see pymongo documentation.
+# c.MongoDB.connection_kwargs = {}
+# c.MongoDB.connection_args = []
+
+# This will specify the name of the mongo database for the controller to use. The default
+# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
+# this will result in task results persisting through multiple sessions.
+# c.MongoDB.database = 'ipythondb'
+
+
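
Taken together, these comments are a two-step recipe: point HubFactory.db_class at a backend, then set that backend's own options. A minimal sketch of an activated SQLite setup in this config file, with placeholder values for the file name and location:

    # enable the SQLite backend and point it at a custom file
    c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
    c.SQLiteDB.filename = 'tasks.db'      # placeholder file name
    c.SQLiteDB.location = '/scratch/'     # placeholder directory, instead of the cluster_dir
    # leave c.SQLiteDB.table unset to keep the default per-session table (a uuid)
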
@@ -146,7 +146,7 @@ class UnSerializeIt(UnSerialized):
         typeDescriptor = self.serialized.getTypeDescriptor()
         if globals().has_key('numpy') and typeDescriptor == 'ndarray':
             buf = self.serialized.getData()
-            if isinstance(buf, buffer):
+            if isinstance(buf, (buffer,bytes)):
                 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
             else:
                 # memoryview
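
The one-line change above only widens the accepted buffer types so that data arriving as bytes also takes the frombuffer path; the reconstruction itself is a plain numpy.frombuffer call driven by the stored dtype metadata. A self-contained sketch of that round trip, with illustrative helper names rather than the IPython serializer API:

    import numpy

    def serialize_ndarray(a):
        # raw bytes plus the metadata needed to rebuild the array
        return a.tobytes(), {'dtype': a.dtype.str, 'shape': a.shape}

    def unserialize_ndarray(buf, metadata):
        # frombuffer accepts bytes as well as buffer/memoryview objects
        flat = numpy.frombuffer(buf, dtype=metadata['dtype'])
        return flat.reshape(metadata['shape'])

    buf, md = serialize_ndarray(numpy.arange(6, dtype='int64'))
    restored = unserialize_ndarray(buf, md)
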
@@ -44,6 +44,10 @@ We support a subset of mongodb operators:
 
 from datetime import datetime
 
+from IPython.config.configurable import Configurable
+
+from IPython.utils.traitlets import Dict, CUnicode
+
 filters = {
     '$lt' : lambda a,b: a < b,
     '$gt' : lambda a,b: b > a,
@@ -75,9 +79,10 @@ class CompositeFilter(object):
                 return False
         return True
 
-class BaseDB(object):
+class BaseDB(Configurable):
     """Empty Parent class so traitlets work on DB."""
-    pass
+    # base configurable traits:
+    session = CUnicode("")
 
 class DictDB(BaseDB):
     """Basic in-memory dict-based object for saving Task Records.
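
Making BaseDB a Configurable is what lets the new c.SQLiteDB.* and c.MongoDB.* options in the sample config actually reach the backends: a trait declared with config=True is filled in from the config object by class name. A minimal sketch of that mechanism, using the standalone traitlets package as a stand-in for the IPython.config / IPython.utils.traitlets imports in the diff, and a made-up class name:

    from traitlets import Unicode
    from traitlets.config import Config, Configurable

    class SQLiteDBSketch(Configurable):
        # analogous to the config=True traits added to the real backends
        filename = Unicode('tasks.db').tag(config=True)

    c = Config()
    c.SQLiteDBSketch.filename = 'results.db'
    db = SQLiteDBSketch(config=c)
    assert db.filename == 'results.db'
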
@@ -88,10 +93,8 @@ class DictDB(BaseDB):
     The interface is based on MongoDB, so adding a MongoDB
     backend should be straightforward.
     """
-    _records = None
 
-    def __init__(self, *args, **kwargs):
-        self._records = dict()
+    _records = Dict()
 
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
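
For context on what DictDB does with these records: queries are MongoDB-style dicts mapping a record key to {operator: value} tests, evaluated against each stored record via the filters table at the top of the module. A rough sketch of that matching logic (illustrative only, not the actual _match_one; note that a correct '$gt' must compare a > b):

    filters = {
        '$lt' : lambda a, b: a < b,
        '$gt' : lambda a, b: a > b,
        '$eq' : lambda a, b: a == b,
    }

    def match_one(rec, tests):
        for key, test in tests.items():
            if key not in rec:
                return False
            for op, value in test.items():
                if not filters[op](rec[key], value):
                    return False
        return True

    match_one({'queue': 'task', 'completed': 5}, {'completed': {'$gt': 3}})  # True
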
@@ -34,13 +34,6 @@ from . import error
 from .heartmonitor import HeartMonitor
 from .util import validate_url_container, ISO8601
 
-try:
-    from pymongo.binary import Binary
-except ImportError:
-    MongoDB=None
-else:
-    from mongodb import MongoDB
-
 #-----------------------------------------------------------------------------
 # Code
 #-----------------------------------------------------------------------------
@@ -236,7 +229,10 @@ class HubFactory(RegistrationFactory):
         sub = ZMQStream(sub, loop)
 
         # connect the db
-        self.db = import_item(self.db_class)(self.session.session)
+        self.log.info('Hub using DB backend: %r'%(self.db_class.split('.')[-1]))
+        cdir = self.config.Global.cluster_dir
+        print (cdir)
+        self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
         time.sleep(.25)
 
         # build connection dicts
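
The new instantiation line passes both the session id and the config, so whichever backend db_class names can pick up its own traits. import_item resolves the dotted path string to a class; a rough standalone equivalent of that lookup (an illustration of the pattern, not IPython's import_item itself):

    import importlib

    def load_db_class(dotted_name):
        # 'package.module.ClassName' -> the ClassName object
        module_name, class_name = dotted_name.rsplit('.', 1)
        return getattr(importlib.import_module(module_name), class_name)

    # e.g. DB = load_db_class('IPython.zmq.parallel.sqlitedb.SQLiteDB')
    #      db = DB(session=session_id, config=config)
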
@@ -257,8 +253,8 @@ class HubFactory(RegistrationFactory):
             'iopub' : client_iface%self.iopub[0],
             'notification': client_iface%self.notifier_port
             }
-        self.log.debug("
-        self.log.debug("
+        self.log.debug("Hub engine addrs: %s"%self.engine_info)
+        self.log.debug("Hub client addrs: %s"%self.client_info)
         self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
             registrar=reg, clientele=c, notifier=n, db=self.db,
             engine_info=self.engine_info, client_info=self.client_info,
@@ -569,8 +565,7 @@ class Hub(LoggingFactory):
         record['engine_uuid'] = queue_id
         record['client_uuid'] = client_id
         record['queue'] = 'mux'
-        if MongoDB is not None and isinstance(self.db, MongoDB):
-            record['buffers'] = map(Binary, record['buffers'])
+
         self.pending.add(msg_id)
         self.queues[eid].append(msg_id)
         self.db.add_record(msg_id, record)
@@ -614,10 +609,8 @@ class Hub(LoggingFactory):
                       'started' : started,
                       'completed' : completed
                     }
-            if MongoDB is not None and isinstance(self.db, MongoDB):
-                result['result_buffers'] = map(Binary, msg['buffers'])
-            else:
-                result['result_buffers'] = msg['buffers']
-            self.db.update_record(msg_id, result)
+
+            result['result_buffers'] = msg['buffers']
+            self.db.update_record(msg_id, result)
         else:
             self.log.debug("queue:: unknown msg finished %s"%msg_id)
@@ -635,8 +628,7 @@ class Hub(LoggingFactory):
                     client_id, msg), exc_info=True)
             return
         record = init_record(msg)
-        if MongoDB is not None and isinstance(self.db, MongoDB):
-            record['buffers'] = map(Binary, record['buffers'])
+
         record['client_uuid'] = client_id
         record['queue'] = 'task'
         header = msg['header']
@@ -684,10 +676,8 @@ class Hub(LoggingFactory):
                     'completed' : completed,
                     'engine_uuid': engine_uuid
                    }
-            if MongoDB is not None and isinstance(self.db, MongoDB):
-                result['result_buffers'] = map(Binary, msg['buffers'])
-            else:
-                result['result_buffers'] = msg['buffers']
-            self.db.update_record(msg_id, result)
+
+            result['result_buffers'] = msg['buffers']
+            self.db.update_record(msg_id, result)
 
         else:
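
The four hub.py hunks above all delete the same special case: the Hub no longer imports pymongo types or wraps buffers itself, it just hands plain records to whatever backend was configured, through add_record and update_record. That uniform interface is what makes the config file's "write your own backend" remark realistic. A bare-bones toy backend using only the two calls the Hub is shown making here (a real one would presumably subclass BaseDB from dictdb so traits and config work):

    class ToyDB(object):
        """Toy TaskRecord backend: keeps everything in a plain dict."""

        def __init__(self, session='', config=None):
            self.session = session
            self._records = {}

        def add_record(self, msg_id, rec):
            # called once when a task is first submitted
            self._records[msg_id] = dict(rec)

        def update_record(self, msg_id, rec):
            # called as results and status come back
            self._records[msg_id].update(rec)
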
@@ -118,7 +118,11 @@ class IPControllerAppConfigLoader(ClusterDirConfigLoader):
         paa('--mongodb',
             dest='HubFactory.db_class', action='store_const',
             const='IPython.zmq.parallel.mongodb.MongoDB',
-            help='Use MongoDB task storage [default: in-memory]')
+            help='Use MongoDB for task storage [default: in-memory]')
+        paa('--sqlite',
+            dest='HubFactory.db_class', action='store_const',
+            const='IPython.zmq.parallel.sqlitedb.SQLiteDB',
+            help='Use SQLite3 for DB task storage [default: in-memory]')
         paa('--hb',
             type=int, dest='HubFactory.hb', nargs=2,
             help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
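
Both flags write to the same HubFactory.db_class destination via store_const, so the last flag given wins and omitting both keeps the in-memory default. A standalone argparse sketch of that pattern (the names mirror the diff, but this is not the actual IPython config loader):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--mongodb', dest='db_class', action='store_const',
                        const='IPython.zmq.parallel.mongodb.MongoDB',
                        default='IPython.zmq.parallel.dictdb.DictDB',
                        help='Use MongoDB for task storage [default: in-memory]')
    parser.add_argument('--sqlite', dest='db_class', action='store_const',
                        const='IPython.zmq.parallel.sqlitedb.SQLiteDB',
                        help='Use SQLite3 for DB task storage [default: in-memory]')

    print(parser.parse_args(['--sqlite']).db_class)
    # IPython.zmq.parallel.sqlitedb.SQLiteDB
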
@@ -9,6 +9,9 @@
 from datetime import datetime
 
 from pymongo import Connection
+from pymongo.binary import Binary
+
+from IPython.utils.traitlets import Dict, List, CUnicode
 
 from .dictdb import BaseDB
 
@@ -18,15 +21,29 @@ from .dictdb import BaseDB
 
 class MongoDB(BaseDB):
     """MongoDB TaskRecord backend."""
-    def __init__(self, session_uuid, *args, **kwargs):
-        self._connection = Connection(*args, **kwargs)
-        self._db = self._connection[session_uuid]
+
+    connection_args = List(config=True)
+    connection_kwargs = Dict(config=True)
+    database = CUnicode(config=True)
+    _table = Dict()
+
+    def __init__(self, **kwargs):
+        super(MongoDB, self).__init__(**kwargs)
+        self._connection = Connection(*self.connection_args, **self.connection_kwargs)
+        if not self.database:
+            self.database = self.session
+        self._db = self._connection[self.database]
         self._records = self._db['task_records']
-        self._table = {}
+
+    def _binary_buffers(self, rec):
+        for key in ('buffers', 'result_buffers'):
+            if key in rec:
+                rec[key] = map(Binary, rec[key])
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         # print rec
+        self._binary_buffers(rec)
         obj_id = self._records.insert(rec)
         self._table[msg_id] = obj_id
 
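
With the connection details now expressed as config=True traits, the MongoDB backend is configured the same way as SQLiteDB, from the controller config file. A sketch of what that might look like, with placeholder values (pymongo's Connection accepts host and port positionally; see its documentation for the full set):

    c = get_config()

    c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
    c.MongoDB.connection_args = ['localhost', 27017]   # positional args for pymongo.Connection (placeholders)
    c.MongoDB.connection_kwargs = {}
    c.MongoDB.database = 'ipythondb'   # falls back to the session uuid when left empty
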
@@ -36,6 +53,7 @@ class MongoDB(BaseDB):
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
+        self._binary_buffers(rec)
         obj_id = self._table[msg_id]
         self._records.update({'_id':obj_id}, {'$set': rec})
 
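
With both write paths (add_record above and update_record here) going through _binary_buffers, raw message buffers get wrapped as BSON Binary only at the database boundary, so the Hub stays backend-agnostic. A tiny standalone illustration of that wrapping, assuming the old pymongo.binary import path used in this diff:

    from pymongo.binary import Binary   # lives at bson.binary in newer pymongo

    def wrap_buffers(rec):
        # wrap raw byte strings so pymongo stores them as BSON Binary
        # instead of trying to treat them as text
        for key in ('buffers', 'result_buffers'):
            if key in rec:
                rec[key] = [Binary(b) for b in rec[key]]
        return rec

    wrap_buffers({'msg_id': 'abc', 'buffers': [b'\x00\x01\x02']})
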