Add SQLite backend, DB backends are Configurable...
MinRK
@@ -1,136 +1,180 @@
 from IPython.config.loader import Config
 
 c = get_config()
 
 #-----------------------------------------------------------------------------
 # Global configuration
 #-----------------------------------------------------------------------------
 
 # Basic Global config attributes
 
 # Start-up messages are logged to stdout using the logging module.
 # These all happen before the twisted reactor is started and are
 # useful for debugging purposes. Can be (10=DEBUG, 20=INFO, 30=WARN,
 # 40=ERROR, 50=CRITICAL), and smaller is more verbose.
 # c.Global.log_level = 20
 
 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
 # c.Global.log_to_file = False
 
 # Remove old logs from cluster_dir/log before starting.
 # c.Global.clean_logs = True
 
 # A list of Python statements that will be run before starting the
 # controller. This is provided because occasionally certain things need to
 # be imported in the controller for pickling to work.
 # c.Global.import_statements = ['import math']
 
 # Reuse the controller's JSON files. If False, JSON files are regenerated
 # each time the controller is run. If True, they will be reused, *but* you
 # must also set the network ports by hand. If set, this will override the
 # values set for the client and engine connections below.
 # c.Global.reuse_files = True
 
 # Enable exec_key authentication on all messages. Default is True.
 # c.Global.secure = True
 
 # The working directory for the process. The application will use os.chdir
 # to change to this directory before starting.
 # c.Global.work_dir = os.getcwd()
 
 # The log url for logging to an `iploggerz` application. This will override
 # log-to-file.
 # c.Global.log_url = 'tcp://127.0.0.1:20202'
 
 # The specific external IP that is used to disambiguate multi-interface URLs.
 # The default behavior is to guess from external IPs gleaned from `socket`.
 # c.Global.location = '192.168.1.123'
 
 # The ssh server remote clients should use to connect to this controller.
 # It must be a machine that can see the interface specified in client_ip.
 # The default for client_ip is localhost, in which case the sshserver must
 # be an external IP of the controller machine.
 # c.Global.sshserver = 'controller.example.com'
 
 # The url to use for registration. If set, this overrides engine-ip,
 # engine-transport, client-ip, client-transport, and regport.
 # c.RegistrationFactory.url = 'tcp://*:12345'
 
 # The port to use for registration. Clients and Engines both use this
 # port for registration.
 # c.RegistrationFactory.regport = 10101
 
 #-----------------------------------------------------------------------------
 # Configure the Task Scheduler
 #-----------------------------------------------------------------------------
 
 # The routing scheme. 'pure' will use the pure-ZMQ scheduler. Any other
 # value will use a Python scheduler with various routing schemes.
 # Python schemes are: lru, weighted, random, twobin. Default is 'weighted'.
 # Note that the pure ZMQ scheduler does not support many features, such as
 # dying engines, dependencies, or engine-subset load-balancing.
 # c.ControllerFactory.scheme = 'pure'
 
 # The pure ZMQ scheduler can limit the number of outstanding tasks per engine
 # by using the ZMQ HWM option. This allows engines with long-running tasks
 # to not steal too many tasks from other engines. The default is 0, which
 # means aggressively distribute messages, never waiting for them to finish.
 # c.ControllerFactory.hwm = 1
 
 # Whether to use Threads or Processes to start the Schedulers. Threads will
 # use fewer resources, but potentially reduce throughput. Default is to
 # use processes. Note that a Python scheduler will always be in a Process.
 # c.ControllerFactory.usethreads
 
 #-----------------------------------------------------------------------------
 # Configure the Hub
 #-----------------------------------------------------------------------------
 
 # Which class to use for the db backend. Currently supported are DictDB (the
 # default) and MongoDB. Uncomment this line to enable MongoDB, which will
 # slow down the Hub's responsiveness, but also reduce its memory footprint.
 # c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
 
 # The heartbeat ping frequency. This is the frequency (in ms) at which the
 # Hub pings engines for heartbeats. This determines how quickly the Hub
 # will react to engines coming and going. A lower number means faster response
 # time, but more network activity. The default is 100ms.
 # c.HubFactory.ping = 100
 
 # HubFactory queue port pairs, to set by name: mux, iopub, control, task. Set
 # each as a tuple of length 2 of ints. The default is to find random
 # available ports.
 # c.HubFactory.mux = (10102,10112)
 
 #-----------------------------------------------------------------------------
 # Configure the client connections
 #-----------------------------------------------------------------------------
 
 # Basic client connection config attributes
 
 # The network interface the controller will listen on for client connections.
 # This should be an IP address or interface on the controller. An asterisk
 # means listen on all interfaces. The transport can be any transport
 # supported by zeromq (tcp,epgm,pgm,ib,ipc):
 # c.HubFactory.client_ip = '*'
 # c.HubFactory.client_transport = 'tcp'
 
 # Individual client ports to configure by name: query_port, notifier_port.
 # c.HubFactory.query_port = 12345
 
 #-----------------------------------------------------------------------------
 # Configure the engine connections
 #-----------------------------------------------------------------------------
 
 # Basic config attributes for the engine connections.
 
 # The network interface the controller will listen on for engine connections.
 # This should be an IP address or interface on the controller. An asterisk
 # means listen on all interfaces. The transport can be any transport
 # supported by zeromq (tcp,epgm,pgm,ib,ipc):
 # c.HubFactory.engine_ip = '*'
 # c.HubFactory.engine_transport = 'tcp'
 
 # Set the engine heartbeat ports to use:
 # c.HubFactory.hb = (10303,10313)
 
+#-----------------------------------------------------------------------------
+# Configure the TaskRecord database backend
+#-----------------------------------------------------------------------------
+
+# For memory/persistence reasons, tasks can be stored out-of-memory in a database.
+# Currently, only sqlite and mongodb are supported as backends, but the interface
+# is fairly simple, so advanced developers could write their own backend.
+
+# ----- in-memory configuration --------
+# This line restores the default behavior: in-memory storage of all results.
+# c.HubFactory.db_class = 'IPython.zmq.parallel.dictdb.DictDB'
+
+# ----- sqlite configuration --------
+# Use this line to activate sqlite:
+# c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
+
+# You can specify the name of the db-file. By default, this will be located
+# in the active cluster_dir, e.g. ~/.ipython/clusterz_default/tasks.db
+# c.SQLiteDB.filename = 'tasks.db'
+
+# You can also specify the location of the db-file, if you want it to be
+# somewhere other than the cluster_dir.
+# c.SQLiteDB.location = '/scratch/'
+
+# This will specify the name of the table for the controller to use. The default
+# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
+# this will result in results persisting across multiple sessions.
+# c.SQLiteDB.table = 'results'
+
+# ----- mongodb configuration --------
+# Use this line to activate mongodb:
+# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
+
+# You can specify the args and kwargs pymongo will use when creating the Connection.
+# For more information on what these options might be, see the pymongo documentation.
+# c.MongoDB.connection_kwargs = {}
+# c.MongoDB.connection_args = []
+
+# This will specify the name of the mongo database for the controller to use. The default
+# behavior is to use the session ID of the SessionFactory object (a uuid). Overriding
+# this will result in task results persisting through multiple sessions.
+# c.MongoDB.database = 'ipythondb'
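The new backend section is meant to be combined with the options above in the same controller config file that this hunk edits. As a quick sketch, a profile that opts into the SQLite backend could reduce to the following (the /scratch/ location, filename, and table name are illustrative values taken from the comments, not defaults):

from IPython.config.loader import Config

c = get_config()

# Store TaskRecords out-of-memory in SQLite instead of the default DictDB.
c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'

# Illustrative overrides: keep the db-file outside the cluster_dir and pin
# the table name so results persist across controller sessions.
c.SQLiteDB.location = '/scratch/'
c.SQLiteDB.filename = 'tasks.db'
c.SQLiteDB.table = 'results'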
@@ -1,167 +1,167 @@
 # encoding: utf-8
 # -*- test-case-name: IPython.kernel.test.test_newserialized -*-
 
 """Refactored serialization classes and interfaces."""
 
 __docformat__ = "restructuredtext en"
 
 # Tell nose to skip this module
 __test__ = {}
 
 #-------------------------------------------------------------------------------
 # Copyright (C) 2008 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-------------------------------------------------------------------------------
 
 #-------------------------------------------------------------------------------
 # Imports
 #-------------------------------------------------------------------------------
 
 import cPickle as pickle
 
 try:
     import numpy
 except ImportError:
     pass
 
 class SerializationError(Exception):
     pass
 
 #-----------------------------------------------------------------------------
 # Classes and functions
 #-----------------------------------------------------------------------------
 
 class ISerialized:
 
     def getData():
         """"""
 
     def getDataSize(units=10.0**6):
         """"""
 
     def getTypeDescriptor():
         """"""
 
     def getMetadata():
         """"""
 
 
 class IUnSerialized:
 
     def getObject():
         """"""
 
 class Serialized(object):
 
     # implements(ISerialized)
 
     def __init__(self, data, typeDescriptor, metadata={}):
         self.data = data
         self.typeDescriptor = typeDescriptor
         self.metadata = metadata
 
     def getData(self):
         return self.data
 
     def getDataSize(self, units=10.0**6):
         return len(self.data)/units
 
     def getTypeDescriptor(self):
         return self.typeDescriptor
 
     def getMetadata(self):
         return self.metadata
 
 
 class UnSerialized(object):
 
     # implements(IUnSerialized)
 
     def __init__(self, obj):
         self.obj = obj
 
     def getObject(self):
         return self.obj
 
 
 class SerializeIt(object):
 
     # implements(ISerialized)
 
     def __init__(self, unSerialized):
         self.data = None
         self.obj = unSerialized.getObject()
         if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
             if len(self.obj) == 0: # length 0 arrays can't be reconstructed
                 raise SerializationError("You cannot send a length 0 array")
             self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
             self.typeDescriptor = 'ndarray'
             self.metadata = {'shape':self.obj.shape,
                              'dtype':self.obj.dtype.str}
         elif isinstance(self.obj, bytes):
             self.typeDescriptor = 'bytes'
             self.metadata = {}
         elif isinstance(self.obj, buffer):
             self.typeDescriptor = 'buffer'
             self.metadata = {}
         else:
             self.typeDescriptor = 'pickle'
             self.metadata = {}
         self._generateData()
 
     def _generateData(self):
         if self.typeDescriptor == 'ndarray':
             self.data = numpy.getbuffer(self.obj)
         elif self.typeDescriptor in ('bytes', 'buffer'):
             self.data = self.obj
         elif self.typeDescriptor == 'pickle':
             self.data = pickle.dumps(self.obj, -1)
         else:
             raise SerializationError("Really weird serialization error.")
         del self.obj
 
     def getData(self):
         return self.data
 
     def getDataSize(self, units=10.0**6):
         return 1.0*len(self.data)/units
 
     def getTypeDescriptor(self):
         return self.typeDescriptor
 
     def getMetadata(self):
         return self.metadata
 
 
 class UnSerializeIt(UnSerialized):
 
     # implements(IUnSerialized)
 
     def __init__(self, serialized):
         self.serialized = serialized
 
     def getObject(self):
         typeDescriptor = self.serialized.getTypeDescriptor()
         if globals().has_key('numpy') and typeDescriptor == 'ndarray':
             buf = self.serialized.getData()
-            if isinstance(buf, buffer):
+            if isinstance(buf, (buffer,bytes)):
                 result = numpy.frombuffer(buf, dtype = self.serialized.metadata['dtype'])
             else:
                 # memoryview
                 result = numpy.array(buf, dtype = self.serialized.metadata['dtype'])
             result.shape = self.serialized.metadata['shape']
         elif typeDescriptor == 'pickle':
             result = pickle.loads(self.serialized.getData())
         elif typeDescriptor in ('bytes', 'buffer'):
             result = self.serialized.getData()
         else:
             raise SerializationError("Really weird serialization error.")
         return result
 
 def serialize(obj):
     return SerializeIt(UnSerialized(obj))
 
 def unserialize(serialized):
     return UnSerializeIt(serialized).getObject()
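The module's public surface is the serialize/unserialize pair at the bottom. A minimal round-trip sketch (Python 2, matching the cPickle/buffer usage above; the import path is an assumption based on the test-case-name comment at the top of the file):

import numpy
from IPython.kernel.newserialized import serialize, unserialize

# Plain objects fall through to the 'pickle' branch.
s = serialize({'a': 1, 'b': [2, 3]})
print s.getTypeDescriptor()           # 'pickle'
print unserialize(s)                  # {'a': 1, 'b': [2, 3]}

# ndarrays travel as a raw buffer, with shape/dtype in the metadata; the
# (buffer,bytes) change in this hunk is what lets the round-trip succeed
# when the data arrives as a bytes object off the wire.
arr = numpy.arange(6, dtype='float64').reshape(2, 3)
s = serialize(arr)
print s.getMetadata()                 # {'shape': (2, 3), 'dtype': '<f8'}
print (unserialize(s) == arr).all()   # True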
@@ -1,152 +1,155 @@
 """A Task logger that presents our DB interface,
 but exists entirely in memory and implemented with dicts.
 
 TaskRecords are dicts of the form:
 {
     'msg_id' : str(uuid),
     'client_uuid' : str(uuid),
     'engine_uuid' : str(uuid) or None,
     'header' : dict(header),
     'content': dict(content),
     'buffers': list(buffers),
     'submitted': datetime,
     'started': datetime or None,
     'completed': datetime or None,
     'resubmitted': datetime or None,
     'result_header' : dict(header) or None,
     'result_content' : dict(content) or None,
     'result_buffers' : list(buffers) or None,
 }
 With this info, many of the special categories of tasks can be defined by query:
 
 pending: completed is None
 client's outstanding: client_uuid = uuid && completed is None
 MIA: arrived is None (and completed is None)
 etc.
 
 EngineRecords are dicts of the form:
 {
     'eid' : int(id),
     'uuid': str(uuid)
 }
 This may be extended, but is currently minimal.
 
 We support a subset of mongodb operators:
     $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
 """
 #-----------------------------------------------------------------------------
 # Copyright (C) 2010 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
 
 from datetime import datetime
 
+from IPython.config.configurable import Configurable
+
+from IPython.utils.traitlets import Dict, CUnicode
+
 filters = {
  '$lt' : lambda a,b: a < b,
  '$gt' : lambda a,b: a > b,
  '$eq' : lambda a,b: a == b,
  '$ne' : lambda a,b: a != b,
  '$lte': lambda a,b: a <= b,
  '$gte': lambda a,b: a >= b,
  '$in' : lambda a,b: a in b,
  '$nin': lambda a,b: a not in b,
  '$all': lambda a,b: all([ a in bb for bb in b ]),
  '$mod': lambda a,b: a%b[0] == b[1],
  '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
 }
 
 
 class CompositeFilter(object):
     """Composite filter for matching multiple properties."""
 
     def __init__(self, dikt):
         self.tests = []
         self.values = []
         for key, value in dikt.iteritems():
             self.tests.append(filters[key])
             self.values.append(value)
 
     def __call__(self, value):
         for test,check in zip(self.tests, self.values):
             if not test(value, check):
                 return False
         return True
 
-class BaseDB(object):
+class BaseDB(Configurable):
     """Empty Parent class so traitlets work on DB."""
-    pass
+    # base configurable traits:
+    session = CUnicode("")
 
 class DictDB(BaseDB):
     """Basic in-memory dict-based object for saving Task Records.
 
     This is the first object to present the DB interface
     for logging tasks out of memory.
 
     The interface is based on MongoDB, so adding a MongoDB
     backend should be straightforward.
     """
-    _records = None
 
-    def __init__(self, *args, **kwargs):
-        self._records = dict()
+    _records = Dict()
 
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
         for key,test in tests.iteritems():
             if not test(rec.get(key, None)):
                 return False
         return True
 
     def _match(self, check, id_only=True):
         """Find all the matches for a check dict."""
         matches = {}
         tests = {}
         for k,v in check.iteritems():
             if isinstance(v, dict):
                 tests[k] = CompositeFilter(v)
             else:
                 tests[k] = lambda o,v=v: o==v # bind v now, not at call time
 
         for msg_id, rec in self._records.iteritems():
             if self._match_one(rec, tests):
                 matches[msg_id] = rec
         if id_only:
             return matches.keys()
         else:
             return matches
 
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if self._records.has_key(msg_id):
             raise KeyError("Already have msg_id %r"%(msg_id))
         self._records[msg_id] = rec
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
         if not self._records.has_key(msg_id):
             raise KeyError("No such msg_id %r"%(msg_id))
         return self._records[msg_id]
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         self._records[msg_id].update(rec)
 
     def drop_matching_records(self, check):
         """Remove all records matching a query."""
         matches = self._match(check, id_only=True)
         for m in matches:
             del self._records[m]
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         del self._records[msg_id]
 
 
     def find_records(self, check, id_only=False):
         """Find records matching a query dict."""
         matches = self._match(check, id_only)
-        return matches
\ No newline at end of file
+        return matches
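To make the operator subset from the docstring concrete, here is a small sketch of querying a DictDB directly (the module path is the db_class default from the config hunk above; the field names are TaskRecord keys, and the datetimes are made up for illustration):

from datetime import datetime
from IPython.zmq.parallel.dictdb import DictDB

db = DictDB()
db.add_record('msg-1', {'client_uuid': 'abc', 'completed': None,
                        'submitted': datetime(2011, 3, 1, 12, 0)})
db.add_record('msg-2', {'client_uuid': 'abc', 'completed': datetime(2011, 3, 1, 12, 5),
                        'submitted': datetime(2011, 3, 1, 12, 0)})

# plain values become equality tests, so "pending" is completed is None:
print db.find_records({'completed': None}, id_only=True)    # ['msg-1']

# dict values become CompositeFilters over the mongodb-style operators:
print db.find_records({'completed': {'$ne': None},
                       'client_uuid': {'$in': ['abc', 'def']}},
                      id_only=True)                          # ['msg-2']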
@@ -1,1052 +1,1042 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
29
29
30 from .entry_point import select_random_ports
30 from .entry_point import select_random_ports
31 from .factory import RegistrationFactory, LoggingFactory
31 from .factory import RegistrationFactory, LoggingFactory
32
32
33 from . import error
33 from . import error
34 from .heartmonitor import HeartMonitor
34 from .heartmonitor import HeartMonitor
35 from .util import validate_url_container, ISO8601
35 from .util import validate_url_container, ISO8601
36
36
37 try:
38 from pymongo.binary import Binary
39 except ImportError:
40 MongoDB=None
41 else:
42 from mongodb import MongoDB
43
44 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
45 # Code
38 # Code
46 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
47
40
48 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
49 return
42 return
50
43
51 def _printer(*args, **kwargs):
44 def _printer(*args, **kwargs):
52 print (args)
45 print (args)
53 print (kwargs)
46 print (kwargs)
54
47
55 def init_record(msg):
48 def init_record(msg):
56 """Initialize a TaskRecord based on a request."""
49 """Initialize a TaskRecord based on a request."""
57 header = msg['header']
50 header = msg['header']
58 return {
51 return {
59 'msg_id' : header['msg_id'],
52 'msg_id' : header['msg_id'],
60 'header' : header,
53 'header' : header,
61 'content': msg['content'],
54 'content': msg['content'],
62 'buffers': msg['buffers'],
55 'buffers': msg['buffers'],
63 'submitted': datetime.strptime(header['date'], ISO8601),
56 'submitted': datetime.strptime(header['date'], ISO8601),
64 'client_uuid' : None,
57 'client_uuid' : None,
65 'engine_uuid' : None,
58 'engine_uuid' : None,
66 'started': None,
59 'started': None,
67 'completed': None,
60 'completed': None,
68 'resubmitted': None,
61 'resubmitted': None,
69 'result_header' : None,
62 'result_header' : None,
70 'result_content' : None,
63 'result_content' : None,
71 'result_buffers' : None,
64 'result_buffers' : None,
72 'queue' : None,
65 'queue' : None,
73 'pyin' : None,
66 'pyin' : None,
74 'pyout': None,
67 'pyout': None,
75 'pyerr': None,
68 'pyerr': None,
76 'stdout': '',
69 'stdout': '',
77 'stderr': '',
70 'stderr': '',
78 }
71 }
79
72
80
73
81 class EngineConnector(HasTraits):
74 class EngineConnector(HasTraits):
82 """A simple object for accessing the various zmq connections of an object.
75 """A simple object for accessing the various zmq connections of an object.
83 Attributes are:
76 Attributes are:
84 id (int): engine ID
77 id (int): engine ID
85 uuid (str): uuid (unused?)
78 uuid (str): uuid (unused?)
86 queue (str): identity of queue's XREQ socket
79 queue (str): identity of queue's XREQ socket
87 registration (str): identity of registration XREQ socket
80 registration (str): identity of registration XREQ socket
88 heartbeat (str): identity of heartbeat XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
89 """
82 """
90 id=Int(0)
83 id=Int(0)
91 queue=Str()
84 queue=Str()
92 control=Str()
85 control=Str()
93 registration=Str()
86 registration=Str()
94 heartbeat=Str()
87 heartbeat=Str()
95 pending=Set()
88 pending=Set()
96
89
97 class HubFactory(RegistrationFactory):
90 class HubFactory(RegistrationFactory):
98 """The Configurable for setting up a Hub."""
91 """The Configurable for setting up a Hub."""
99
92
100 # name of a scheduler scheme
93 # name of a scheduler scheme
101 scheme = Str('leastload', config=True)
94 scheme = Str('leastload', config=True)
102
95
103 # port-pairs for monitoredqueues:
96 # port-pairs for monitoredqueues:
104 hb = Instance(list, config=True)
97 hb = Instance(list, config=True)
105 def _hb_default(self):
98 def _hb_default(self):
106 return select_random_ports(2)
99 return select_random_ports(2)
107
100
108 mux = Instance(list, config=True)
101 mux = Instance(list, config=True)
109 def _mux_default(self):
102 def _mux_default(self):
110 return select_random_ports(2)
103 return select_random_ports(2)
111
104
112 task = Instance(list, config=True)
105 task = Instance(list, config=True)
113 def _task_default(self):
106 def _task_default(self):
114 return select_random_ports(2)
107 return select_random_ports(2)
115
108
116 control = Instance(list, config=True)
109 control = Instance(list, config=True)
117 def _control_default(self):
110 def _control_default(self):
118 return select_random_ports(2)
111 return select_random_ports(2)
119
112
120 iopub = Instance(list, config=True)
113 iopub = Instance(list, config=True)
121 def _iopub_default(self):
114 def _iopub_default(self):
122 return select_random_ports(2)
115 return select_random_ports(2)
123
116
124 # single ports:
117 # single ports:
125 mon_port = Instance(int, config=True)
118 mon_port = Instance(int, config=True)
126 def _mon_port_default(self):
119 def _mon_port_default(self):
127 return select_random_ports(1)[0]
120 return select_random_ports(1)[0]
128
121
129 query_port = Instance(int, config=True)
122 query_port = Instance(int, config=True)
130 def _query_port_default(self):
123 def _query_port_default(self):
131 return select_random_ports(1)[0]
124 return select_random_ports(1)[0]
132
125
133 notifier_port = Instance(int, config=True)
126 notifier_port = Instance(int, config=True)
134 def _notifier_port_default(self):
127 def _notifier_port_default(self):
135 return select_random_ports(1)[0]
128 return select_random_ports(1)[0]
136
129
137 ping = Int(1000, config=True) # ping frequency
130 ping = Int(1000, config=True) # ping frequency
138
131
139 engine_ip = CStr('127.0.0.1', config=True)
132 engine_ip = CStr('127.0.0.1', config=True)
140 engine_transport = CStr('tcp', config=True)
133 engine_transport = CStr('tcp', config=True)
141
134
142 client_ip = CStr('127.0.0.1', config=True)
135 client_ip = CStr('127.0.0.1', config=True)
143 client_transport = CStr('tcp', config=True)
136 client_transport = CStr('tcp', config=True)
144
137
145 monitor_ip = CStr('127.0.0.1', config=True)
138 monitor_ip = CStr('127.0.0.1', config=True)
146 monitor_transport = CStr('tcp', config=True)
139 monitor_transport = CStr('tcp', config=True)
147
140
148 monitor_url = CStr('')
141 monitor_url = CStr('')
149
142
150 db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
143 db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
151
144
152 # not configurable
145 # not configurable
153 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
146 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
154 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
147 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
155 subconstructors = List()
148 subconstructors = List()
156 _constructed = Bool(False)
149 _constructed = Bool(False)
157
150
158 def _ip_changed(self, name, old, new):
151 def _ip_changed(self, name, old, new):
159 self.engine_ip = new
152 self.engine_ip = new
160 self.client_ip = new
153 self.client_ip = new
161 self.monitor_ip = new
154 self.monitor_ip = new
162 self._update_monitor_url()
155 self._update_monitor_url()
163
156
164 def _update_monitor_url(self):
157 def _update_monitor_url(self):
165 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
158 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
166
159
167 def _transport_changed(self, name, old, new):
160 def _transport_changed(self, name, old, new):
168 self.engine_transport = new
161 self.engine_transport = new
169 self.client_transport = new
162 self.client_transport = new
170 self.monitor_transport = new
163 self.monitor_transport = new
171 self._update_monitor_url()
164 self._update_monitor_url()
172
165
173 def __init__(self, **kwargs):
166 def __init__(self, **kwargs):
174 super(HubFactory, self).__init__(**kwargs)
167 super(HubFactory, self).__init__(**kwargs)
175 self._update_monitor_url()
168 self._update_monitor_url()
176 # self.on_trait_change(self._sync_ips, 'ip')
169 # self.on_trait_change(self._sync_ips, 'ip')
177 # self.on_trait_change(self._sync_transports, 'transport')
170 # self.on_trait_change(self._sync_transports, 'transport')
178 self.subconstructors.append(self.construct_hub)
171 self.subconstructors.append(self.construct_hub)
179
172
180
173
181 def construct(self):
174 def construct(self):
182 assert not self._constructed, "already constructed!"
175 assert not self._constructed, "already constructed!"
183
176
184 for subc in self.subconstructors:
177 for subc in self.subconstructors:
185 subc()
178 subc()
186
179
187 self._constructed = True
180 self._constructed = True
188
181
189
182
190 def start(self):
183 def start(self):
191 assert self._constructed, "must be constructed by self.construct() first!"
184 assert self._constructed, "must be constructed by self.construct() first!"
192 self.heartmonitor.start()
185 self.heartmonitor.start()
193 self.log.info("Heartmonitor started")
186 self.log.info("Heartmonitor started")
194
187
195 def construct_hub(self):
188 def construct_hub(self):
196 """construct"""
189 """construct"""
197 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
190 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
198 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
191 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
199
192
200 ctx = self.context
193 ctx = self.context
201 loop = self.loop
194 loop = self.loop
202
195
203 # Registrar socket
196 # Registrar socket
204 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
197 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
205 reg.bind(client_iface % self.regport)
198 reg.bind(client_iface % self.regport)
206 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
199 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
207 if self.client_ip != self.engine_ip:
200 if self.client_ip != self.engine_ip:
208 reg.bind(engine_iface % self.regport)
201 reg.bind(engine_iface % self.regport)
209 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
202 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
210
203
211 ### Engine connections ###
204 ### Engine connections ###
212
205
213 # heartbeat
206 # heartbeat
214 hpub = ctx.socket(zmq.PUB)
207 hpub = ctx.socket(zmq.PUB)
215 hpub.bind(engine_iface % self.hb[0])
208 hpub.bind(engine_iface % self.hb[0])
216 hrep = ctx.socket(zmq.XREP)
209 hrep = ctx.socket(zmq.XREP)
217 hrep.bind(engine_iface % self.hb[1])
210 hrep.bind(engine_iface % self.hb[1])
218 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
211 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
219 period=self.ping, logname=self.log.name)
212 period=self.ping, logname=self.log.name)
220
213
221 ### Client connections ###
214 ### Client connections ###
222 # Clientele socket
215 # Clientele socket
223 c = ZMQStream(ctx.socket(zmq.XREP), loop)
216 c = ZMQStream(ctx.socket(zmq.XREP), loop)
224 c.bind(client_iface%self.query_port)
217 c.bind(client_iface%self.query_port)
225 # Notifier socket
218 # Notifier socket
226 n = ZMQStream(ctx.socket(zmq.PUB), loop)
219 n = ZMQStream(ctx.socket(zmq.PUB), loop)
227 n.bind(client_iface%self.notifier_port)
220 n.bind(client_iface%self.notifier_port)
228
221
229 ### build and launch the queues ###
222 ### build and launch the queues ###
230
223
231 # monitor socket
224 # monitor socket
232 sub = ctx.socket(zmq.SUB)
225 sub = ctx.socket(zmq.SUB)
233 sub.setsockopt(zmq.SUBSCRIBE, "")
226 sub.setsockopt(zmq.SUBSCRIBE, "")
234 sub.bind(self.monitor_url)
227 sub.bind(self.monitor_url)
235 sub.bind('inproc://monitor')
228 sub.bind('inproc://monitor')
236 sub = ZMQStream(sub, loop)
229 sub = ZMQStream(sub, loop)
237
230
238 # connect the db
231 # connect the db
239 self.db = import_item(self.db_class)(self.session.session)
232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
234 print (cdir)
235 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
240 time.sleep(.25)
236 time.sleep(.25)
241
237
242 # build connection dicts
238 # build connection dicts
243 self.engine_info = {
239 self.engine_info = {
244 'control' : engine_iface%self.control[1],
240 'control' : engine_iface%self.control[1],
245 'mux': engine_iface%self.mux[1],
241 'mux': engine_iface%self.mux[1],
246 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
242 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
247 'task' : engine_iface%self.task[1],
243 'task' : engine_iface%self.task[1],
248 'iopub' : engine_iface%self.iopub[1],
244 'iopub' : engine_iface%self.iopub[1],
249 # 'monitor' : engine_iface%self.mon_port,
245 # 'monitor' : engine_iface%self.mon_port,
250 }
246 }
251
247
252 self.client_info = {
248 self.client_info = {
253 'control' : client_iface%self.control[0],
249 'control' : client_iface%self.control[0],
254 'query': client_iface%self.query_port,
250 'query': client_iface%self.query_port,
255 'mux': client_iface%self.mux[0],
251 'mux': client_iface%self.mux[0],
256 'task' : (self.scheme, client_iface%self.task[0]),
252 'task' : (self.scheme, client_iface%self.task[0]),
257 'iopub' : client_iface%self.iopub[0],
253 'iopub' : client_iface%self.iopub[0],
258 'notification': client_iface%self.notifier_port
254 'notification': client_iface%self.notifier_port
259 }
255 }
260 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
256 self.log.debug("Hub engine addrs: %s"%self.engine_info)
261 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
257 self.log.debug("Hub client addrs: %s"%self.client_info)
262 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
258 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
263 registrar=reg, clientele=c, notifier=n, db=self.db,
259 registrar=reg, clientele=c, notifier=n, db=self.db,
264 engine_info=self.engine_info, client_info=self.client_info,
260 engine_info=self.engine_info, client_info=self.client_info,
265 logname=self.log.name)
261 logname=self.log.name)
266
262
267
263
268 class Hub(LoggingFactory):
264 class Hub(LoggingFactory):
269 """The IPython Controller Hub with 0MQ connections
265 """The IPython Controller Hub with 0MQ connections
270
266
271 Parameters
267 Parameters
272 ==========
268 ==========
273 loop: zmq IOLoop instance
269 loop: zmq IOLoop instance
274 session: StreamSession object
270 session: StreamSession object
275 <removed> context: zmq context for creating new connections (?)
271 <removed> context: zmq context for creating new connections (?)
276 queue: ZMQStream for monitoring the command queue (SUB)
272 queue: ZMQStream for monitoring the command queue (SUB)
277 registrar: ZMQStream for engine registration requests (XREP)
273 registrar: ZMQStream for engine registration requests (XREP)
278 heartbeat: HeartMonitor object checking the pulse of the engines
274 heartbeat: HeartMonitor object checking the pulse of the engines
279 clientele: ZMQStream for client connections (XREP)
275 clientele: ZMQStream for client connections (XREP)
280 not used for jobs, only query/control commands
276 not used for jobs, only query/control commands
281 notifier: ZMQStream for broadcasting engine registration changes (PUB)
277 notifier: ZMQStream for broadcasting engine registration changes (PUB)
282 db: connection to db for out of memory logging of commands
278 db: connection to db for out of memory logging of commands
283 NotImplemented
279 NotImplemented
284 engine_info: dict of zmq connection information for engines to connect
280 engine_info: dict of zmq connection information for engines to connect
285 to the queues.
281 to the queues.
286 client_info: dict of zmq connection information for engines to connect
282 client_info: dict of zmq connection information for engines to connect
287 to the queues.
283 to the queues.
288 """
284 """
289 # internal data structures:
285 # internal data structures:
290 ids=Set() # engine IDs
286 ids=Set() # engine IDs
291 keytable=Dict()
287 keytable=Dict()
292 by_ident=Dict()
288 by_ident=Dict()
293 engines=Dict()
289 engines=Dict()
294 clients=Dict()
290 clients=Dict()
295 hearts=Dict()
291 hearts=Dict()
296 pending=Set()
292 pending=Set()
297 queues=Dict() # pending msg_ids keyed by engine_id
293 queues=Dict() # pending msg_ids keyed by engine_id
298 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
294 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
299 completed=Dict() # completed msg_ids keyed by engine_id
295 completed=Dict() # completed msg_ids keyed by engine_id
300 all_completed=Set() # completed msg_ids keyed by engine_id
296 all_completed=Set() # completed msg_ids keyed by engine_id
301 # mia=None
297 # mia=None
302 incoming_registrations=Dict()
298 incoming_registrations=Dict()
303 registration_timeout=Int()
299 registration_timeout=Int()
304 _idcounter=Int(0)
300 _idcounter=Int(0)
305
301
306 # objects from constructor:
302 # objects from constructor:
307 loop=Instance(ioloop.IOLoop)
303 loop=Instance(ioloop.IOLoop)
308 registrar=Instance(ZMQStream)
304 registrar=Instance(ZMQStream)
309 clientele=Instance(ZMQStream)
305 clientele=Instance(ZMQStream)
310 monitor=Instance(ZMQStream)
306 monitor=Instance(ZMQStream)
311 heartmonitor=Instance(HeartMonitor)
307 heartmonitor=Instance(HeartMonitor)
312 notifier=Instance(ZMQStream)
308 notifier=Instance(ZMQStream)
313 db=Instance(object)
309 db=Instance(object)
314 client_info=Dict()
310 client_info=Dict()
315 engine_info=Dict()
311 engine_info=Dict()
316
312
317
313
318 def __init__(self, **kwargs):
314 def __init__(self, **kwargs):
319 """
315 """
320 # universal:
316 # universal:
321 loop: IOLoop for creating future connections
317 loop: IOLoop for creating future connections
322 session: streamsession for sending serialized data
318 session: streamsession for sending serialized data
323 # engine:
319 # engine:
324 queue: ZMQStream for monitoring queue messages
320 queue: ZMQStream for monitoring queue messages
325 registrar: ZMQStream for engine registration
321 registrar: ZMQStream for engine registration
326 heartbeat: HeartMonitor object for tracking engines
322 heartbeat: HeartMonitor object for tracking engines
327 # client:
323 # client:
328 clientele: ZMQStream for client connections
324 clientele: ZMQStream for client connections
329 # extra:
325 # extra:
330 db: ZMQStream for db connection (NotImplemented)
326 db: ZMQStream for db connection (NotImplemented)
331 engine_info: zmq address/protocol dict for engine connections
327 engine_info: zmq address/protocol dict for engine connections
332 client_info: zmq address/protocol dict for client connections
328 client_info: zmq address/protocol dict for client connections
333 """
329 """
334
330
335 super(Hub, self).__init__(**kwargs)
331 super(Hub, self).__init__(**kwargs)
336 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
332 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
337
333
338 # validate connection dicts:
334 # validate connection dicts:
339 for k,v in self.client_info.iteritems():
335 for k,v in self.client_info.iteritems():
340 if k == 'task':
336 if k == 'task':
341 validate_url_container(v[1])
337 validate_url_container(v[1])
342 else:
338 else:
343 validate_url_container(v)
339 validate_url_container(v)
344 # validate_url_container(self.client_info)
340 # validate_url_container(self.client_info)
345 validate_url_container(self.engine_info)
341 validate_url_container(self.engine_info)
346
342
347 # register our callbacks
343 # register our callbacks
348 self.registrar.on_recv(self.dispatch_register_request)
344 self.registrar.on_recv(self.dispatch_register_request)
349 self.clientele.on_recv(self.dispatch_client_msg)
345 self.clientele.on_recv(self.dispatch_client_msg)
350 self.monitor.on_recv(self.dispatch_monitor_traffic)
346 self.monitor.on_recv(self.dispatch_monitor_traffic)
351
347
352 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
348 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
353 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
349 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
354
350
355 self.monitor_handlers = { 'in' : self.save_queue_request,
351 self.monitor_handlers = { 'in' : self.save_queue_request,
356 'out': self.save_queue_result,
352 'out': self.save_queue_result,
357 'intask': self.save_task_request,
353 'intask': self.save_task_request,
358 'outtask': self.save_task_result,
354 'outtask': self.save_task_result,
359 'tracktask': self.save_task_destination,
355 'tracktask': self.save_task_destination,
360 'incontrol': _passer,
356 'incontrol': _passer,
361 'outcontrol': _passer,
357 'outcontrol': _passer,
362 'iopub': self.save_iopub_message,
358 'iopub': self.save_iopub_message,
363 }
359 }
364
360
365 self.client_handlers = {'queue_request': self.queue_status,
361 self.client_handlers = {'queue_request': self.queue_status,
366 'result_request': self.get_results,
362 'result_request': self.get_results,
367 'purge_request': self.purge_results,
363 'purge_request': self.purge_results,
368 'load_request': self.check_load,
364 'load_request': self.check_load,
369 'resubmit_request': self.resubmit_task,
365 'resubmit_request': self.resubmit_task,
370 'shutdown_request': self.shutdown_request,
366 'shutdown_request': self.shutdown_request,
371 }
367 }
372
368
373 self.registrar_handlers = {'registration_request' : self.register_engine,
369 self.registrar_handlers = {'registration_request' : self.register_engine,
374 'unregistration_request' : self.unregister_engine,
370 'unregistration_request' : self.unregister_engine,
375 'connection_request': self.connection_request,
371 'connection_request': self.connection_request,
376 }
372 }
377
373
378 self.log.info("hub::created hub")
374 self.log.info("hub::created hub")
379
375
380 @property
376 @property
381 def _next_id(self):
377 def _next_id(self):
382 """gemerate a new ID.
378 """gemerate a new ID.
383
379
384 No longer reuse old ids, just count from 0."""
380 No longer reuse old ids, just count from 0."""
385 newid = self._idcounter
381 newid = self._idcounter
386 self._idcounter += 1
382 self._idcounter += 1
387 return newid
383 return newid
388 # newid = 0
384 # newid = 0
389 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
385 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
390 # # print newid, self.ids, self.incoming_registrations
386 # # print newid, self.ids, self.incoming_registrations
391 # while newid in self.ids or newid in incoming:
387 # while newid in self.ids or newid in incoming:
392 # newid += 1
388 # newid += 1
393 # return newid
389 # return newid
394
390
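# A small sketch of the id policy described above (hypothetical fresh hub):
# ids count up monotonically and are never recycled, even after the engine
# that held an id unregisters.
#
#   hub._next_id    # -> 0
#   hub._next_id    # -> 1
#   hub._next_id    # -> 2  (still 2 even if engines 0 and 1 have gone away)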
395 #-----------------------------------------------------------------------------
391 #-----------------------------------------------------------------------------
396 # message validation
392 # message validation
397 #-----------------------------------------------------------------------------
393 #-----------------------------------------------------------------------------
398
394
399 def _validate_targets(self, targets):
395 def _validate_targets(self, targets):
400 """turn any valid targets argument into a list of integer ids"""
396 """turn any valid targets argument into a list of integer ids"""
401 if targets is None:
397 if targets is None:
402 # default to all
398 # default to all
403 targets = self.ids
399 targets = self.ids
404
400
405 if isinstance(targets, (int,str,unicode)):
401 if isinstance(targets, (int,str,unicode)):
406 # only one target specified
402 # only one target specified
407 targets = [targets]
403 targets = [targets]
408 _targets = []
404 _targets = []
409 for t in targets:
405 for t in targets:
410 # map raw identities to ids
406 # map raw identities to ids
411 if isinstance(t, (str,unicode)):
407 if isinstance(t, (str,unicode)):
412 t = self.by_ident.get(t, t)
408 t = self.by_ident.get(t, t)
413 _targets.append(t)
409 _targets.append(t)
414 targets = _targets
410 targets = _targets
415 bad_targets = [ t for t in targets if t not in self.ids ]
411 bad_targets = [ t for t in targets if t not in self.ids ]
416 if bad_targets:
412 if bad_targets:
417 raise IndexError("No Such Engine: %r"%bad_targets)
413 raise IndexError("No Such Engine: %r"%bad_targets)
418 if not targets:
414 if not targets:
419 raise IndexError("No Engines Registered")
415 raise IndexError("No Engines Registered")
420 return targets
416 return targets
421
417
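# Hedged usage sketch for _validate_targets, assuming engines 0-2 are
# registered and 'queue-uuid' is a raw zmq identity present in by_ident
# (all values hypothetical):
#
#   self._validate_targets(None)          # -> [0, 1, 2]  (defaults to all)
#   self._validate_targets(1)             # -> [1]
#   self._validate_targets('queue-uuid')  # raw identity mapped to its int id
#   self._validate_targets([5])           # raises IndexError("No Such Engine: [5]")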
422 def _validate_client_msg(self, msg):
418 def _validate_client_msg(self, msg):
423 """validates and unpacks headers of a message. Returns False if invalid,
419 """validates and unpacks headers of a message. Returns False if invalid,
424 (ident, header, parent, content)"""
420 (ident, header, parent, content)"""
425 client_id = msg[0]
421 client_id = msg[0]
426 try:
422 try:
427 msg = self.session.unpack_message(msg[1:], content=True)
423 msg = self.session.unpack_message(msg[1:], content=True)
428 except:
424 except:
429 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
425 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
430 return False
426 return False
431
427
432 msg_type = msg.get('msg_type', None)
428 msg_type = msg.get('msg_type', None)
433 if msg_type is None:
429 if msg_type is None:
434 return False
430 return False
435 header = msg.get('header')
431 header = msg.get('header')
436 # session doesn't handle split content for now:
432 # session doesn't handle split content for now:
437 return client_id, msg
433 return client_id, msg
438
434
439
435
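# Hedged sketch of the calling convention implied by the docstring above:
#
#   valid = self._validate_client_msg(raw_msg)
#   if not valid:
#       return                    # drop the invalid message
#   client_id, msg = valid        # otherwise the unpacked pair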
440 #-----------------------------------------------------------------------------
436 #-----------------------------------------------------------------------------
441 # dispatch methods (1 per stream)
437 # dispatch methods (1 per stream)
442 #-----------------------------------------------------------------------------
438 #-----------------------------------------------------------------------------
443
439
444 def dispatch_register_request(self, msg):
440 def dispatch_register_request(self, msg):
445 """"""
441 """"""
446 self.log.debug("registration::dispatch_register_request(%s)"%msg)
442 self.log.debug("registration::dispatch_register_request(%s)"%msg)
447 idents,msg = self.session.feed_identities(msg)
443 idents,msg = self.session.feed_identities(msg)
448 if not idents:
444 if not idents:
449 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
445 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
450 return
446 return
451 try:
447 try:
452 msg = self.session.unpack_message(msg,content=True)
448 msg = self.session.unpack_message(msg,content=True)
453 except:
449 except:
454 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
450 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
455 return
451 return
456
452
457 msg_type = msg['msg_type']
453 msg_type = msg['msg_type']
458 content = msg['content']
454 content = msg['content']
459
455
460 handler = self.registrar_handlers.get(msg_type, None)
456 handler = self.registrar_handlers.get(msg_type, None)
461 if handler is None:
457 if handler is None:
462 self.log.error("registration::got bad registration message: %s"%msg)
458 self.log.error("registration::got bad registration message: %s"%msg)
463 else:
459 else:
464 handler(idents, msg)
460 handler(idents, msg)
465
461
466 def dispatch_monitor_traffic(self, msg):
462 def dispatch_monitor_traffic(self, msg):
467 """all ME and Task queue messages come through here, as well as
463 """all ME and Task queue messages come through here, as well as
468 IOPub traffic."""
464 IOPub traffic."""
469 self.log.debug("monitor traffic: %s"%msg[:2])
465 self.log.debug("monitor traffic: %s"%msg[:2])
470 switch = msg[0]
466 switch = msg[0]
471 idents, msg = self.session.feed_identities(msg[1:])
467 idents, msg = self.session.feed_identities(msg[1:])
472 if not idents:
468 if not idents:
473 self.log.error("Bad Monitor Message: %s"%msg)
469 self.log.error("Bad Monitor Message: %s"%msg)
474 return
470 return
475 handler = self.monitor_handlers.get(switch, None)
471 handler = self.monitor_handlers.get(switch, None)
476 if handler is not None:
472 if handler is not None:
477 handler(idents, msg)
473 handler(idents, msg)
478 else:
474 else:
479 self.log.error("Invalid monitor topic: %s"%switch)
475 self.log.error("Invalid monitor topic: %s"%switch)
480
476
481
477
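# Sketch of the wire format this dispatcher expects, inferred from the code
# above (frame contents hypothetical): the first frame is the monitor topic,
# the rest is a routed session message whose identity prefix
# feed_identities() strips off.
#
#   msg = ['in', <queue ident>, <client ident>, <session frames...>]
#   switch == 'in'  =>  handler = self.save_queue_request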
482 def dispatch_client_msg(self, msg):
478 def dispatch_client_msg(self, msg):
483 """Route messages from clients"""
479 """Route messages from clients"""
484 idents, msg = self.session.feed_identities(msg)
480 idents, msg = self.session.feed_identities(msg)
485 if not idents:
481 if not idents:
486 self.log.error("Bad Client Message: %s"%msg)
482 self.log.error("Bad Client Message: %s"%msg)
487 return
483 return
488 client_id = idents[0]
484 client_id = idents[0]
489 try:
485 try:
490 msg = self.session.unpack_message(msg, content=True)
486 msg = self.session.unpack_message(msg, content=True)
491 except:
487 except:
492 content = error.wrap_exception()
488 content = error.wrap_exception()
493 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
489 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
494 self.session.send(self.clientele, "hub_error", ident=client_id,
490 self.session.send(self.clientele, "hub_error", ident=client_id,
495 content=content)
491 content=content)
496 return
492 return
497
493
498 # print client_id, header, parent, content
494 # print client_id, header, parent, content
499 #switch on message type:
495 #switch on message type:
500 msg_type = msg['msg_type']
496 msg_type = msg['msg_type']
501 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
497 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
502 handler = self.client_handlers.get(msg_type, None)
498 handler = self.client_handlers.get(msg_type, None)
503 try:
499 try:
504 assert handler is not None, "Bad Message Type: %s"%msg_type
500 assert handler is not None, "Bad Message Type: %s"%msg_type
505 except:
501 except:
506 content = error.wrap_exception()
502 content = error.wrap_exception()
507 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
503 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
508 self.session.send(self.clientele, "hub_error", ident=client_id,
504 self.session.send(self.clientele, "hub_error", ident=client_id,
509 content=content)
505 content=content)
510 return
506 return
511 else:
507 else:
512 handler(client_id, msg)
508 handler(client_id, msg)
513
509
514 def dispatch_db(self, msg):
510 def dispatch_db(self, msg):
515 """"""
511 """"""
516 raise NotImplementedError
512 raise NotImplementedError
517
513
518 #---------------------------------------------------------------------------
514 #---------------------------------------------------------------------------
519 # handler methods (1 per event)
515 # handler methods (1 per event)
520 #---------------------------------------------------------------------------
516 #---------------------------------------------------------------------------
521
517
522 #----------------------- Heartbeat --------------------------------------
518 #----------------------- Heartbeat --------------------------------------
523
519
524 def handle_new_heart(self, heart):
520 def handle_new_heart(self, heart):
525 """handler to attach to heartbeater.
521 """handler to attach to heartbeater.
526 Called when a new heart starts to beat.
522 Called when a new heart starts to beat.
527 Triggers completion of registration."""
523 Triggers completion of registration."""
528 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
524 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
529 if heart not in self.incoming_registrations:
525 if heart not in self.incoming_registrations:
530 self.log.info("heartbeat::ignoring new heart: %r"%heart)
526 self.log.info("heartbeat::ignoring new heart: %r"%heart)
531 else:
527 else:
532 self.finish_registration(heart)
528 self.finish_registration(heart)
533
529
534
530
535 def handle_heart_failure(self, heart):
531 def handle_heart_failure(self, heart):
536 """handler to attach to heartbeater.
532 """handler to attach to heartbeater.
537 called when a previously registered heart fails to respond to beat request.
533 called when a previously registered heart fails to respond to beat request.
538 triggers unregistration"""
534 triggers unregistration"""
539 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
535 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
540 eid = self.hearts.get(heart, None)
536 eid = self.hearts.get(heart, None)
541 if eid is None:
537 if eid is None:
542 self.log.info("heartbeat::ignoring heart failure %r"%heart)
538 self.log.info("heartbeat::ignoring heart failure %r"%heart)
543 else:
539 else:
544 queue = self.engines[eid].queue
540 queue = self.engines[eid].queue
545 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
541 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
546
542
547 #----------------------- MUX Queue Traffic ------------------------------
543 #----------------------- MUX Queue Traffic ------------------------------
548
544
549 def save_queue_request(self, idents, msg):
545 def save_queue_request(self, idents, msg):
550 if len(idents) < 2:
546 if len(idents) < 2:
551 self.log.error("invalid identity prefix: %s"%idents)
547 self.log.error("invalid identity prefix: %s"%idents)
552 return
548 return
553 queue_id, client_id = idents[:2]
549 queue_id, client_id = idents[:2]
554 try:
550 try:
555 msg = self.session.unpack_message(msg, content=False)
551 msg = self.session.unpack_message(msg, content=False)
556 except:
552 except:
557 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
553 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
558 return
554 return
559
555
560 eid = self.by_ident.get(queue_id, None)
556 eid = self.by_ident.get(queue_id, None)
561 if eid is None:
557 if eid is None:
562 self.log.error("queue::target %r not registered"%queue_id)
558 self.log.error("queue::target %r not registered"%queue_id)
563 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
559 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
564 return
560 return
565
561
566 header = msg['header']
562 header = msg['header']
567 msg_id = header['msg_id']
563 msg_id = header['msg_id']
568 record = init_record(msg)
564 record = init_record(msg)
569 record['engine_uuid'] = queue_id
565 record['engine_uuid'] = queue_id
570 record['client_uuid'] = client_id
566 record['client_uuid'] = client_id
571 record['queue'] = 'mux'
567 record['queue'] = 'mux'
572 if MongoDB is not None and isinstance(self.db, MongoDB):
568
573 record['buffers'] = map(Binary, record['buffers'])
574 self.pending.add(msg_id)
569 self.pending.add(msg_id)
575 self.queues[eid].append(msg_id)
570 self.queues[eid].append(msg_id)
576 self.db.add_record(msg_id, record)
571 self.db.add_record(msg_id, record)
577
572
578 def save_queue_result(self, idents, msg):
573 def save_queue_result(self, idents, msg):
579 if len(idents) < 2:
574 if len(idents) < 2:
580 self.log.error("invalid identity prefix: %s"%idents)
575 self.log.error("invalid identity prefix: %s"%idents)
581 return
576 return
582
577
583 client_id, queue_id = idents[:2]
578 client_id, queue_id = idents[:2]
584 try:
579 try:
585 msg = self.session.unpack_message(msg, content=False)
580 msg = self.session.unpack_message(msg, content=False)
586 except:
581 except:
587 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
582 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
588 queue_id,client_id, msg), exc_info=True)
583 queue_id,client_id, msg), exc_info=True)
589 return
584 return
590
585
591 eid = self.by_ident.get(queue_id, None)
586 eid = self.by_ident.get(queue_id, None)
592 if eid is None:
587 if eid is None:
593 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
588 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
594 self.log.debug("queue:: %s"%msg[2:])
589 self.log.debug("queue:: %s"%msg[2:])
595 return
590 return
596
591
597 parent = msg['parent_header']
592 parent = msg['parent_header']
598 if not parent:
593 if not parent:
599 return
594 return
600 msg_id = parent['msg_id']
595 msg_id = parent['msg_id']
601 if msg_id in self.pending:
596 if msg_id in self.pending:
602 self.pending.remove(msg_id)
597 self.pending.remove(msg_id)
603 self.all_completed.add(msg_id)
598 self.all_completed.add(msg_id)
604 self.queues[eid].remove(msg_id)
599 self.queues[eid].remove(msg_id)
605 self.completed[eid].append(msg_id)
600 self.completed[eid].append(msg_id)
606 rheader = msg['header']
601 rheader = msg['header']
607 completed = datetime.strptime(rheader['date'], ISO8601)
602 completed = datetime.strptime(rheader['date'], ISO8601)
608 started = rheader.get('started', None)
603 started = rheader.get('started', None)
609 if started is not None:
604 if started is not None:
610 started = datetime.strptime(started, ISO8601)
605 started = datetime.strptime(started, ISO8601)
611 result = {
606 result = {
612 'result_header' : rheader,
607 'result_header' : rheader,
613 'result_content': msg['content'],
608 'result_content': msg['content'],
614 'started' : started,
609 'started' : started,
615 'completed' : completed
610 'completed' : completed
616 }
611 }
617 if MongoDB is not None and isinstance(self.db, MongoDB):
612
618 result['result_buffers'] = map(Binary, msg['buffers'])
613 result['result_buffers'] = msg['buffers']
619 else:
620 result['result_buffers'] = msg['buffers']
621 self.db.update_record(msg_id, result)
614 self.db.update_record(msg_id, result)
622 else:
615 else:
623 self.log.debug("queue:: unknown msg finished %s"%msg_id)
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
624
617
625 #--------------------- Task Queue Traffic ------------------------------
618 #--------------------- Task Queue Traffic ------------------------------
626
619
627 def save_task_request(self, idents, msg):
620 def save_task_request(self, idents, msg):
628 """Save the submission of a task."""
621 """Save the submission of a task."""
629 client_id = idents[0]
622 client_id = idents[0]
630
623
631 try:
624 try:
632 msg = self.session.unpack_message(msg, content=False)
625 msg = self.session.unpack_message(msg, content=False)
633 except:
626 except:
634 self.log.error("task::client %r sent invalid task message: %s"%(
627 self.log.error("task::client %r sent invalid task message: %s"%(
635 client_id, msg), exc_info=True)
628 client_id, msg), exc_info=True)
636 return
629 return
637 record = init_record(msg)
630 record = init_record(msg)
638 if MongoDB is not None and isinstance(self.db, MongoDB):
631
639 record['buffers'] = map(Binary, record['buffers'])
640 record['client_uuid'] = client_id
632 record['client_uuid'] = client_id
641 record['queue'] = 'task'
633 record['queue'] = 'task'
642 header = msg['header']
634 header = msg['header']
643 msg_id = header['msg_id']
635 msg_id = header['msg_id']
644 self.pending.add(msg_id)
636 self.pending.add(msg_id)
645 self.db.add_record(msg_id, record)
637 self.db.add_record(msg_id, record)
646
638
647 def save_task_result(self, idents, msg):
639 def save_task_result(self, idents, msg):
648 """save the result of a completed task."""
640 """save the result of a completed task."""
649 client_id = idents[0]
641 client_id = idents[0]
650 try:
642 try:
651 msg = self.session.unpack_message(msg, content=False)
643 msg = self.session.unpack_message(msg, content=False)
652 except:
644 except:
653 self.log.error("task::invalid task result message send to %r: %s"%(
645 self.log.error("task::invalid task result message send to %r: %s"%(
654 client_id, msg), exc_info=True)
646 client_id, msg), exc_info=True)
655 raise
647 raise
656 return
648 return
657
649
658 parent = msg['parent_header']
650 parent = msg['parent_header']
659 if not parent:
651 if not parent:
660 # print msg
652 # print msg
661 self.log.warn("Task %r had no parent!"%msg)
653 self.log.warn("Task %r had no parent!"%msg)
662 return
654 return
663 msg_id = parent['msg_id']
655 msg_id = parent['msg_id']
664
656
665 header = msg['header']
657 header = msg['header']
666 engine_uuid = header.get('engine', None)
658 engine_uuid = header.get('engine', None)
667 eid = self.by_ident.get(engine_uuid, None)
659 eid = self.by_ident.get(engine_uuid, None)
668
660
669 if msg_id in self.pending:
661 if msg_id in self.pending:
670 self.pending.remove(msg_id)
662 self.pending.remove(msg_id)
671 self.all_completed.add(msg_id)
663 self.all_completed.add(msg_id)
672 if eid is not None:
664 if eid is not None:
673 self.completed[eid].append(msg_id)
665 self.completed[eid].append(msg_id)
674 if msg_id in self.tasks[eid]:
666 if msg_id in self.tasks[eid]:
675 self.tasks[eid].remove(msg_id)
667 self.tasks[eid].remove(msg_id)
676 completed = datetime.strptime(header['date'], ISO8601)
668 completed = datetime.strptime(header['date'], ISO8601)
677 started = header.get('started', None)
669 started = header.get('started', None)
678 if started is not None:
670 if started is not None:
679 started = datetime.strptime(started, ISO8601)
671 started = datetime.strptime(started, ISO8601)
680 result = {
672 result = {
681 'result_header' : header,
673 'result_header' : header,
682 'result_content': msg['content'],
674 'result_content': msg['content'],
683 'started' : started,
675 'started' : started,
684 'completed' : completed,
676 'completed' : completed,
685 'engine_uuid': engine_uuid
677 'engine_uuid': engine_uuid
686 }
678 }
687 if MongoDB is not None and isinstance(self.db, MongoDB):
679
688 result['result_buffers'] = map(Binary, msg['buffers'])
680 result['result_buffers'] = msg['buffers']
689 else:
690 result['result_buffers'] = msg['buffers']
691 self.db.update_record(msg_id, result)
681 self.db.update_record(msg_id, result)
692
682
693 else:
683 else:
694 self.log.debug("task::unknown task %s finished"%msg_id)
684 self.log.debug("task::unknown task %s finished"%msg_id)
695
685
696 def save_task_destination(self, idents, msg):
686 def save_task_destination(self, idents, msg):
697 try:
687 try:
698 msg = self.session.unpack_message(msg, content=True)
688 msg = self.session.unpack_message(msg, content=True)
699 except:
689 except:
700 self.log.error("task::invalid task tracking message", exc_info=True)
690 self.log.error("task::invalid task tracking message", exc_info=True)
701 return
691 return
702 content = msg['content']
692 content = msg['content']
703 # print (content)
693 # print (content)
704 msg_id = content['msg_id']
694 msg_id = content['msg_id']
705 engine_uuid = content['engine_id']
695 engine_uuid = content['engine_id']
706 eid = self.by_ident[engine_uuid]
696 eid = self.by_ident[engine_uuid]
707
697
708 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
698 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
709 # if msg_id in self.mia:
699 # if msg_id in self.mia:
710 # self.mia.remove(msg_id)
700 # self.mia.remove(msg_id)
711 # else:
701 # else:
712 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
702 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
713
703
714 self.tasks[eid].append(msg_id)
704 self.tasks[eid].append(msg_id)
715 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
705 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
716 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
706 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
717
707
718 def mia_task_request(self, idents, msg):
708 def mia_task_request(self, idents, msg):
719 raise NotImplementedError
709 raise NotImplementedError
720 client_id = idents[0]
710 client_id = idents[0]
721 # content = dict(mia=self.mia,status='ok')
711 # content = dict(mia=self.mia,status='ok')
722 # self.session.send('mia_reply', content=content, idents=client_id)
712 # self.session.send('mia_reply', content=content, idents=client_id)
723
713
724
714
725 #--------------------- IOPub Traffic ------------------------------
715 #--------------------- IOPub Traffic ------------------------------
726
716
727 def save_iopub_message(self, topics, msg):
717 def save_iopub_message(self, topics, msg):
728 """save an iopub message into the db"""
718 """save an iopub message into the db"""
729 # print (topics)
719 # print (topics)
730 try:
720 try:
731 msg = self.session.unpack_message(msg, content=True)
721 msg = self.session.unpack_message(msg, content=True)
732 except:
722 except:
733 self.log.error("iopub::invalid IOPub message", exc_info=True)
723 self.log.error("iopub::invalid IOPub message", exc_info=True)
734 return
724 return
735
725
736 parent = msg['parent_header']
726 parent = msg['parent_header']
737 if not parent:
727 if not parent:
738 self.log.error("iopub::invalid IOPub message: %s"%msg)
728 self.log.error("iopub::invalid IOPub message: %s"%msg)
739 return
729 return
740 msg_id = parent['msg_id']
730 msg_id = parent['msg_id']
741 msg_type = msg['msg_type']
731 msg_type = msg['msg_type']
742 content = msg['content']
732 content = msg['content']
743
733
744 # ensure msg_id is in db
734 # ensure msg_id is in db
745 try:
735 try:
746 rec = self.db.get_record(msg_id)
736 rec = self.db.get_record(msg_id)
747 except:
737 except:
748 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
738 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
749 return
739 return
750 # stream
740 # stream
751 d = {}
741 d = {}
752 if msg_type == 'stream':
742 if msg_type == 'stream':
753 name = content['name']
743 name = content['name']
754 s = rec[name] or ''
744 s = rec[name] or ''
755 d[name] = s + content['data']
745 d[name] = s + content['data']
756
746
757 elif msg_type == 'pyerr':
747 elif msg_type == 'pyerr':
758 d['pyerr'] = content
748 d['pyerr'] = content
759 else:
749 else:
760 d[msg_type] = content['data']
750 d[msg_type] = content['data']
761
751
762 self.db.update_record(msg_id, d)
752 self.db.update_record(msg_id, d)
763
753
764
754
765
755
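# Hedged sketch of the stream accumulation above: successive 'stream'
# messages for the same parent append to the stored record, so for two
# hypothetical stdout messages 'hello ' then 'world\n':
#
#   rec['stdout']  # -> 'hello '
#   d['stdout'] = (rec['stdout'] or '') + 'world\n'   # -> 'hello world\n'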
766 #-------------------------------------------------------------------------
756 #-------------------------------------------------------------------------
767 # Registration requests
757 # Registration requests
768 #-------------------------------------------------------------------------
758 #-------------------------------------------------------------------------
769
759
770 def connection_request(self, client_id, msg):
760 def connection_request(self, client_id, msg):
771 """Reply with connection addresses for clients."""
761 """Reply with connection addresses for clients."""
772 self.log.info("client::client %s connected"%client_id)
762 self.log.info("client::client %s connected"%client_id)
773 content = dict(status='ok')
763 content = dict(status='ok')
774 content.update(self.client_info)
764 content.update(self.client_info)
775 jsonable = {}
765 jsonable = {}
776 for k,v in self.keytable.iteritems():
766 for k,v in self.keytable.iteritems():
777 jsonable[str(k)] = v
767 jsonable[str(k)] = v
778 content['engines'] = jsonable
768 content['engines'] = jsonable
779 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
769 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
780
770
781 def register_engine(self, reg, msg):
771 def register_engine(self, reg, msg):
782 """Register a new engine."""
772 """Register a new engine."""
783 content = msg['content']
773 content = msg['content']
784 try:
774 try:
785 queue = content['queue']
775 queue = content['queue']
786 except KeyError:
776 except KeyError:
787 self.log.error("registration::queue not specified", exc_info=True)
777 self.log.error("registration::queue not specified", exc_info=True)
788 return
778 return
789 heart = content.get('heartbeat', None)
779 heart = content.get('heartbeat', None)
790 """register a new engine, and create the socket(s) necessary"""
780 """register a new engine, and create the socket(s) necessary"""
791 eid = self._next_id
781 eid = self._next_id
792 # print (eid, queue, reg, heart)
782 # print (eid, queue, reg, heart)
793
783
794 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
784 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
795
785
796 content = dict(id=eid,status='ok')
786 content = dict(id=eid,status='ok')
797 content.update(self.engine_info)
787 content.update(self.engine_info)
798 # check if requesting available IDs:
788 # check if requesting available IDs:
799 if queue in self.by_ident:
789 if queue in self.by_ident:
800 try:
790 try:
801 raise KeyError("queue_id %r in use"%queue)
791 raise KeyError("queue_id %r in use"%queue)
802 except:
792 except:
803 content = error.wrap_exception()
793 content = error.wrap_exception()
804 self.log.error("queue_id %r in use"%queue, exc_info=True)
794 self.log.error("queue_id %r in use"%queue, exc_info=True)
805 elif heart in self.hearts: # need to check unique hearts?
795 elif heart in self.hearts: # need to check unique hearts?
806 try:
796 try:
807 raise KeyError("heart_id %r in use"%heart)
797 raise KeyError("heart_id %r in use"%heart)
808 except:
798 except:
809 self.log.error("heart_id %r in use"%heart, exc_info=True)
799 self.log.error("heart_id %r in use"%heart, exc_info=True)
810 content = error.wrap_exception()
800 content = error.wrap_exception()
811 else:
801 else:
812 for h, pack in self.incoming_registrations.iteritems():
802 for h, pack in self.incoming_registrations.iteritems():
813 if heart == h:
803 if heart == h:
814 try:
804 try:
815 raise KeyError("heart_id %r in use"%heart)
805 raise KeyError("heart_id %r in use"%heart)
816 except:
806 except:
817 self.log.error("heart_id %r in use"%heart, exc_info=True)
807 self.log.error("heart_id %r in use"%heart, exc_info=True)
818 content = error.wrap_exception()
808 content = error.wrap_exception()
819 break
809 break
820 elif queue == pack[1]:
810 elif queue == pack[1]:
821 try:
811 try:
822 raise KeyError("queue_id %r in use"%queue)
812 raise KeyError("queue_id %r in use"%queue)
823 except:
813 except:
824 self.log.error("queue_id %r in use"%queue, exc_info=True)
814 self.log.error("queue_id %r in use"%queue, exc_info=True)
825 content = error.wrap_exception()
815 content = error.wrap_exception()
826 break
816 break
827
817
828 msg = self.session.send(self.registrar, "registration_reply",
818 msg = self.session.send(self.registrar, "registration_reply",
829 content=content,
819 content=content,
830 ident=reg)
820 ident=reg)
831
821
832 if content['status'] == 'ok':
822 if content['status'] == 'ok':
833 if heart in self.heartmonitor.hearts:
823 if heart in self.heartmonitor.hearts:
834 # already beating
824 # already beating
835 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
825 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
836 self.finish_registration(heart)
826 self.finish_registration(heart)
837 else:
827 else:
838 purge = lambda : self._purge_stalled_registration(heart)
828 purge = lambda : self._purge_stalled_registration(heart)
839 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
829 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
840 dc.start()
830 dc.start()
841 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
831 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
842 else:
832 else:
843 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
833 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
844 return eid
834 return eid
845
835
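# Hedged sketch of the registration handshake above (field names taken from
# this method; uuid values hypothetical):
#
#   registration_request content: {'queue': 'abc...', 'heartbeat': 'def...'}
#   registration_reply   content: {'id': 0, 'status': 'ok', <engine_info>}
#   # on a duplicate queue/heart id, content is an error dict from
#   # error.wrap_exception(), including 'status' and 'evalue'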
846 def unregister_engine(self, ident, msg):
836 def unregister_engine(self, ident, msg):
847 """Unregister an engine that explicitly requested to leave."""
837 """Unregister an engine that explicitly requested to leave."""
848 try:
838 try:
849 eid = msg['content']['id']
839 eid = msg['content']['id']
850 except:
840 except:
851 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
841 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
852 return
842 return
853 self.log.info("registration::unregister_engine(%s)"%eid)
843 self.log.info("registration::unregister_engine(%s)"%eid)
854 content=dict(id=eid, queue=self.engines[eid].queue)
844 content=dict(id=eid, queue=self.engines[eid].queue)
855 self.ids.remove(eid)
845 self.ids.remove(eid)
856 self.keytable.pop(eid)
846 self.keytable.pop(eid)
857 ec = self.engines.pop(eid)
847 ec = self.engines.pop(eid)
858 self.hearts.pop(ec.heartbeat)
848 self.hearts.pop(ec.heartbeat)
859 self.by_ident.pop(ec.queue)
849 self.by_ident.pop(ec.queue)
860 self.completed.pop(eid)
850 self.completed.pop(eid)
861 for msg_id in self.queues.pop(eid):
851 for msg_id in self.queues.pop(eid):
862 self.pending.remove(msg_id)
852 self.pending.remove(msg_id)
863 ############## TODO: HANDLE IT ################
853 ############## TODO: HANDLE IT ################
864
854
865 if self.notifier:
855 if self.notifier:
866 self.session.send(self.notifier, "unregistration_notification", content=content)
856 self.session.send(self.notifier, "unregistration_notification", content=content)
867
857
868 def finish_registration(self, heart):
858 def finish_registration(self, heart):
869 """Second half of engine registration, called after our HeartMonitor
859 """Second half of engine registration, called after our HeartMonitor
870 has received a beat from the Engine's Heart."""
860 has received a beat from the Engine's Heart."""
871 try:
861 try:
872 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
862 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
873 except KeyError:
863 except KeyError:
874 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
864 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
875 return
865 return
876 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
866 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
877 if purge is not None:
867 if purge is not None:
878 purge.stop()
868 purge.stop()
879 control = queue
869 control = queue
880 self.ids.add(eid)
870 self.ids.add(eid)
881 self.keytable[eid] = queue
871 self.keytable[eid] = queue
882 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
872 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
883 control=control, heartbeat=heart)
873 control=control, heartbeat=heart)
884 self.by_ident[queue] = eid
874 self.by_ident[queue] = eid
885 self.queues[eid] = list()
875 self.queues[eid] = list()
886 self.tasks[eid] = list()
876 self.tasks[eid] = list()
887 self.completed[eid] = list()
877 self.completed[eid] = list()
888 self.hearts[heart] = eid
878 self.hearts[heart] = eid
889 content = dict(id=eid, queue=self.engines[eid].queue)
879 content = dict(id=eid, queue=self.engines[eid].queue)
890 if self.notifier:
880 if self.notifier:
891 self.session.send(self.notifier, "registration_notification", content=content)
881 self.session.send(self.notifier, "registration_notification", content=content)
892 self.log.info("engine::Engine Connected: %i"%eid)
882 self.log.info("engine::Engine Connected: %i"%eid)
893
883
894 def _purge_stalled_registration(self, heart):
884 def _purge_stalled_registration(self, heart):
895 if heart in self.incoming_registrations:
885 if heart in self.incoming_registrations:
896 eid = self.incoming_registrations.pop(heart)[0]
886 eid = self.incoming_registrations.pop(heart)[0]
897 self.log.info("registration::purging stalled registration: %i"%eid)
887 self.log.info("registration::purging stalled registration: %i"%eid)
900
890
901 #-------------------------------------------------------------------------
891 #-------------------------------------------------------------------------
902 # Client Requests
892 # Client Requests
903 #-------------------------------------------------------------------------
893 #-------------------------------------------------------------------------
904
894
905 def shutdown_request(self, client_id, msg):
895 def shutdown_request(self, client_id, msg):
906 """handle shutdown request."""
896 """handle shutdown request."""
907 # s = self.context.socket(zmq.XREQ)
897 # s = self.context.socket(zmq.XREQ)
908 # s.connect(self.client_connections['mux'])
898 # s.connect(self.client_connections['mux'])
909 # time.sleep(0.1)
899 # time.sleep(0.1)
910 # for eid,ec in self.engines.iteritems():
900 # for eid,ec in self.engines.iteritems():
911 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
901 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
912 # time.sleep(1)
902 # time.sleep(1)
913 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
903 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
914 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
904 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
915 dc.start()
905 dc.start()
916
906
917 def _shutdown(self):
907 def _shutdown(self):
918 self.log.info("hub::hub shutting down.")
908 self.log.info("hub::hub shutting down.")
919 time.sleep(0.1)
909 time.sleep(0.1)
920 sys.exit(0)
910 sys.exit(0)
921
911
922
912
923 def check_load(self, client_id, msg):
913 def check_load(self, client_id, msg):
924 content = msg['content']
914 content = msg['content']
925 try:
915 try:
926 targets = content['targets']
916 targets = content['targets']
927 targets = self._validate_targets(targets)
917 targets = self._validate_targets(targets)
928 except:
918 except:
929 content = error.wrap_exception()
919 content = error.wrap_exception()
930 self.session.send(self.clientele, "hub_error",
920 self.session.send(self.clientele, "hub_error",
931 content=content, ident=client_id)
921 content=content, ident=client_id)
932 return
922 return
933
923
934 content = dict(status='ok')
924 content = dict(status='ok')
935 # loads = {}
925 # loads = {}
936 for t in targets:
926 for t in targets:
937 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
927 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
938 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
928 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
939
929
940
930
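# Hedged example of a load_reply content dict (engine ids and counts
# hypothetical); each engine's value is its queue backlog plus task backlog:
#
#   {'status': 'ok', '0': 3, '1': 0}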
941 def queue_status(self, client_id, msg):
931 def queue_status(self, client_id, msg):
942 """Return the Queue status of one or more targets.
932 """Return the Queue status of one or more targets.
943 if verbose: return the msg_ids
933 if verbose: return the msg_ids
944 else: return len of each type.
934 else: return len of each type.
945 keys: queue (pending MUX jobs)
935 keys: queue (pending MUX jobs)
946 tasks (pending Task jobs)
936 tasks (pending Task jobs)
947 completed (finished jobs from both queues)"""
937 completed (finished jobs from both queues)"""
948 content = msg['content']
938 content = msg['content']
949 targets = content['targets']
939 targets = content['targets']
950 try:
940 try:
951 targets = self._validate_targets(targets)
941 targets = self._validate_targets(targets)
952 except:
942 except:
953 content = error.wrap_exception()
943 content = error.wrap_exception()
954 self.session.send(self.clientele, "hub_error",
944 self.session.send(self.clientele, "hub_error",
955 content=content, ident=client_id)
945 content=content, ident=client_id)
956 return
946 return
957 verbose = content.get('verbose', False)
947 verbose = content.get('verbose', False)
958 content = dict(status='ok')
948 content = dict(status='ok')
959 for t in targets:
949 for t in targets:
960 queue = self.queues[t]
950 queue = self.queues[t]
961 completed = self.completed[t]
951 completed = self.completed[t]
962 tasks = self.tasks[t]
952 tasks = self.tasks[t]
963 if not verbose:
953 if not verbose:
964 queue = len(queue)
954 queue = len(queue)
965 completed = len(completed)
955 completed = len(completed)
966 tasks = len(tasks)
956 tasks = len(tasks)
967 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
957 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
968 # pending
958 # pending
969 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
959 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
970
960
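# Hedged example of a queue_reply content dict for two hypothetical engines
# with verbose=False (counts rather than msg_id lists):
#
#   {'status': 'ok',
#    '0': {'queue': 2, 'completed': 10, 'tasks': 1},
#    '1': {'queue': 0, 'completed': 12, 'tasks': 3}}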
971 def purge_results(self, client_id, msg):
961 def purge_results(self, client_id, msg):
972 """Purge results from memory. This method is more valuable before we move
962 """Purge results from memory. This method is more valuable before we move
973 to a DB based message storage mechanism."""
963 to a DB based message storage mechanism."""
974 content = msg['content']
964 content = msg['content']
975 msg_ids = content.get('msg_ids', [])
965 msg_ids = content.get('msg_ids', [])
976 reply = dict(status='ok')
966 reply = dict(status='ok')
977 if msg_ids == 'all':
967 if msg_ids == 'all':
978 self.db.drop_matching_records(dict(completed={'$ne':None}))
968 self.db.drop_matching_records(dict(completed={'$ne':None}))
979 else:
969 else:
980 for msg_id in msg_ids:
970 for msg_id in msg_ids:
981 if msg_id in self.all_completed:
971 if msg_id in self.all_completed:
982 self.db.drop_record(msg_id)
972 self.db.drop_record(msg_id)
983 else:
973 else:
984 if msg_id in self.pending:
974 if msg_id in self.pending:
985 try:
975 try:
986 raise IndexError("msg pending: %r"%msg_id)
976 raise IndexError("msg pending: %r"%msg_id)
987 except:
977 except:
988 reply = error.wrap_exception()
978 reply = error.wrap_exception()
989 else:
979 else:
990 try:
980 try:
991 raise IndexError("No such msg: %r"%msg_id)
981 raise IndexError("No such msg: %r"%msg_id)
992 except:
982 except:
993 reply = error.wrap_exception()
983 reply = error.wrap_exception()
994 break
984 break
995 eids = content.get('engine_ids', [])
985 eids = content.get('engine_ids', [])
996 for eid in eids:
986 for eid in eids:
997 if eid not in self.engines:
987 if eid not in self.engines:
998 try:
988 try:
999 raise IndexError("No such engine: %i"%eid)
989 raise IndexError("No such engine: %i"%eid)
1000 except:
990 except:
1001 reply = error.wrap_exception()
991 reply = error.wrap_exception()
1002 break
992 break
1003 msg_ids = self.completed.pop(eid)
993 msg_ids = self.completed.pop(eid)
1004 uid = self.engines[eid].queue
994 uid = self.engines[eid].queue
1005 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
995 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1006
996
1007 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
997 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1008
998
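# The queries above use MongoDB-style operators; with configurable backends,
# every db_class is assumed to accept the same filter dialect, e.g.:
#
#   self.db.drop_matching_records({'completed': {'$ne': None}})  # all finished
#   self.db.find_records({'msg_id': {'$in': ['a', 'b']}})        # cf. get_results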
1009 def resubmit_task(self, client_id, msg, buffers):
999 def resubmit_task(self, client_id, msg, buffers):
1010 """Resubmit a task."""
1000 """Resubmit a task."""
1011 raise NotImplementedError
1001 raise NotImplementedError
1012
1002
1013 def get_results(self, client_id, msg):
1003 def get_results(self, client_id, msg):
1014 """Get the result of 1 or more messages."""
1004 """Get the result of 1 or more messages."""
1015 content = msg['content']
1005 content = msg['content']
1016 msg_ids = sorted(set(content['msg_ids']))
1006 msg_ids = sorted(set(content['msg_ids']))
1017 statusonly = content.get('status_only', False)
1007 statusonly = content.get('status_only', False)
1018 pending = []
1008 pending = []
1019 completed = []
1009 completed = []
1020 content = dict(status='ok')
1010 content = dict(status='ok')
1021 content['pending'] = pending
1011 content['pending'] = pending
1022 content['completed'] = completed
1012 content['completed'] = completed
1023 buffers = []
1013 buffers = []
1024 if not statusonly:
1014 if not statusonly:
1025 content['results'] = {}
1015 content['results'] = {}
1026 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1016 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1027 for msg_id in msg_ids:
1017 for msg_id in msg_ids:
1028 if msg_id in self.pending:
1018 if msg_id in self.pending:
1029 pending.append(msg_id)
1019 pending.append(msg_id)
1030 elif msg_id in self.all_completed:
1020 elif msg_id in self.all_completed:
1031 completed.append(msg_id)
1021 completed.append(msg_id)
1032 if not statusonly:
1022 if not statusonly:
1033 rec = records[msg_id]
1023 rec = records[msg_id]
1034 io_dict = {}
1024 io_dict = {}
1035 for key in 'pyin pyout pyerr stdout stderr'.split():
1025 for key in 'pyin pyout pyerr stdout stderr'.split():
1036 io_dict[key] = rec[key]
1026 io_dict[key] = rec[key]
1037 content[msg_id] = { 'result_content': rec['result_content'],
1027 content[msg_id] = { 'result_content': rec['result_content'],
1038 'header': rec['header'],
1028 'header': rec['header'],
1039 'result_header' : rec['result_header'],
1029 'result_header' : rec['result_header'],
1040 'io' : io_dict,
1030 'io' : io_dict,
1041 }
1031 }
1042 buffers.extend(map(str, rec['result_buffers']))
1032 buffers.extend(map(str, rec['result_buffers']))
1043 else:
1033 else:
1044 try:
1034 try:
1045 raise KeyError('No such message: '+msg_id)
1035 raise KeyError('No such message: '+msg_id)
1046 except:
1036 except:
1047 content = error.wrap_exception()
1037 content = error.wrap_exception()
1048 break
1038 break
1049 self.session.send(self.clientele, "result_reply", content=content,
1039 self.session.send(self.clientele, "result_reply", content=content,
1050 parent=msg, ident=client_id,
1040 parent=msg, ident=client_id,
1051 buffers=buffers)
1041 buffers=buffers)
1052
1042
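# Hedged sketch of a result_reply for one completed msg_id (keys from the
# code above; values hypothetical):
#
#   content = {'status': 'ok', 'pending': [], 'completed': ['abc'],
#              'abc': {'result_content': {...}, 'header': {...},
#                      'result_header': {...}, 'io': {'stdout': '', ...}}}
#   # binary result_buffers travel separately via the zmq buffers argument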
@@ -1,427 +1,431 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import logging
22 import logging
23 import socket
23 import socket
24 import stat
24 import stat
25 import sys
25 import sys
26 import uuid
26 import uuid
27
27
28 import zmq
28 import zmq
29 from zmq.log.handlers import PUBHandler
29 from zmq.log.handlers import PUBHandler
30 from zmq.utils import jsonapi as json
30 from zmq.utils import jsonapi as json
31
31
32 from IPython.config.loader import Config
32 from IPython.config.loader import Config
33 from IPython.zmq.parallel import factory
33 from IPython.zmq.parallel import factory
34 from IPython.zmq.parallel.controller import ControllerFactory
34 from IPython.zmq.parallel.controller import ControllerFactory
35 from IPython.zmq.parallel.clusterdir import (
35 from IPython.zmq.parallel.clusterdir import (
36 ApplicationWithClusterDir,
36 ApplicationWithClusterDir,
37 ClusterDirConfigLoader
37 ClusterDirConfigLoader
38 )
38 )
39 from IPython.zmq.parallel.util import disambiguate_ip_address, split_url
39 from IPython.zmq.parallel.util import disambiguate_ip_address, split_url
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 from IPython.utils.traitlets import Instance, Unicode
41 from IPython.utils.traitlets import Instance, Unicode
42
42
43
43
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # Module level variables
46 # Module level variables
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49
49
50 #: The default config file name for this application
50 #: The default config file name for this application
51 default_config_file_name = u'ipcontrollerz_config.py'
51 default_config_file_name = u'ipcontrollerz_config.py'
52
52
53
53
54 _description = """Start the IPython controller for parallel computing.
54 _description = """Start the IPython controller for parallel computing.
55
55
56 The IPython controller provides a gateway between the IPython engines and
56 The IPython controller provides a gateway between the IPython engines and
57 clients. The controller needs to be started before the engines and can be
57 clients. The controller needs to be started before the engines and can be
58 configured using command line options or using a cluster directory. Cluster
58 configured using command line options or using a cluster directory. Cluster
59 directories contain config, log and security files and are usually located in
59 directories contain config, log and security files and are usually located in
60 your ipython directory and named as "clusterz_<profile>". See the --profile
60 your ipython directory and named as "clusterz_<profile>". See the --profile
61 and --cluster-dir options for details.
61 and --cluster-dir options for details.
62 """
62 """
63
63
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65 # Default interfaces
65 # Default interfaces
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67
67
68 # The default client interfaces for FCClientServiceFactory.interfaces
68 # The default client interfaces for FCClientServiceFactory.interfaces
69 default_client_interfaces = Config()
69 default_client_interfaces = Config()
70 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
70 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
71
71
72 # Make this a dict we can pass to Config.__init__ for the default
72 # Make this a dict we can pass to Config.__init__ for the default
73 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
73 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
74
74
75
75
76
76
77 # The default engine interfaces for FCEngineServiceFactory.interfaces
77 # The default engine interfaces for FCEngineServiceFactory.interfaces
78 default_engine_interfaces = Config()
78 default_engine_interfaces = Config()
79 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
79 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
80
80
81 # Make this a dict we can pass to Config.__init__ for the default
81 # Make this a dict we can pass to Config.__init__ for the default
82 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
82 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
83
83
84
84
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86 # Service factories
86 # Service factories
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88
88
89 #
89 #
90 # class FCClientServiceFactory(FCServiceFactory):
90 # class FCClientServiceFactory(FCServiceFactory):
91 # """A Foolscap implementation of the client services."""
91 # """A Foolscap implementation of the client services."""
92 #
92 #
93 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
93 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
94 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
94 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
95 # allow_none=False, config=True)
95 # allow_none=False, config=True)
96 #
96 #
97 #
97 #
98 # class FCEngineServiceFactory(FCServiceFactory):
98 # class FCEngineServiceFactory(FCServiceFactory):
99 # """A Foolscap implementation of the engine services."""
99 # """A Foolscap implementation of the engine services."""
100 #
100 #
101 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
101 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
102 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
102 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
103 # allow_none=False, config=True)
103 # allow_none=False, config=True)
104 #
104 #
105
105
106 #-----------------------------------------------------------------------------
106 #-----------------------------------------------------------------------------
107 # Command line options
107 # Command line options
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109
109
110
110
111 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
111 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
112
112
113 def _add_arguments(self):
113 def _add_arguments(self):
114 super(IPControllerAppConfigLoader, self)._add_arguments()
114 super(IPControllerAppConfigLoader, self)._add_arguments()
115 paa = self.parser.add_argument
115 paa = self.parser.add_argument
116
116
117 ## Hub Config:
117 ## Hub Config:
118 paa('--mongodb',
118 paa('--mongodb',
119 dest='HubFactory.db_class', action='store_const',
119 dest='HubFactory.db_class', action='store_const',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
120 const='IPython.zmq.parallel.mongodb.MongoDB',
121 help='Use MongoDB task storage [default: in-memory]')
121 help='Use MongoDB for task storage [default: in-memory]')
122 paa('--sqlite',
123 dest='HubFactory.db_class', action='store_const',
124 const='IPython.zmq.parallel.sqlitedb.SQLiteDB',
125 help='Use SQLite3 for DB task storage [default: in-memory]')
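# Hedged usage sketch for the new backend switches (assuming the entry point
# is installed as `ipcontrollerz`; the config spelling follows the
# HubFactory.db_class dest shown above):
#
#   $ ipcontrollerz --sqlite     # db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'
#   $ ipcontrollerz --mongodb    # db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
#
# or equivalently, in ipcontrollerz_config.py:
#   # c.HubFactory.db_class = 'IPython.zmq.parallel.sqlitedb.SQLiteDB'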
122 paa('--hb',
126 paa('--hb',
123 type=int, dest='HubFactory.hb', nargs=2,
127 type=int, dest='HubFactory.hb', nargs=2,
124 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
128 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
125 'connections [default: random]',
129 'connections [default: random]',
126 metavar='Hub.hb_ports')
130 metavar='Hub.hb_ports')
127 paa('--ping',
131 paa('--ping',
128 type=int, dest='HubFactory.ping',
132 type=int, dest='HubFactory.ping',
129 help='The frequency at which the Hub pings the engines for heartbeats '
133 help='The frequency at which the Hub pings the engines for heartbeats '
130 ' (in ms) [default: 100]',
134 ' (in ms) [default: 100]',
131 metavar='Hub.ping')
135 metavar='Hub.ping')

        # Client config
        paa('--client-ip',
            type=str, dest='HubFactory.client_ip',
            help='The IP address or hostname the Hub will listen on for '
            'client connections. Both engine-ip and client-ip can be set simultaneously '
            'via --ip [default: loopback]',
            metavar='Hub.client_ip')
        paa('--client-transport',
            type=str, dest='HubFactory.client_transport',
            help='The ZeroMQ transport the Hub will use for '
            'client connections. Both engine-transport and client-transport can be set simultaneously '
            'via --transport [default: tcp]',
            metavar='Hub.client_transport')
        paa('--query',
            type=int, dest='HubFactory.query_port',
            help='The port on which the Hub XREP socket will listen for result '
            'queries from clients [default: random]',
            metavar='Hub.query_port')
        paa('--notifier',
            type=int, dest='HubFactory.notifier_port',
            help='The port on which the Hub PUB socket will listen for '
            'notification connections [default: random]',
            metavar='Hub.notifier_port')

        # Engine config
        paa('--engine-ip',
            type=str, dest='HubFactory.engine_ip',
            help='The IP address or hostname the Hub will listen on for '
            'engine connections. This applies to the Hub and its schedulers. '
            'Both engine-ip and client-ip can be set simultaneously '
            'via --ip [default: loopback]',
            metavar='Hub.engine_ip')
        paa('--engine-transport',
            type=str, dest='HubFactory.engine_transport',
            help='The ZeroMQ transport the Hub will use for '
            'engine connections. Both engine-transport and client-transport can be set simultaneously '
            'via --transport [default: tcp]',
            metavar='Hub.engine_transport')

        # Scheduler config
        paa('--mux',
            type=int, dest='ControllerFactory.mux', nargs=2,
            help='The (2) ports the MUX scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.mux_ports')
        paa('--task',
            type=int, dest='ControllerFactory.task', nargs=2,
            help='The (2) ports the Task scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.task_ports')
        paa('--control',
            type=int, dest='ControllerFactory.control', nargs=2,
            help='The (2) ports the Control scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.control_ports')
        paa('--iopub',
            type=int, dest='ControllerFactory.iopub', nargs=2,
            help='The (2) ports the IOPub scheduler will listen on for client,engine '
            'connections, respectively [default: random]',
            metavar='Scheduler.iopub_ports')

        paa('--scheme',
            type=str, dest='HubFactory.scheme',
            choices=['pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'],
            help='select the task scheduler scheme [default: Python LRU]',
            metavar='Scheduler.scheme')
        paa('--usethreads',
            dest='ControllerFactory.usethreads', action="store_true",
            help='Use threads instead of processes for the schedulers',
            )
        paa('--hwm',
            dest='ControllerFactory.hwm', type=int,
            help='specify the High Water Mark (HWM) for the downstream '
            'socket in the pure ZMQ scheduler. This is the maximum number '
            'of allowed outstanding tasks on each engine.',
            )
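        # Illustrative invocation (flag names from above; the port values are
        # examples, not defaults):
        #
        #   $ ipcontrollerz --scheme leastload --task 10110 10111 --hwm 1
        #
        # As the help text notes, --hwm applies to the 'pure' ZMQ scheduler,
        # where it bounds how many unfinished tasks may queue on one engine.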

        ## Global config
        paa('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            help='Log to a file in the log directory (default is stdout)')
        paa('--log-url',
            type=str, dest='Global.log_url',
            help='Broadcast logs to an iploggerz process [default: disabled]')
        paa('-r','--reuse-files',
            action='store_true', dest='Global.reuse_files',
            help='Try to reuse existing json connection files.')
        paa('--no-secure',
            action='store_false', dest='Global.secure',
            help='Turn off execution keys.')
        paa('--secure',
            action='store_true', dest='Global.secure',
            help='Turn on execution keys (default).')
        paa('--execkey',
            type=str, dest='Global.exec_key',
            help='path to a file containing an execution key.',
            metavar='keyfile')
        paa('--ssh',
            type=str, dest='Global.sshserver',
            help='ssh url for clients to use when connecting to the Controller '
            'processes. It should be of the form: [user@]server[:port]. The '
            'Controller\'s listening addresses must be accessible from the ssh server',
            metavar='Global.sshserver')
        paa('--location',
            type=str, dest='Global.location',
            help="The external IP or domain name of this machine, used for disambiguating "
            "engine and client connections.",
            metavar='Global.location')
        factory.add_session_arguments(self.parser)
        factory.add_registration_arguments(self.parser)
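        # Putting a few of the global flags together (illustrative; the ssh
        # server and address are examples, not defaults):
        #
        #   $ ipcontrollerz -r --ssh user@login.example.com --location 10.0.0.2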


#-----------------------------------------------------------------------------
# The main application
#-----------------------------------------------------------------------------


class IPControllerApp(ApplicationWithClusterDir):

    name = u'ipcontrollerz'
    description = _description
    command_line_loader = IPControllerAppConfigLoader
    default_config_file_name = default_config_file_name
    auto_create_cluster_dir = True


    def create_default_config(self):
        super(IPControllerApp, self).create_default_config()
        self.default_config.Global.import_statements = []
        self.default_config.Global.clean_logs = True
        self.default_config.Global.secure = True
        self.default_config.Global.reuse_files = False
        self.default_config.Global.exec_key = "exec_key.key"
        self.default_config.Global.sshserver = None
        self.default_config.Global.location = None

    def pre_construct(self):
        super(IPControllerApp, self).pre_construct()
        c = self.master_config
        # The defaults for these are set in FCClientServiceFactory and
        # FCEngineServiceFactory, so we only set them here if the global
        # options have been set to override the class level defaults.

        # if hasattr(c.Global, 'reuse_furls'):
        #     c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
        #     c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
        #     del c.Global.reuse_furls
        # if hasattr(c.Global, 'secure'):
        #     c.FCClientServiceFactory.secure = c.Global.secure
        #     c.FCEngineServiceFactory.secure = c.Global.secure
        #     del c.Global.secure

    def save_connection_dict(self, fname, cdict):
        """save a connection dict to json file."""
        c = self.master_config
        url = cdict['url']
        location = cdict['location']
        if not location:
            try:
                proto,ip,port = split_url(url)
            except AssertionError:
                pass
            else:
                location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
            cdict['location'] = location
        fname = os.path.join(c.Global.security_dir, fname)
        with open(fname, 'w') as f:
            f.write(json.dumps(cdict, indent=2))
        os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
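    # For reference, a connection file written this way looks roughly like
    # the following (values are illustrative, not defaults):
    #
    #   {
    #     "exec_key": "b8b44bc1-0a62-...-uuid",
    #     "ssh": null,
    #     "url": "tcp://127.0.0.1:10101",
    #     "location": "10.0.0.2"
    #   }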

    def load_config_from_json(self):
        """load config from existing json connector files."""
        c = self.master_config
        # load from engine config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
            cfg = json.loads(f.read())
        key = c.SessionFactory.exec_key = cfg['exec_key']
        xport,addr = cfg['url'].split('://')
        c.HubFactory.engine_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.engine_ip = ip
        c.HubFactory.regport = int(ports)
        c.Global.location = cfg['location']

        # load client config
        with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
            cfg = json.loads(f.read())
        assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
        xport,addr = cfg['url'].split('://')
        c.HubFactory.client_transport = xport
        ip,ports = addr.split(':')
        c.HubFactory.client_ip = ip
        c.Global.sshserver = cfg['ssh']
        assert int(ports) == c.HubFactory.regport, "regport mismatch"
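    # e.g. a stored url of 'tcp://10.0.0.2:10101' splits into transport 'tcp',
    # ip '10.0.0.2', and registration port 10101.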

    def construct(self):
        # This is the working dir by now.
        sys.path.insert(0, '')
        c = self.master_config

        self.import_statements()
        reusing = c.Global.reuse_files
        if reusing:
            try:
                self.load_config_from_json()
            except (AssertionError, IOError):
                reusing = False
        # check again, because reusing may have failed:
        if reusing:
            pass
        elif c.Global.secure:
            keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
            key = str(uuid.uuid4())
            with open(keyfile, 'w') as f:
                f.write(key)
            os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
            c.SessionFactory.exec_key = key
        else:
            c.SessionFactory.exec_key = ''
            key = None

        try:
            self.factory = ControllerFactory(config=c, logname=self.log.name)
            self.start_logging()
            self.factory.construct()
        except:
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if not reusing:
            # save to new json config files
            f = self.factory
            cdict = {'exec_key' : key,
                    'ssh' : c.Global.sshserver,
                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                    'location' : c.Global.location
                    }
            self.save_connection_dict('ipcontroller-client.json', cdict)
            # copy the dict so the client dict isn't clobbered, and record the
            # engine-facing address, since load_config_from_json reads this
            # file back as HubFactory.engine_transport/engine_ip
            edict = dict(cdict)
            edict['url'] = "%s://%s:%s" % (f.engine_transport, f.engine_ip, f.regport)
            self.save_connection_dict('ipcontroller-engine.json', edict)


    def save_urls(self):
        """save the registration urls to files."""
        c = self.master_config

        sec_dir = c.Global.security_dir
        cf = self.factory

        with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

        with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
            f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))


    def import_statements(self):
        statements = self.master_config.Global.import_statements
        for s in statements:
            try:
                self.log.info("Executing statement: '%s'" % s)
                exec s in globals(), locals()
            except:
                self.log.error("Error running statement: %s" % s, exc_info=True)

    def start_logging(self):
        super(IPControllerApp, self).start_logging()
        if self.master_config.Global.log_url:
            context = self.factory.context
            lsock = context.socket(zmq.PUB)
            lsock.connect(self.master_config.Global.log_url)
            handler = PUBHandler(lsock)
            handler.root_topic = 'controller'
            handler.setLevel(self.log_level)
            self.log.addHandler(handler)

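    # The log url is consumed by an iploggerz-style process. A minimal sketch
    # of such a reader (a hypothetical standalone script, assuming PUBHandler
    # publishes topic/message frame pairs; the PUB side above *connects*, so
    # the reader should *bind* the same url):
    #
    #   import zmq
    #   ctx = zmq.Context()
    #   sub = ctx.socket(zmq.SUB)
    #   sub.bind('tcp://127.0.0.1:20202')            # the url given via --log-url
    #   sub.setsockopt(zmq.SUBSCRIBE, 'controller')  # root_topic set above
    #   while True:
    #       topic, msg = sub.recv_multipart()
    #       print topic, msg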
    def start_app(self):
        # Start the subprocesses:
        self.factory.start()
        self.write_pid_file(overwrite=True)
        try:
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")


def launch_new_instance():
    """Create and run the IPython controller"""
    app = IPControllerApp()
    app.start()


if __name__ == '__main__':
    launch_new_instance()
@@ -1,62 +1,80 @@
1 """A TaskRecord backend using mongodb"""
1 """A TaskRecord backend using mongodb"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 from datetime import datetime
9 from datetime import datetime
10
10
11 from pymongo import Connection
11 from pymongo import Connection
12 from pymongo.binary import Binary
13
14 from IPython.utils.traitlets import Dict, List, CUnicode
12
15
13 from .dictdb import BaseDB
16 from .dictdb import BaseDB
14
17
15 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
16 # MongoDB class
19 # MongoDB class
17 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
18
21
19 class MongoDB(BaseDB):
22 class MongoDB(BaseDB):
20 """MongoDB TaskRecord backend."""
23 """MongoDB TaskRecord backend."""

    connection_args = List(config=True)    # positional args for pymongo.Connection
    connection_kwargs = Dict(config=True)  # keyword args for pymongo.Connection
    database = CUnicode(config=True)       # database name [default: the session id]
    _table = Dict()

    def __init__(self, **kwargs):
        super(MongoDB, self).__init__(**kwargs)
        self._connection = Connection(*self.connection_args, **self.connection_kwargs)
        if not self.database:
            self.database = self.session
        self._db = self._connection[self.database]
        self._records = self._db['task_records']

    def _binary_buffers(self, rec):
        """Wrap any message buffers in a record as bson Binary objects."""
        for key in ('buffers', 'result_buffers'):
            if key in rec:
                rec[key] = map(Binary, rec[key])
        return rec
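    # Why Binary: task buffers are raw byte strings, which BSON would
    # otherwise try to store as UTF-8 text and may fail to decode. A
    # standalone sketch of the transformation (Python 2, as in this module):
    #
    #   from pymongo.binary import Binary
    #   rec = {'buffers': ['\x00\xff', 'abc']}
    #   rec['buffers'] = map(Binary, rec['buffers'])  # now safe to insert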

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        rec = self._binary_buffers(rec)
        obj_id = self._records.insert(rec)
        self._table[msg_id] = obj_id

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id."""
        return self._records.find_one(self._table[msg_id])

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        rec = self._binary_buffers(rec)
        obj_id = self._table[msg_id]
        self._records.update({'_id':obj_id}, {'$set': rec})

    def drop_matching_records(self, check):
        """Remove all records matching a query dict."""
        self._records.remove(check)

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        obj_id = self._table.pop(msg_id)
        self._records.remove(obj_id)

    def find_records(self, check, id_only=False):
        """Find records matching a query dict."""
        matches = list(self._records.find(check))
        if id_only:
            return [ rec['msg_id'] for rec in matches ]
        else:
            data = {}
            for rec in matches:
                data[rec['msg_id']] = rec
            return data

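# Illustrative configuration (trait names from this commit; the host, port,
# and database name are examples, not defaults):
#
#   c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
#   c.MongoDB.connection_args = ['localhost', 27017]
#   c.MongoDB.database = u'ipython_tasks'
#
# Query dicts pass straight through to pymongo, so assuming the Hub's record
# schema, where finished tasks carry a 'completed' timestamp:
#
#   db.find_records({'completed': {'$ne': None}}, id_only=True)
#
# would return the msg_ids of all completed tasks.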