use HMAC digest to sign messages instead of cleartext key...
MinRK
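Review note: the heart of this change is replacing the cleartext exec_key on the wire with an HMAC digest computed from it, so the key itself is never sent. Below is a minimal sketch of that scheme in plain Python, assuming a shared key such as the uuid4 string that `init_hub()` generates; the real StreamSession framing and serialization differ.

# A minimal sketch of HMAC message signing; the key value and part names
# here are illustrative, not the actual StreamSession wire format.
import hmac
import hashlib

key = b'my-shared-exec-key'             # shared exec_key, e.g. str(uuid.uuid4())

def sign(parts):
    # digest over all serialized message parts, in order
    h = hmac.new(key, digestmod=hashlib.sha256)
    for part in parts:
        h.update(part)
    return h.hexdigest()

parts = [b'header', b'parent', b'content']
wire = [sign(parts)] + parts            # send the digest, never the key itself

# receiver recomputes the digest and compares before trusting the message
digest, body = wire[0], wire[1:]
if not hmac.compare_digest(digest, sign(body)):   # constant-time compare
    raise ValueError("invalid signature")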
@@ -1,398 +1,402 @@
 #!/usr/bin/env python
 # encoding: utf-8
 """
 The IPython controller application.
 """

 #-----------------------------------------------------------------------------
 # Copyright (C) 2008-2009 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------

 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------

 from __future__ import with_statement

 import os
 import socket
 import stat
 import sys
 import uuid

 from multiprocessing import Process

 import zmq
 from zmq.devices import ProcessMonitoredQueue
 from zmq.log.handlers import PUBHandler
 from zmq.utils import jsonapi as json

+from IPython.config.application import boolean_flag
 from IPython.core.newapplication import ProfileDir

 from IPython.parallel.apps.baseapp import (
     BaseParallelApplication,
     base_flags
 )
 from IPython.utils.importstring import import_item
 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict

 # from IPython.parallel.controller.controller import ControllerFactory
 from IPython.parallel.streamsession import StreamSession
 from IPython.parallel.controller.heartmonitor import HeartMonitor
 from IPython.parallel.controller.hub import HubFactory
 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
 from IPython.parallel.controller.sqlitedb import SQLiteDB

 from IPython.parallel.util import signal_children, split_url

 # conditional import of MongoDB backend class

 try:
     from IPython.parallel.controller.mongodb import MongoDB
 except ImportError:
     maybe_mongo = []
 else:
     maybe_mongo = [MongoDB]


 #-----------------------------------------------------------------------------
 # Module level variables
 #-----------------------------------------------------------------------------


 #: The default config file name for this application
 default_config_file_name = u'ipcontroller_config.py'


 _description = """Start the IPython controller for parallel computing.

 The IPython controller provides a gateway between the IPython engines and
 clients. The controller needs to be started before the engines and can be
 configured using command line options or using a cluster directory. Cluster
 directories contain config, log and security files and are usually located in
 your ipython directory and named as "cluster_<profile>". See the `profile`
 and `profile_dir` options for details.
 """




 #-----------------------------------------------------------------------------
 # The main application
 #-----------------------------------------------------------------------------
 flags = {}
 flags.update(base_flags)
 flags.update({
     'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                     'Use threads instead of processes for the schedulers'),
     'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                     'use the SQLiteDB backend'),
     'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                     'use the MongoDB backend'),
     'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                     'use the in-memory DictDB backend'),
     'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                     'reuse existing json connection files')
 })

-flags.update()
+flags.update(boolean_flag('secure', 'IPControllerApp.secure',
+    "Use HMAC digests for authentication of messages.",
+    "Don't authenticate messages."
+))

 class IPControllerApp(BaseParallelApplication):

     name = u'ipcontroller'
     description = _description
     config_file_name = Unicode(default_config_file_name)
     classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

     # change default to True
     auto_create = Bool(True, config=True,
-        help="""Whether to create profile dir if it doesn't exist""")
+        help="""Whether to create profile dir if it doesn't exist.""")

     reuse_files = Bool(False, config=True,
-        help='Whether to reuse existing json connection files [default: False]'
+        help='Whether to reuse existing json connection files.'
     )
     secure = Bool(True, config=True,
-        help='Whether to use exec_keys for extra authentication [default: True]'
+        help='Whether to use HMAC digests for extra message authentication.'
     )
     ssh_server = Unicode(u'', config=True,
         help="""ssh url for clients to use when connecting to the Controller
         processes. It should be of the form: [user@]server[:port]. The
-        Controller\'s listening addresses must be accessible from the ssh server""",
+        Controller's listening addresses must be accessible from the ssh server""",
     )
     location = Unicode(u'', config=True,
         help="""The external IP or domain name of the Controller, used for disambiguating
         engine and client connections.""",
     )
     import_statements = List([], config=True,
         help="import statements to be run at startup. Necessary in some environments"
     )

     use_threads = Bool(False, config=True,
         help='Use threads instead of processes for the schedulers',
     )

     # internal
     children = List()
     mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

     def _use_threads_changed(self, name, old, new):
         self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

     aliases = Dict(dict(
         log_level = 'IPControllerApp.log_level',
         log_url = 'IPControllerApp.log_url',
         reuse_files = 'IPControllerApp.reuse_files',
         secure = 'IPControllerApp.secure',
         ssh = 'IPControllerApp.ssh_server',
         use_threads = 'IPControllerApp.use_threads',
         import_statements = 'IPControllerApp.import_statements',
         location = 'IPControllerApp.location',

         ident = 'StreamSession.session',
         user = 'StreamSession.username',
         exec_key = 'StreamSession.keyfile',

         url = 'HubFactory.url',
         ip = 'HubFactory.ip',
         transport = 'HubFactory.transport',
         port = 'HubFactory.regport',

         ping = 'HeartMonitor.period',

         scheme = 'TaskScheduler.scheme_name',
         hwm = 'TaskScheduler.hwm',


         profile = "BaseIPythonApplication.profile",
         profile_dir = 'ProfileDir.location',

     ))
     flags = Dict(flags)


     def save_connection_dict(self, fname, cdict):
         """save a connection dict to json file."""
         c = self.config
         url = cdict['url']
         location = cdict['location']
         if not location:
             try:
                 proto,ip,port = split_url(url)
             except AssertionError:
                 pass
             else:
                 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
             cdict['location'] = location
         fname = os.path.join(self.profile_dir.security_dir, fname)
         with open(fname, 'w') as f:
             f.write(json.dumps(cdict, indent=2))
         os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)

     def load_config_from_json(self):
         """load config from existing json connector files."""
         c = self.config
         # load from engine config
         with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
             cfg = json.loads(f.read())
         key = c.StreamSession.key = cfg['exec_key']
         xport,addr = cfg['url'].split('://')
         c.HubFactory.engine_transport = xport
         ip,ports = addr.split(':')
         c.HubFactory.engine_ip = ip
         c.HubFactory.regport = int(ports)
         self.location = cfg['location']

         # load client config
         with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
             cfg = json.loads(f.read())
         assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
         xport,addr = cfg['url'].split('://')
         c.HubFactory.client_transport = xport
         ip,ports = addr.split(':')
         c.HubFactory.client_ip = ip
         self.ssh_server = cfg['ssh']
         assert int(ports) == c.HubFactory.regport, "regport mismatch"

     def init_hub(self):
         c = self.config

         self.do_import_statements()
         reusing = self.reuse_files
         if reusing:
             try:
                 self.load_config_from_json()
             except (AssertionError,IOError):
                 reusing=False
         # check again, because reusing may have failed:
         if reusing:
             pass
         elif self.secure:
             key = str(uuid.uuid4())
             # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
             # with open(keyfile, 'w') as f:
             #     f.write(key)
             # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
             c.StreamSession.key = key
         else:
             key = c.StreamSession.key = ''

         try:
             self.factory = HubFactory(config=c, log=self.log)
             # self.start_logging()
             self.factory.init_hub()
         except:
             self.log.error("Couldn't construct the Controller", exc_info=True)
             self.exit(1)

         if not reusing:
             # save to new json config files
             f = self.factory
             cdict = {'exec_key' : key,
                     'ssh' : self.ssh_server,
                     'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
                     'location' : self.location
                     }
             self.save_connection_dict('ipcontroller-client.json', cdict)
             edict = cdict
             edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
             self.save_connection_dict('ipcontroller-engine.json', edict)

     #
     def init_schedulers(self):
         children = self.children
         mq = import_item(str(self.mq_class))

         hub = self.factory
         # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
         q.bind_in(hub.client_info['iopub'])
         q.bind_out(hub.engine_info['iopub'])
         q.setsockopt_out(zmq.SUBSCRIBE, '')
         q.connect_mon(hub.monitor_url)
         q.daemon=True
         children.append(q)

         # Multiplexer Queue (in a Process)
         q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
         q.bind_in(hub.client_info['mux'])
         q.setsockopt_in(zmq.IDENTITY, 'mux')
         q.bind_out(hub.engine_info['mux'])
         q.connect_mon(hub.monitor_url)
         q.daemon=True
         children.append(q)

         # Control Queue (in a Process)
         q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
         q.bind_in(hub.client_info['control'])
         q.setsockopt_in(zmq.IDENTITY, 'control')
         q.bind_out(hub.engine_info['control'])
         q.connect_mon(hub.monitor_url)
         q.daemon=True
         children.append(q)
         try:
             scheme = self.config.TaskScheduler.scheme_name
         except AttributeError:
             scheme = TaskScheduler.scheme_name.get_default_value()
         # Task Queue (in a Process)
         if scheme == 'pure':
             self.log.warn("task::using pure XREQ Task scheduler")
             q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
             # q.setsockopt_out(zmq.HWM, hub.hwm)
             q.bind_in(hub.client_info['task'][1])
             q.setsockopt_in(zmq.IDENTITY, 'task')
             q.bind_out(hub.engine_info['task'])
             q.connect_mon(hub.monitor_url)
             q.daemon=True
             children.append(q)
         elif scheme == 'none':
             self.log.warn("task::using no Task scheduler")

         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
             sargs = (hub.client_info['task'][1], hub.engine_info['task'],
                                 hub.monitor_url, hub.client_info['notification'])
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                             log_url = self.log_url, config=dict(self.config))
             q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
             q.daemon=True
             children.append(q)


     def save_urls(self):
         """save the registration urls to files."""
         c = self.config

         sec_dir = self.profile_dir.security_dir
         cf = self.factory

         with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
             f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))

         with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
             f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))


     def do_import_statements(self):
         statements = self.import_statements
         for s in statements:
             try:
                 self.log.msg("Executing statement: '%s'" % s)
                 exec s in globals(), locals()
             except:
                 self.log.msg("Error running statement: %s" % s)

     def forward_logging(self):
         if self.log_url:
             self.log.info("Forwarding logging to %s"%self.log_url)
             context = zmq.Context.instance()
             lsock = context.socket(zmq.PUB)
             lsock.connect(self.log_url)
             handler = PUBHandler(lsock)
             self.log.removeHandler(self._log_handler)
             handler.root_topic = 'controller'
             handler.setLevel(self.log_level)
             self.log.addHandler(handler)
             self._log_handler = handler
     # #

     def initialize(self, argv=None):
         super(IPControllerApp, self).initialize(argv)
         self.forward_logging()
         self.init_hub()
         self.init_schedulers()

     def start(self):
         # Start the subprocesses:
         self.factory.start()
         child_procs = []
         for child in self.children:
             child.start()
             if isinstance(child, ProcessMonitoredQueue):
                 child_procs.append(child.launcher)
             elif isinstance(child, Process):
                 child_procs.append(child)
         if child_procs:
             signal_children(child_procs)

         self.write_pid_file(overwrite=True)

         try:
             self.factory.loop.start()
         except KeyboardInterrupt:
             self.log.critical("Interrupted, Exiting...\n")



 def launch_new_instance():
     """Create and run the IPython controller"""
     app = IPControllerApp.instance()
     app.initialize()
     app.start()


 if __name__ == '__main__':
     launch_new_instance()
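Review note: the old `flags.update()` with no arguments was a no-op, so wiring in `boolean_flag` here also fixes that. For readers unfamiliar with the helper, it produces a `--secure` / `--no-secure` flag pair; roughly as sketched below (an approximation, not the exact `IPython.config.application` source).

# Rough sketch of what boolean_flag('secure', 'IPControllerApp.secure', ...)
# returns: paired flag entries that set the named trait to True or False.
def boolean_flag(name, configurable, set_help='', unset_help=''):
    cls, trait = configurable.split('.')
    return {
        name        : ({cls: {trait: True}},  set_help),
        'no-' + name: ({cls: {trait: False}}, unset_help),
    }

flags = boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages.")
# so `ipcontroller --secure` signs messages and `ipcontroller --no-secure`
# disables authentication, matching the help strings above.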
@@ -1,1270 +1,1277 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
30 )
30 )
31
31
32 from IPython.parallel import error, util
32 from IPython.parallel import error, util
33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
33 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
34
34
35 from .heartmonitor import HeartMonitor
35 from .heartmonitor import HeartMonitor
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
42 return
42 return
43
43
44 def _printer(*args, **kwargs):
44 def _printer(*args, **kwargs):
45 print (args)
45 print (args)
46 print (kwargs)
46 print (kwargs)
47
47
48 def empty_record():
48 def empty_record():
49 """Return an empty dict with all record keys."""
49 """Return an empty dict with all record keys."""
50 return {
50 return {
51 'msg_id' : None,
51 'msg_id' : None,
52 'header' : None,
52 'header' : None,
53 'content': None,
53 'content': None,
54 'buffers': None,
54 'buffers': None,
55 'submitted': None,
55 'submitted': None,
56 'client_uuid' : None,
56 'client_uuid' : None,
57 'engine_uuid' : None,
57 'engine_uuid' : None,
58 'started': None,
58 'started': None,
59 'completed': None,
59 'completed': None,
60 'resubmitted': None,
60 'resubmitted': None,
61 'result_header' : None,
61 'result_header' : None,
62 'result_content' : None,
62 'result_content' : None,
63 'result_buffers' : None,
63 'result_buffers' : None,
64 'queue' : None,
64 'queue' : None,
65 'pyin' : None,
65 'pyin' : None,
66 'pyout': None,
66 'pyout': None,
67 'pyerr': None,
67 'pyerr': None,
68 'stdout': '',
68 'stdout': '',
69 'stderr': '',
69 'stderr': '',
70 }
70 }
71
71
72 def init_record(msg):
72 def init_record(msg):
73 """Initialize a TaskRecord based on a request."""
73 """Initialize a TaskRecord based on a request."""
74 header = msg['header']
74 header = msg['header']
75 return {
75 return {
76 'msg_id' : header['msg_id'],
76 'msg_id' : header['msg_id'],
77 'header' : header,
77 'header' : header,
78 'content': msg['content'],
78 'content': msg['content'],
79 'buffers': msg['buffers'],
79 'buffers': msg['buffers'],
80 'submitted': datetime.strptime(header['date'], util.ISO8601),
80 'submitted': datetime.strptime(header['date'], util.ISO8601),
81 'client_uuid' : None,
81 'client_uuid' : None,
82 'engine_uuid' : None,
82 'engine_uuid' : None,
83 'started': None,
83 'started': None,
84 'completed': None,
84 'completed': None,
85 'resubmitted': None,
85 'resubmitted': None,
86 'result_header' : None,
86 'result_header' : None,
87 'result_content' : None,
87 'result_content' : None,
88 'result_buffers' : None,
88 'result_buffers' : None,
89 'queue' : None,
89 'queue' : None,
90 'pyin' : None,
90 'pyin' : None,
91 'pyout': None,
91 'pyout': None,
92 'pyerr': None,
92 'pyerr': None,
93 'stdout': '',
93 'stdout': '',
94 'stderr': '',
94 'stderr': '',
95 }
95 }
96
96
97
97
98 class EngineConnector(HasTraits):
98 class EngineConnector(HasTraits):
99 """A simple object for accessing the various zmq connections of an object.
99 """A simple object for accessing the various zmq connections of an object.
100 Attributes are:
100 Attributes are:
101 id (int): engine ID
101 id (int): engine ID
102 uuid (str): uuid (unused?)
102 uuid (str): uuid (unused?)
103 queue (str): identity of queue's XREQ socket
103 queue (str): identity of queue's XREQ socket
104 registration (str): identity of registration XREQ socket
104 registration (str): identity of registration XREQ socket
105 heartbeat (str): identity of heartbeat XREQ socket
105 heartbeat (str): identity of heartbeat XREQ socket
106 """
106 """
107 id=Int(0)
107 id=Int(0)
108 queue=CStr()
108 queue=CStr()
109 control=CStr()
109 control=CStr()
110 registration=CStr()
110 registration=CStr()
111 heartbeat=CStr()
111 heartbeat=CStr()
112 pending=Set()
112 pending=Set()
113
113
114 class HubFactory(RegistrationFactory):
114 class HubFactory(RegistrationFactory):
115 """The Configurable for setting up a Hub."""
115 """The Configurable for setting up a Hub."""
116
116
117 # port-pairs for monitoredqueues:
117 # port-pairs for monitoredqueues:
118 hb = Tuple(Int,Int,config=True,
118 hb = Tuple(Int,Int,config=True,
119 help="""XREQ/SUB Port pair for Engine heartbeats""")
119 help="""XREQ/SUB Port pair for Engine heartbeats""")
120 def _hb_default(self):
120 def _hb_default(self):
121 return tuple(util.select_random_ports(2))
121 return tuple(util.select_random_ports(2))
122
122
123 mux = Tuple(Int,Int,config=True,
123 mux = Tuple(Int,Int,config=True,
124 help="""Engine/Client Port pair for MUX queue""")
124 help="""Engine/Client Port pair for MUX queue""")
125
125
126 def _mux_default(self):
126 def _mux_default(self):
127 return tuple(util.select_random_ports(2))
127 return tuple(util.select_random_ports(2))
128
128
129 task = Tuple(Int,Int,config=True,
129 task = Tuple(Int,Int,config=True,
130 help="""Engine/Client Port pair for Task queue""")
130 help="""Engine/Client Port pair for Task queue""")
131 def _task_default(self):
131 def _task_default(self):
132 return tuple(util.select_random_ports(2))
132 return tuple(util.select_random_ports(2))
133
133
134 control = Tuple(Int,Int,config=True,
134 control = Tuple(Int,Int,config=True,
135 help="""Engine/Client Port pair for Control queue""")
135 help="""Engine/Client Port pair for Control queue""")
136
136
137 def _control_default(self):
137 def _control_default(self):
138 return tuple(util.select_random_ports(2))
138 return tuple(util.select_random_ports(2))
139
139
140 iopub = Tuple(Int,Int,config=True,
140 iopub = Tuple(Int,Int,config=True,
141 help="""Engine/Client Port pair for IOPub relay""")
141 help="""Engine/Client Port pair for IOPub relay""")
142
142
143 def _iopub_default(self):
143 def _iopub_default(self):
144 return tuple(util.select_random_ports(2))
144 return tuple(util.select_random_ports(2))
145
145
146 # single ports:
146 # single ports:
147 mon_port = Int(config=True,
147 mon_port = Int(config=True,
148 help="""Monitor (SUB) port for queue traffic""")
148 help="""Monitor (SUB) port for queue traffic""")
149
149
150 def _mon_port_default(self):
150 def _mon_port_default(self):
151 return util.select_random_ports(1)[0]
151 return util.select_random_ports(1)[0]
152
152
153 notifier_port = Int(config=True,
153 notifier_port = Int(config=True,
154 help="""PUB port for sending engine status notifications""")
154 help="""PUB port for sending engine status notifications""")
155
155
156 def _notifier_port_default(self):
156 def _notifier_port_default(self):
157 return util.select_random_ports(1)[0]
157 return util.select_random_ports(1)[0]
158
158
159 engine_ip = Unicode('127.0.0.1', config=True,
159 engine_ip = Unicode('127.0.0.1', config=True,
160 help="IP on which to listen for engine connections. [default: loopback]")
160 help="IP on which to listen for engine connections. [default: loopback]")
161 engine_transport = Unicode('tcp', config=True,
161 engine_transport = Unicode('tcp', config=True,
162 help="0MQ transport for engine connections. [default: tcp]")
162 help="0MQ transport for engine connections. [default: tcp]")
163
163
164 client_ip = Unicode('127.0.0.1', config=True,
164 client_ip = Unicode('127.0.0.1', config=True,
165 help="IP on which to listen for client connections. [default: loopback]")
165 help="IP on which to listen for client connections. [default: loopback]")
166 client_transport = Unicode('tcp', config=True,
166 client_transport = Unicode('tcp', config=True,
167 help="0MQ transport for client connections. [default : tcp]")
167 help="0MQ transport for client connections. [default : tcp]")
168
168
169 monitor_ip = Unicode('127.0.0.1', config=True,
169 monitor_ip = Unicode('127.0.0.1', config=True,
170 help="IP on which to listen for monitor messages. [default: loopback]")
170 help="IP on which to listen for monitor messages. [default: loopback]")
171 monitor_transport = Unicode('tcp', config=True,
171 monitor_transport = Unicode('tcp', config=True,
172 help="0MQ transport for monitor messages. [default : tcp]")
172 help="0MQ transport for monitor messages. [default : tcp]")
173
173
174 monitor_url = Unicode('')
174 monitor_url = Unicode('')
175
175
176 db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
176 db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
177 help="""The class to use for the DB backend""")
177 help="""The class to use for the DB backend""")
178
178
179 # not configurable
179 # not configurable
180 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
180 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
181 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
181 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
182
182
183 def _ip_changed(self, name, old, new):
183 def _ip_changed(self, name, old, new):
184 self.engine_ip = new
184 self.engine_ip = new
185 self.client_ip = new
185 self.client_ip = new
186 self.monitor_ip = new
186 self.monitor_ip = new
187 self._update_monitor_url()
187 self._update_monitor_url()
188
188
189 def _update_monitor_url(self):
189 def _update_monitor_url(self):
190 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
190 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
191
191
192 def _transport_changed(self, name, old, new):
192 def _transport_changed(self, name, old, new):
193 self.engine_transport = new
193 self.engine_transport = new
194 self.client_transport = new
194 self.client_transport = new
195 self.monitor_transport = new
195 self.monitor_transport = new
196 self._update_monitor_url()
196 self._update_monitor_url()
197
197
198 def __init__(self, **kwargs):
198 def __init__(self, **kwargs):
199 super(HubFactory, self).__init__(**kwargs)
199 super(HubFactory, self).__init__(**kwargs)
200 self._update_monitor_url()
200 self._update_monitor_url()
201
201
202
202
203 def construct(self):
203 def construct(self):
204 self.init_hub()
204 self.init_hub()
205
205
206 def start(self):
206 def start(self):
207 self.heartmonitor.start()
207 self.heartmonitor.start()
208 self.log.info("Heartmonitor started")
208 self.log.info("Heartmonitor started")
209
209
210 def init_hub(self):
210 def init_hub(self):
211 """construct"""
211 """construct"""
212 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
212 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
213 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
213 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
214
214
215 ctx = self.context
215 ctx = self.context
216 loop = self.loop
216 loop = self.loop
217
217
218 # Registrar socket
218 # Registrar socket
219 q = ZMQStream(ctx.socket(zmq.XREP), loop)
219 q = ZMQStream(ctx.socket(zmq.XREP), loop)
220 q.bind(client_iface % self.regport)
220 q.bind(client_iface % self.regport)
221 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
221 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
222 if self.client_ip != self.engine_ip:
222 if self.client_ip != self.engine_ip:
223 q.bind(engine_iface % self.regport)
223 q.bind(engine_iface % self.regport)
224 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
224 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
225
225
226 ### Engine connections ###
226 ### Engine connections ###
227
227
228 # heartbeat
228 # heartbeat
229 hpub = ctx.socket(zmq.PUB)
229 hpub = ctx.socket(zmq.PUB)
230 hpub.bind(engine_iface % self.hb[0])
230 hpub.bind(engine_iface % self.hb[0])
231 hrep = ctx.socket(zmq.XREP)
231 hrep = ctx.socket(zmq.XREP)
232 hrep.bind(engine_iface % self.hb[1])
232 hrep.bind(engine_iface % self.hb[1])
233 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
233 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
234 config=self.config)
234 config=self.config)
235
235
236 ### Client connections ###
236 ### Client connections ###
237 # Notifier socket
237 # Notifier socket
238 n = ZMQStream(ctx.socket(zmq.PUB), loop)
238 n = ZMQStream(ctx.socket(zmq.PUB), loop)
239 n.bind(client_iface%self.notifier_port)
239 n.bind(client_iface%self.notifier_port)
240
240
241 ### build and launch the queues ###
241 ### build and launch the queues ###
242
242
243 # monitor socket
243 # monitor socket
244 sub = ctx.socket(zmq.SUB)
244 sub = ctx.socket(zmq.SUB)
245 sub.setsockopt(zmq.SUBSCRIBE, "")
245 sub.setsockopt(zmq.SUBSCRIBE, "")
246 sub.bind(self.monitor_url)
246 sub.bind(self.monitor_url)
247 sub.bind('inproc://monitor')
247 sub.bind('inproc://monitor')
248 sub = ZMQStream(sub, loop)
248 sub = ZMQStream(sub, loop)
249
249
250 # connect the db
250 # connect the db
251 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
251 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
252 # cdir = self.config.Global.cluster_dir
252 # cdir = self.config.Global.cluster_dir
253 self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config)
253 self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config)
254 time.sleep(.25)
254 time.sleep(.25)
255 try:
255 try:
256 scheme = self.config.TaskScheduler.scheme_name
256 scheme = self.config.TaskScheduler.scheme_name
257 except AttributeError:
257 except AttributeError:
258 from .scheduler import TaskScheduler
258 from .scheduler import TaskScheduler
259 scheme = TaskScheduler.scheme_name.get_default_value()
259 scheme = TaskScheduler.scheme_name.get_default_value()
260 # build connection dicts
260 # build connection dicts
261 self.engine_info = {
261 self.engine_info = {
262 'control' : engine_iface%self.control[1],
262 'control' : engine_iface%self.control[1],
263 'mux': engine_iface%self.mux[1],
263 'mux': engine_iface%self.mux[1],
264 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
264 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
265 'task' : engine_iface%self.task[1],
265 'task' : engine_iface%self.task[1],
266 'iopub' : engine_iface%self.iopub[1],
266 'iopub' : engine_iface%self.iopub[1],
267 # 'monitor' : engine_iface%self.mon_port,
267 # 'monitor' : engine_iface%self.mon_port,
268 }
268 }
269
269
270 self.client_info = {
270 self.client_info = {
271 'control' : client_iface%self.control[0],
271 'control' : client_iface%self.control[0],
272 'mux': client_iface%self.mux[0],
272 'mux': client_iface%self.mux[0],
273 'task' : (scheme, client_iface%self.task[0]),
273 'task' : (scheme, client_iface%self.task[0]),
274 'iopub' : client_iface%self.iopub[0],
274 'iopub' : client_iface%self.iopub[0],
275 'notification': client_iface%self.notifier_port
275 'notification': client_iface%self.notifier_port
276 }
276 }
277 self.log.debug("Hub engine addrs: %s"%self.engine_info)
277 self.log.debug("Hub engine addrs: %s"%self.engine_info)
278 self.log.debug("Hub client addrs: %s"%self.client_info)
278 self.log.debug("Hub client addrs: %s"%self.client_info)
279
279
280 # resubmit stream
280 # resubmit stream
281 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
281 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
282 url = util.disambiguate_url(self.client_info['task'][-1])
282 url = util.disambiguate_url(self.client_info['task'][-1])
283 r.setsockopt(zmq.IDENTITY, self.session.session)
283 r.setsockopt(zmq.IDENTITY, self.session.session)
284 r.connect(url)
284 r.connect(url)
285
285
286 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
286 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
287 query=q, notifier=n, resubmit=r, db=self.db,
287 query=q, notifier=n, resubmit=r, db=self.db,
288 engine_info=self.engine_info, client_info=self.client_info,
288 engine_info=self.engine_info, client_info=self.client_info,
289 logname=self.log.name)
289 logname=self.log.name)
290
290
291
291
292 class Hub(LoggingFactory):
292 class Hub(LoggingFactory):
293 """The IPython Controller Hub with 0MQ connections
293 """The IPython Controller Hub with 0MQ connections
294
294
295 Parameters
295 Parameters
296 ==========
296 ==========
297 loop: zmq IOLoop instance
297 loop: zmq IOLoop instance
298 session: StreamSession object
298 session: StreamSession object
299 <removed> context: zmq context for creating new connections (?)
299 <removed> context: zmq context for creating new connections (?)
300 queue: ZMQStream for monitoring the command queue (SUB)
300 queue: ZMQStream for monitoring the command queue (SUB)
301 query: ZMQStream for engine registration and client queries requests (XREP)
301 query: ZMQStream for engine registration and client queries requests (XREP)
302 heartbeat: HeartMonitor object checking the pulse of the engines
302 heartbeat: HeartMonitor object checking the pulse of the engines
303 notifier: ZMQStream for broadcasting engine registration changes (PUB)
303 notifier: ZMQStream for broadcasting engine registration changes (PUB)
304 db: connection to db for out of memory logging of commands
304 db: connection to db for out of memory logging of commands
305 NotImplemented
305 NotImplemented
306 engine_info: dict of zmq connection information for engines to connect
306 engine_info: dict of zmq connection information for engines to connect
307 to the queues.
307 to the queues.
308 client_info: dict of zmq connection information for engines to connect
308 client_info: dict of zmq connection information for engines to connect
309 to the queues.
309 to the queues.
310 """
310 """
311 # internal data structures:
311 # internal data structures:
312 ids=Set() # engine IDs
312 ids=Set() # engine IDs
313 keytable=Dict()
313 keytable=Dict()
314 by_ident=Dict()
314 by_ident=Dict()
315 engines=Dict()
315 engines=Dict()
316 clients=Dict()
316 clients=Dict()
317 hearts=Dict()
317 hearts=Dict()
318 pending=Set()
318 pending=Set()
319 queues=Dict() # pending msg_ids keyed by engine_id
319 queues=Dict() # pending msg_ids keyed by engine_id
320 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
320 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
321 completed=Dict() # completed msg_ids keyed by engine_id
321 completed=Dict() # completed msg_ids keyed by engine_id
322 all_completed=Set() # completed msg_ids keyed by engine_id
322 all_completed=Set() # completed msg_ids keyed by engine_id
323 dead_engines=Set() # completed msg_ids keyed by engine_id
323 dead_engines=Set() # completed msg_ids keyed by engine_id
324 unassigned=Set() # set of task msg_ds not yet assigned a destination
324 unassigned=Set() # set of task msg_ds not yet assigned a destination
325 incoming_registrations=Dict()
325 incoming_registrations=Dict()
326 registration_timeout=Int()
326 registration_timeout=Int()
327 _idcounter=Int(0)
327 _idcounter=Int(0)
328
328
329 # objects from constructor:
329 # objects from constructor:
330 loop=Instance(ioloop.IOLoop)
330 loop=Instance(ioloop.IOLoop)
331 query=Instance(ZMQStream)
331 query=Instance(ZMQStream)
332 monitor=Instance(ZMQStream)
332 monitor=Instance(ZMQStream)
333 notifier=Instance(ZMQStream)
333 notifier=Instance(ZMQStream)
334 resubmit=Instance(ZMQStream)
334 resubmit=Instance(ZMQStream)
335 heartmonitor=Instance(HeartMonitor)
335 heartmonitor=Instance(HeartMonitor)
336 db=Instance(object)
336 db=Instance(object)
337 client_info=Dict()
337 client_info=Dict()
338 engine_info=Dict()
338 engine_info=Dict()
339
339
340
340
341 def __init__(self, **kwargs):
341 def __init__(self, **kwargs):
342 """
342 """
343 # universal:
343 # universal:
344 loop: IOLoop for creating future connections
344 loop: IOLoop for creating future connections
345 session: streamsession for sending serialized data
345 session: streamsession for sending serialized data
346 # engine:
346 # engine:
347 queue: ZMQStream for monitoring queue messages
347 queue: ZMQStream for monitoring queue messages
348 query: ZMQStream for engine+client registration and client requests
348 query: ZMQStream for engine+client registration and client requests
349 heartbeat: HeartMonitor object for tracking engines
349 heartbeat: HeartMonitor object for tracking engines
350 # extra:
350 # extra:
351 db: ZMQStream for db connection (NotImplemented)
351 db: ZMQStream for db connection (NotImplemented)
352 engine_info: zmq address/protocol dict for engine connections
352 engine_info: zmq address/protocol dict for engine connections
353 client_info: zmq address/protocol dict for client connections
353 client_info: zmq address/protocol dict for client connections
354 """
354 """
355
355
356 super(Hub, self).__init__(**kwargs)
356 super(Hub, self).__init__(**kwargs)
357 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
357 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
358
358
359 # validate connection dicts:
359 # validate connection dicts:
360 for k,v in self.client_info.iteritems():
360 for k,v in self.client_info.iteritems():
361 if k == 'task':
361 if k == 'task':
362 util.validate_url_container(v[1])
362 util.validate_url_container(v[1])
363 else:
363 else:
364 util.validate_url_container(v)
364 util.validate_url_container(v)
365 # util.validate_url_container(self.client_info)
365 # util.validate_url_container(self.client_info)
366 util.validate_url_container(self.engine_info)
366 util.validate_url_container(self.engine_info)
367
367
368 # register our callbacks
368 # register our callbacks
369 self.query.on_recv(self.dispatch_query)
369 self.query.on_recv(self.dispatch_query)
370 self.monitor.on_recv(self.dispatch_monitor_traffic)
370 self.monitor.on_recv(self.dispatch_monitor_traffic)
371
371
372 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
372 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
373 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
373 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
374
374
375 self.monitor_handlers = { 'in' : self.save_queue_request,
375 self.monitor_handlers = { 'in' : self.save_queue_request,
376 'out': self.save_queue_result,
376 'out': self.save_queue_result,
377 'intask': self.save_task_request,
377 'intask': self.save_task_request,
378 'outtask': self.save_task_result,
378 'outtask': self.save_task_result,
379 'tracktask': self.save_task_destination,
379 'tracktask': self.save_task_destination,
380 'incontrol': _passer,
380 'incontrol': _passer,
381 'outcontrol': _passer,
381 'outcontrol': _passer,
382 'iopub': self.save_iopub_message,
382 'iopub': self.save_iopub_message,
383 }
383 }
384
384
385 self.query_handlers = {'queue_request': self.queue_status,
385 self.query_handlers = {'queue_request': self.queue_status,
386 'result_request': self.get_results,
386 'result_request': self.get_results,
387 'history_request': self.get_history,
387 'history_request': self.get_history,
388 'db_request': self.db_query,
388 'db_request': self.db_query,
389 'purge_request': self.purge_results,
389 'purge_request': self.purge_results,
390 'load_request': self.check_load,
390 'load_request': self.check_load,
391 'resubmit_request': self.resubmit_task,
391 'resubmit_request': self.resubmit_task,
392 'shutdown_request': self.shutdown_request,
392 'shutdown_request': self.shutdown_request,
393 'registration_request' : self.register_engine,
393 'registration_request' : self.register_engine,
394 'unregistration_request' : self.unregister_engine,
394 'unregistration_request' : self.unregister_engine,
395 'connection_request': self.connection_request,
395 'connection_request': self.connection_request,
396 }
396 }
397
397
398 # ignore resubmit replies
398 # ignore resubmit replies
399 self.resubmit.on_recv(lambda msg: None, copy=False)
399 self.resubmit.on_recv(lambda msg: None, copy=False)
400
400
401 self.log.info("hub::created hub")
401 self.log.info("hub::created hub")
402
402
403 @property
403 @property
404 def _next_id(self):
404 def _next_id(self):
405 """gemerate a new ID.
405 """gemerate a new ID.
406
406
407 No longer reuse old ids, just count from 0."""
407 No longer reuse old ids, just count from 0."""
408 newid = self._idcounter
408 newid = self._idcounter
409 self._idcounter += 1
409 self._idcounter += 1
410 return newid
410 return newid
411 # newid = 0
411 # newid = 0
412 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
412 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
413 # # print newid, self.ids, self.incoming_registrations
413 # # print newid, self.ids, self.incoming_registrations
414 # while newid in self.ids or newid in incoming:
414 # while newid in self.ids or newid in incoming:
415 # newid += 1
415 # newid += 1
416 # return newid
416 # return newid
417
417
418 #-----------------------------------------------------------------------------
418 #-----------------------------------------------------------------------------
419 # message validation
419 # message validation
420 #-----------------------------------------------------------------------------
420 #-----------------------------------------------------------------------------
421
421
422 def _validate_targets(self, targets):
422 def _validate_targets(self, targets):
423 """turn any valid targets argument into a list of integer ids"""
423 """turn any valid targets argument into a list of integer ids"""
424 if targets is None:
424 if targets is None:
425 # default to all
425 # default to all
426 targets = self.ids
426 targets = self.ids
427
427
428 if isinstance(targets, (int,str,unicode)):
428 if isinstance(targets, (int,str,unicode)):
429 # only one target specified
429 # only one target specified
430 targets = [targets]
430 targets = [targets]
431 _targets = []
431 _targets = []
432 for t in targets:
432 for t in targets:
433 # map raw identities to ids
433 # map raw identities to ids
434 if isinstance(t, (str,unicode)):
434 if isinstance(t, (str,unicode)):
435 t = self.by_ident.get(t, t)
435 t = self.by_ident.get(t, t)
436 _targets.append(t)
436 _targets.append(t)
437 targets = _targets
437 targets = _targets
438 bad_targets = [ t for t in targets if t not in self.ids ]
438 bad_targets = [ t for t in targets if t not in self.ids ]
439 if bad_targets:
439 if bad_targets:
440 raise IndexError("No Such Engine: %r"%bad_targets)
440 raise IndexError("No Such Engine: %r"%bad_targets)
441 if not targets:
441 if not targets:
442 raise IndexError("No Engines Registered")
442 raise IndexError("No Engines Registered")
443 return targets
443 return targets
444
444
445 #-----------------------------------------------------------------------------
445 #-----------------------------------------------------------------------------
446 # dispatch methods (1 per stream)
446 # dispatch methods (1 per stream)
447 #-----------------------------------------------------------------------------
447 #-----------------------------------------------------------------------------
448
448
449
449
450 def dispatch_monitor_traffic(self, msg):
450 def dispatch_monitor_traffic(self, msg):
451 """all ME and Task queue messages come through here, as well as
451 """all ME and Task queue messages come through here, as well as
452 IOPub traffic."""
452 IOPub traffic."""
453 self.log.debug("monitor traffic: %r"%msg[:2])
453 self.log.debug("monitor traffic: %r"%msg[:2])
454 switch = msg[0]
454 switch = msg[0]
455 try:
455 try:
456 idents, msg = self.session.feed_identities(msg[1:])
456 idents, msg = self.session.feed_identities(msg[1:])
457 except ValueError:
457 except ValueError:
458 idents=[]
458 idents=[]
459 if not idents:
459 if not idents:
460 self.log.error("Bad Monitor Message: %r"%msg)
460 self.log.error("Bad Monitor Message: %r"%msg)
461 return
461 return
462 handler = self.monitor_handlers.get(switch, None)
462 handler = self.monitor_handlers.get(switch, None)
463 if handler is not None:
463 if handler is not None:
464 handler(idents, msg)
464 handler(idents, msg)
465 else:
465 else:
466 self.log.error("Invalid monitor topic: %r"%switch)
466 self.log.error("Invalid monitor topic: %r"%switch)
467
467
468
468
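The dispatch methods in this section all follow the same table-driven pattern: peel the topic (or client identity) off the front of the multipart message, then look the handler up in a dict such as self.monitor_handlers or self.query_handlers. A runnable sketch of the pattern, with illustrative handler names that are not the Hub's real tables:

def save_request(idents, msg):
    print('request from %r: %r' % (idents, msg))

def save_result(idents, msg):
    print('result from %r: %r' % (idents, msg))

# topic -> handler, in the spirit of self.monitor_handlers
handlers = {'in': save_request, 'out': save_result}

def dispatch(raw_msg):
    switch = raw_msg[0]                   # first frame names the topic
    idents, msg = raw_msg[1:2], raw_msg[2:]
    handler = handlers.get(switch, None)
    if handler is None:
        print('Invalid topic: %r' % switch)
    else:
        handler(idents, msg)

dispatch(['in', 'client-1', 'hello'])     # routed to save_request
dispatch(['bogus'])                       # unknown topic: logged and dropped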
469 def dispatch_query(self, msg):
469 def dispatch_query(self, msg):
470 """Route registration requests and queries from clients."""
470 """Route registration requests and queries from clients."""
471 idents, msg = self.session.feed_identities(msg)
471 try:
472 idents, msg = self.session.feed_identities(msg)
473 except ValueError:
474 idents = []
472 if not idents:
475 if not idents:
473 self.log.error("Bad Query Message: %r"%msg)
476 self.log.error("Bad Query Message: %r"%msg)
474 return
477 return
475 client_id = idents[0]
478 client_id = idents[0]
476 try:
479 try:
477 msg = self.session.unpack_message(msg, content=True)
480 msg = self.session.unpack_message(msg, content=True)
478 except:
481 except Exception:
479 content = error.wrap_exception()
482 content = error.wrap_exception()
480 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
483 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
481 self.session.send(self.query, "hub_error", ident=client_id,
484 self.session.send(self.query, "hub_error", ident=client_id,
482 content=content)
485 content=content)
483 return
486 return
484
487 # print (idents, msg)
485 # print client_id, header, parent, content
488 # print client_id, header, parent, content
486 #switch on message type:
489 #switch on message type:
487 msg_type = msg['msg_type']
490 msg_type = msg['msg_type']
488 self.log.info("client::client %r requested %r"%(client_id, msg_type))
491 self.log.info("client::client %r requested %r"%(client_id, msg_type))
489 handler = self.query_handlers.get(msg_type, None)
492 handler = self.query_handlers.get(msg_type, None)
490 try:
493 try:
491 assert handler is not None, "Bad Message Type: %r"%msg_type
494 assert handler is not None, "Bad Message Type: %r"%msg_type
492 except:
495 except:
493 content = error.wrap_exception()
496 content = error.wrap_exception()
494 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
497 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
495 self.session.send(self.query, "hub_error", ident=client_id,
498 self.session.send(self.query, "hub_error", ident=client_id,
496 content=content)
499 content=content)
497 return
500 return
498
501
499 else:
502 else:
500 handler(idents, msg)
503 handler(idents, msg)
501
504
502 def dispatch_db(self, msg):
505 def dispatch_db(self, msg):
503 """"""
506 """"""
504 raise NotImplementedError
507 raise NotImplementedError
505
508
506 #---------------------------------------------------------------------------
509 #---------------------------------------------------------------------------
507 # handler methods (1 per event)
510 # handler methods (1 per event)
508 #---------------------------------------------------------------------------
511 #---------------------------------------------------------------------------
509
512
510 #----------------------- Heartbeat --------------------------------------
513 #----------------------- Heartbeat --------------------------------------
511
514
512 def handle_new_heart(self, heart):
515 def handle_new_heart(self, heart):
513 """handler to attach to heartbeater.
516 """handler to attach to heartbeater.
514 Called when a new heart starts to beat.
517 Called when a new heart starts to beat.
515 Triggers completion of registration."""
518 Triggers completion of registration."""
516 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
519 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
517 if heart not in self.incoming_registrations:
520 if heart not in self.incoming_registrations:
518 self.log.info("heartbeat::ignoring new heart: %r"%heart)
521 self.log.info("heartbeat::ignoring new heart: %r"%heart)
519 else:
522 else:
520 self.finish_registration(heart)
523 self.finish_registration(heart)
521
524
522
525
523 def handle_heart_failure(self, heart):
526 def handle_heart_failure(self, heart):
524 """handler to attach to heartbeater.
527 """handler to attach to heartbeater.
525 called when a previously registered heart fails to respond to beat request.
528 called when a previously registered heart fails to respond to beat request.
526 triggers unregistration"""
529 triggers unregistration"""
527 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
530 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
528 eid = self.hearts.get(heart, None)
531 eid = self.hearts.get(heart, None)
529 if eid is None:
532 if eid is None:
530 self.log.info("heartbeat::ignoring heart failure %r"%heart)
533 self.log.info("heartbeat::ignoring heart failure %r"%heart)
531 else:
534 else:
532 queue = self.engines[eid].queue
535 queue = self.engines[eid].queue
533 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
536 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
534
537
535 #----------------------- MUX Queue Traffic ------------------------------
538 #----------------------- MUX Queue Traffic ------------------------------
536
539
537 def save_queue_request(self, idents, msg):
540 def save_queue_request(self, idents, msg):
538 if len(idents) < 2:
541 if len(idents) < 2:
539 self.log.error("invalid identity prefix: %r"%idents)
542 self.log.error("invalid identity prefix: %r"%idents)
540 return
543 return
541 queue_id, client_id = idents[:2]
544 queue_id, client_id = idents[:2]
542 try:
545 try:
543 msg = self.session.unpack_message(msg, content=False)
546 msg = self.session.unpack_message(msg, content=False)
544 except Exception:
547 except Exception:
545 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
548 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
546 return
549 return
547
550
548 eid = self.by_ident.get(queue_id, None)
551 eid = self.by_ident.get(queue_id, None)
549 if eid is None:
552 if eid is None:
550 self.log.error("queue::target %r not registered"%queue_id)
553 self.log.error("queue::target %r not registered"%queue_id)
551 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
554 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
552 return
555 return
553
556
554 header = msg['header']
557 header = msg['header']
555 msg_id = header['msg_id']
558 msg_id = header['msg_id']
556 record = init_record(msg)
559 record = init_record(msg)
557 record['engine_uuid'] = queue_id
560 record['engine_uuid'] = queue_id
558 record['client_uuid'] = client_id
561 record['client_uuid'] = client_id
559 record['queue'] = 'mux'
562 record['queue'] = 'mux'
560
563
561 try:
564 try:
562 # it's possible iopub arrived first:
565 # it's possible iopub arrived first:
563 existing = self.db.get_record(msg_id)
566 existing = self.db.get_record(msg_id)
564 for key,evalue in existing.iteritems():
567 for key,evalue in existing.iteritems():
565 rvalue = record.get(key, None)
568 rvalue = record.get(key, None)
566 if evalue and rvalue and evalue != rvalue:
569 if evalue and rvalue and evalue != rvalue:
567 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
570 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
568 elif evalue and not rvalue:
571 elif evalue and not rvalue:
569 record[key] = evalue
572 record[key] = evalue
570 self.db.update_record(msg_id, record)
573 self.db.update_record(msg_id, record)
571 except KeyError:
574 except KeyError:
572 self.db.add_record(msg_id, record)
575 self.db.add_record(msg_id, record)
573
576
574 self.pending.add(msg_id)
577 self.pending.add(msg_id)
575 self.queues[eid].append(msg_id)
578 self.queues[eid].append(msg_id)
576
579
577 def save_queue_result(self, idents, msg):
580 def save_queue_result(self, idents, msg):
578 if len(idents) < 2:
581 if len(idents) < 2:
579 self.log.error("invalid identity prefix: %r"%idents)
582 self.log.error("invalid identity prefix: %r"%idents)
580 return
583 return
581
584
582 client_id, queue_id = idents[:2]
585 client_id, queue_id = idents[:2]
583 try:
586 try:
584 msg = self.session.unpack_message(msg, content=False)
587 msg = self.session.unpack_message(msg, content=False)
585 except Exception:
588 except Exception:
586 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
589 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
587 queue_id,client_id, msg), exc_info=True)
590 queue_id,client_id, msg), exc_info=True)
588 return
591 return
589
592
590 eid = self.by_ident.get(queue_id, None)
593 eid = self.by_ident.get(queue_id, None)
591 if eid is None:
594 if eid is None:
592 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
595 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
593 return
596 return
594
597
595 parent = msg['parent_header']
598 parent = msg['parent_header']
596 if not parent:
599 if not parent:
597 return
600 return
598 msg_id = parent['msg_id']
601 msg_id = parent['msg_id']
599 if msg_id in self.pending:
602 if msg_id in self.pending:
600 self.pending.remove(msg_id)
603 self.pending.remove(msg_id)
601 self.all_completed.add(msg_id)
604 self.all_completed.add(msg_id)
602 self.queues[eid].remove(msg_id)
605 self.queues[eid].remove(msg_id)
603 self.completed[eid].append(msg_id)
606 self.completed[eid].append(msg_id)
604 elif msg_id not in self.all_completed:
607 elif msg_id not in self.all_completed:
605 # it could be a result from a dead engine that died before delivering the
608 # it could be a result from a dead engine that died before delivering the
606 # result
609 # result
607 self.log.warn("queue:: unknown msg finished %r"%msg_id)
610 self.log.warn("queue:: unknown msg finished %r"%msg_id)
608 return
611 return
609 # update record anyway, because the unregistration could have been premature
612 # update record anyway, because the unregistration could have been premature
610 rheader = msg['header']
613 rheader = msg['header']
611 completed = datetime.strptime(rheader['date'], util.ISO8601)
614 completed = datetime.strptime(rheader['date'], util.ISO8601)
612 started = rheader.get('started', None)
615 started = rheader.get('started', None)
613 if started is not None:
616 if started is not None:
614 started = datetime.strptime(started, util.ISO8601)
617 started = datetime.strptime(started, util.ISO8601)
615 result = {
618 result = {
616 'result_header' : rheader,
619 'result_header' : rheader,
617 'result_content': msg['content'],
620 'result_content': msg['content'],
618 'started' : started,
621 'started' : started,
619 'completed' : completed
622 'completed' : completed
620 }
623 }
621
624
622 result['result_buffers'] = msg['buffers']
625 result['result_buffers'] = msg['buffers']
623 try:
626 try:
624 self.db.update_record(msg_id, result)
627 self.db.update_record(msg_id, result)
625 except Exception:
628 except Exception:
626 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
629 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
627
630
628
631
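The 'started'/'completed' stamps handled above travel as strings and are parsed back with datetime.strptime. A quick round-trip, assuming util.ISO8601 is the usual "%Y-%m-%dT%H:%M:%S.%f" pattern (an assumption; check IPython.parallel.util):

from datetime import datetime

ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"          # assumed equivalent of util.ISO8601

started = datetime.now()
wire = started.strftime(ISO8601)          # serialized into the message header
parsed = datetime.strptime(wire, ISO8601) # parsed back on the hub side
print(parsed == started)                  # True: microsecond-exact round-trip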
629 #--------------------- Task Queue Traffic ------------------------------
632 #--------------------- Task Queue Traffic ------------------------------
630
633
631 def save_task_request(self, idents, msg):
634 def save_task_request(self, idents, msg):
632 """Save the submission of a task."""
635 """Save the submission of a task."""
633 client_id = idents[0]
636 client_id = idents[0]
634
637
635 try:
638 try:
636 msg = self.session.unpack_message(msg, content=False)
639 msg = self.session.unpack_message(msg, content=False)
637 except Exception:
640 except Exception:
638 self.log.error("task::client %r sent invalid task message: %r"%(
641 self.log.error("task::client %r sent invalid task message: %r"%(
639 client_id, msg), exc_info=True)
642 client_id, msg), exc_info=True)
640 return
643 return
641 record = init_record(msg)
644 record = init_record(msg)
642
645
643 record['client_uuid'] = client_id
646 record['client_uuid'] = client_id
644 record['queue'] = 'task'
647 record['queue'] = 'task'
645 header = msg['header']
648 header = msg['header']
646 msg_id = header['msg_id']
649 msg_id = header['msg_id']
647 self.pending.add(msg_id)
650 self.pending.add(msg_id)
648 self.unassigned.add(msg_id)
651 self.unassigned.add(msg_id)
649 try:
652 try:
650 # it's possible iopub arrived first:
653 # it's possible iopub arrived first:
651 existing = self.db.get_record(msg_id)
654 existing = self.db.get_record(msg_id)
652 if existing['resubmitted']:
655 if existing['resubmitted']:
653 for key in ('submitted', 'client_uuid', 'buffers'):
656 for key in ('submitted', 'client_uuid', 'buffers'):
654 # don't clobber these keys on resubmit
657 # don't clobber these keys on resubmit
655 # submitted and client_uuid should be different
658 # submitted and client_uuid should be different
656 # and buffers might be big, and shouldn't have changed
659 # and buffers might be big, and shouldn't have changed
657 record.pop(key)
660 record.pop(key)
658 # still check content and header, which should not change
661 # still check content and header, which should not change
659 # but are not as expensive to compare as buffers
662 # but are not as expensive to compare as buffers
660
663
661 for key,evalue in existing.iteritems():
664 for key,evalue in existing.iteritems():
662 if key.endswith('buffers'):
665 if key.endswith('buffers'):
663 # don't compare buffers
666 # don't compare buffers
664 continue
667 continue
665 rvalue = record.get(key, None)
668 rvalue = record.get(key, None)
666 if evalue and rvalue and evalue != rvalue:
669 if evalue and rvalue and evalue != rvalue:
667 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
670 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
668 elif evalue and not rvalue:
671 elif evalue and not rvalue:
669 record[key] = evalue
672 record[key] = evalue
670 self.db.update_record(msg_id, record)
673 self.db.update_record(msg_id, record)
671 except KeyError:
674 except KeyError:
672 self.db.add_record(msg_id, record)
675 self.db.add_record(msg_id, record)
673 except Exception:
676 except Exception:
674 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
677 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
675
678
676 def save_task_result(self, idents, msg):
679 def save_task_result(self, idents, msg):
677 """save the result of a completed task."""
680 """save the result of a completed task."""
678 client_id = idents[0]
681 client_id = idents[0]
679 try:
682 try:
680 msg = self.session.unpack_message(msg, content=False)
683 msg = self.session.unpack_message(msg, content=False)
681 except Exception:
684 except Exception:
682 self.log.error("task::invalid task result message send to %r: %r"%(
685 self.log.error("task::invalid task result message send to %r: %r"%(
683 client_id, msg), exc_info=True)
686 client_id, msg), exc_info=True)
684 return
687 return
685
688
686 parent = msg['parent_header']
689 parent = msg['parent_header']
687 if not parent:
690 if not parent:
688 # print msg
691 # print msg
689 self.log.warn("Task %r had no parent!"%msg)
692 self.log.warn("Task %r had no parent!"%msg)
690 return
693 return
691 msg_id = parent['msg_id']
694 msg_id = parent['msg_id']
692 if msg_id in self.unassigned:
695 if msg_id in self.unassigned:
693 self.unassigned.remove(msg_id)
696 self.unassigned.remove(msg_id)
694
697
695 header = msg['header']
698 header = msg['header']
696 engine_uuid = header.get('engine', None)
699 engine_uuid = header.get('engine', None)
697 eid = self.by_ident.get(engine_uuid, None)
700 eid = self.by_ident.get(engine_uuid, None)
698
701
699 if msg_id in self.pending:
702 if msg_id in self.pending:
700 self.pending.remove(msg_id)
703 self.pending.remove(msg_id)
701 self.all_completed.add(msg_id)
704 self.all_completed.add(msg_id)
702 if eid is not None:
705 if eid is not None:
703 self.completed[eid].append(msg_id)
706 self.completed[eid].append(msg_id)
704 if msg_id in self.tasks[eid]:
707 if msg_id in self.tasks[eid]:
705 self.tasks[eid].remove(msg_id)
708 self.tasks[eid].remove(msg_id)
706 completed = datetime.strptime(header['date'], util.ISO8601)
709 completed = datetime.strptime(header['date'], util.ISO8601)
707 started = header.get('started', None)
710 started = header.get('started', None)
708 if started is not None:
711 if started is not None:
709 started = datetime.strptime(started, util.ISO8601)
712 started = datetime.strptime(started, util.ISO8601)
710 result = {
713 result = {
711 'result_header' : header,
714 'result_header' : header,
712 'result_content': msg['content'],
715 'result_content': msg['content'],
713 'started' : started,
716 'started' : started,
714 'completed' : completed,
717 'completed' : completed,
715 'engine_uuid': engine_uuid
718 'engine_uuid': engine_uuid
716 }
719 }
717
720
718 result['result_buffers'] = msg['buffers']
721 result['result_buffers'] = msg['buffers']
719 try:
722 try:
720 self.db.update_record(msg_id, result)
723 self.db.update_record(msg_id, result)
721 except Exception:
724 except Exception:
722 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
725 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
723
726
724 else:
727 else:
725 self.log.debug("task::unknown task %r finished"%msg_id)
728 self.log.debug("task::unknown task %r finished"%msg_id)
726
729
727 def save_task_destination(self, idents, msg):
730 def save_task_destination(self, idents, msg):
728 try:
731 try:
729 msg = self.session.unpack_message(msg, content=True)
732 msg = self.session.unpack_message(msg, content=True)
730 except Exception:
733 except Exception:
731 self.log.error("task::invalid task tracking message", exc_info=True)
734 self.log.error("task::invalid task tracking message", exc_info=True)
732 return
735 return
733 content = msg['content']
736 content = msg['content']
734 # print (content)
737 # print (content)
735 msg_id = content['msg_id']
738 msg_id = content['msg_id']
736 engine_uuid = content['engine_id']
739 engine_uuid = content['engine_id']
737 eid = self.by_ident[engine_uuid]
740 eid = self.by_ident[engine_uuid]
738
741
739 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
742 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
740 if msg_id in self.unassigned:
743 if msg_id in self.unassigned:
741 self.unassigned.remove(msg_id)
744 self.unassigned.remove(msg_id)
742 # else:
745 # else:
743 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
746 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
744
747
745 self.tasks[eid].append(msg_id)
748 self.tasks[eid].append(msg_id)
746 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
749 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
747 try:
750 try:
748 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
751 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
749 except Exception:
752 except Exception:
750 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
753 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
751
754
752
755
753 def mia_task_request(self, idents, msg):
756 def mia_task_request(self, idents, msg):
754 raise NotImplementedError
757 raise NotImplementedError
755 client_id = idents[0]
758 client_id = idents[0]
756 # content = dict(mia=self.mia,status='ok')
759 # content = dict(mia=self.mia,status='ok')
757 # self.session.send('mia_reply', content=content, idents=client_id)
760 # self.session.send('mia_reply', content=content, idents=client_id)
758
761
759
762
760 #--------------------- IOPub Traffic ------------------------------
763 #--------------------- IOPub Traffic ------------------------------
761
764
762 def save_iopub_message(self, topics, msg):
765 def save_iopub_message(self, topics, msg):
763 """save an iopub message into the db"""
766 """save an iopub message into the db"""
764 # print (topics)
767 # print (topics)
765 try:
768 try:
766 msg = self.session.unpack_message(msg, content=True)
769 msg = self.session.unpack_message(msg, content=True)
767 except Exception:
770 except Exception:
768 self.log.error("iopub::invalid IOPub message", exc_info=True)
771 self.log.error("iopub::invalid IOPub message", exc_info=True)
769 return
772 return
770
773
771 parent = msg['parent_header']
774 parent = msg['parent_header']
772 if not parent:
775 if not parent:
773 self.log.error("iopub::invalid IOPub message: %r"%msg)
776 self.log.error("iopub::invalid IOPub message: %r"%msg)
774 return
777 return
775 msg_id = parent['msg_id']
778 msg_id = parent['msg_id']
776 msg_type = msg['msg_type']
779 msg_type = msg['msg_type']
777 content = msg['content']
780 content = msg['content']
778
781
779 # ensure msg_id is in db
782 # ensure msg_id is in db
780 try:
783 try:
781 rec = self.db.get_record(msg_id)
784 rec = self.db.get_record(msg_id)
782 except KeyError:
785 except KeyError:
783 rec = empty_record()
786 rec = empty_record()
784 rec['msg_id'] = msg_id
787 rec['msg_id'] = msg_id
785 self.db.add_record(msg_id, rec)
788 self.db.add_record(msg_id, rec)
786 # stream
789 # stream
787 d = {}
790 d = {}
788 if msg_type == 'stream':
791 if msg_type == 'stream':
789 name = content['name']
792 name = content['name']
790 s = rec[name] or ''
793 s = rec[name] or ''
791 d[name] = s + content['data']
794 d[name] = s + content['data']
792
795
793 elif msg_type == 'pyerr':
796 elif msg_type == 'pyerr':
794 d['pyerr'] = content
797 d['pyerr'] = content
795 elif msg_type == 'pyin':
798 elif msg_type == 'pyin':
796 d['pyin'] = content['code']
799 d['pyin'] = content['code']
797 else:
800 else:
798 d[msg_type] = content.get('data', '')
801 d[msg_type] = content.get('data', '')
799
802
800 try:
803 try:
801 self.db.update_record(msg_id, d)
804 self.db.update_record(msg_id, d)
802 except Exception:
805 except Exception:
803 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
806 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
804
807
805
808
806
809
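For 'stream' messages the handler above appends each chunk to whatever the record already holds, so repeated stdout/stderr fragments accumulate in order. The same accumulation against a simplified stand-in record:

# simplified record: only the stream fields matter for this sketch
rec = {'stdout': None, 'stderr': None}

def save_stream(rec, name, data):
    s = rec[name] or ''                   # a missing/None field starts empty
    rec[name] = s + data                  # append, never overwrite

save_stream(rec, 'stdout', 'hello ')
save_stream(rec, 'stdout', 'world\n')
print(rec['stdout'])                      # hello world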
807 #-------------------------------------------------------------------------
810 #-------------------------------------------------------------------------
808 # Registration requests
811 # Registration requests
809 #-------------------------------------------------------------------------
812 #-------------------------------------------------------------------------
810
813
811 def connection_request(self, client_id, msg):
814 def connection_request(self, client_id, msg):
812 """Reply with connection addresses for clients."""
815 """Reply with connection addresses for clients."""
813 self.log.info("client::client %r connected"%client_id)
816 self.log.info("client::client %r connected"%client_id)
814 content = dict(status='ok')
817 content = dict(status='ok')
815 content.update(self.client_info)
818 content.update(self.client_info)
816 jsonable = {}
819 jsonable = {}
817 for k,v in self.keytable.iteritems():
820 for k,v in self.keytable.iteritems():
818 if v not in self.dead_engines:
821 if v not in self.dead_engines:
819 jsonable[str(k)] = v
822 jsonable[str(k)] = v
820 content['engines'] = jsonable
823 content['engines'] = jsonable
821 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
824 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
822
825
823 def register_engine(self, reg, msg):
826 def register_engine(self, reg, msg):
824 """Register a new engine."""
827 """Register a new engine."""
825 content = msg['content']
828 content = msg['content']
826 try:
829 try:
827 queue = content['queue']
830 queue = content['queue']
828 except KeyError:
831 except KeyError:
829 self.log.error("registration::queue not specified", exc_info=True)
832 self.log.error("registration::queue not specified", exc_info=True)
830 return
833 return
831 heart = content.get('heartbeat', None)
834 heart = content.get('heartbeat', None)
832 """register a new engine, and create the socket(s) necessary"""
835 """register a new engine, and create the socket(s) necessary"""
833 eid = self._next_id
836 eid = self._next_id
834 # print (eid, queue, reg, heart)
837 # print (eid, queue, reg, heart)
835
838
836 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
839 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
837
840
838 content = dict(id=eid,status='ok')
841 content = dict(id=eid,status='ok')
839 content.update(self.engine_info)
842 content.update(self.engine_info)
840 # check that the requested queue and heartbeat ids are not already in use:
843 # check that the requested queue and heartbeat ids are not already in use:
841 if queue in self.by_ident:
844 if queue in self.by_ident:
842 try:
845 try:
843 raise KeyError("queue_id %r in use"%queue)
846 raise KeyError("queue_id %r in use"%queue)
844 except:
847 except:
845 content = error.wrap_exception()
848 content = error.wrap_exception()
846 self.log.error("queue_id %r in use"%queue, exc_info=True)
849 self.log.error("queue_id %r in use"%queue, exc_info=True)
847 elif heart in self.hearts: # need to check unique hearts?
850 elif heart in self.hearts: # need to check unique hearts?
848 try:
851 try:
849 raise KeyError("heart_id %r in use"%heart)
852 raise KeyError("heart_id %r in use"%heart)
850 except:
853 except:
851 self.log.error("heart_id %r in use"%heart, exc_info=True)
854 self.log.error("heart_id %r in use"%heart, exc_info=True)
852 content = error.wrap_exception()
855 content = error.wrap_exception()
853 else:
856 else:
854 for h, pack in self.incoming_registrations.iteritems():
857 for h, pack in self.incoming_registrations.iteritems():
855 if heart == h:
858 if heart == h:
856 try:
859 try:
857 raise KeyError("heart_id %r in use"%heart)
860 raise KeyError("heart_id %r in use"%heart)
858 except:
861 except:
859 self.log.error("heart_id %r in use"%heart, exc_info=True)
862 self.log.error("heart_id %r in use"%heart, exc_info=True)
860 content = error.wrap_exception()
863 content = error.wrap_exception()
861 break
864 break
862 elif queue == pack[1]:
865 elif queue == pack[1]:
863 try:
866 try:
864 raise KeyError("queue_id %r in use"%queue)
867 raise KeyError("queue_id %r in use"%queue)
865 except:
868 except:
866 self.log.error("queue_id %r in use"%queue, exc_info=True)
869 self.log.error("queue_id %r in use"%queue, exc_info=True)
867 content = error.wrap_exception()
870 content = error.wrap_exception()
868 break
871 break
869
872
870 msg = self.session.send(self.query, "registration_reply",
873 msg = self.session.send(self.query, "registration_reply",
871 content=content,
874 content=content,
872 ident=reg)
875 ident=reg)
873
876
874 if content['status'] == 'ok':
877 if content['status'] == 'ok':
875 if heart in self.heartmonitor.hearts:
878 if heart in self.heartmonitor.hearts:
876 # already beating
879 # already beating
877 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
880 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
878 self.finish_registration(heart)
881 self.finish_registration(heart)
879 else:
882 else:
880 purge = lambda : self._purge_stalled_registration(heart)
883 purge = lambda : self._purge_stalled_registration(heart)
881 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
884 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
882 dc.start()
885 dc.start()
883 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
886 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
884 else:
887 else:
885 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
888 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
886 return eid
889 return eid
887
890
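The raise-then-wrap idiom used for the collision replies above (and throughout the Hub) exists so that error.wrap_exception can capture a live traceback to ship back to the client. A sketch with a stand-in for wrap_exception; the real one lives in IPython.parallel.error and returns a richer structure:

import sys
import traceback

def wrap_exception():
    # stand-in: package the current exception as a json-friendly reply
    etype, evalue, _ = sys.exc_info()
    return {'status': 'error', 'ename': etype.__name__,
            'evalue': str(evalue), 'traceback': traceback.format_exc()}

queue = 'engine-uuid-a'                   # illustrative queue identity
try:
    raise KeyError("queue_id %r in use" % queue)
except KeyError:
    content = wrap_exception()

print(content['ename'], content['evalue'])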
888 def unregister_engine(self, ident, msg):
891 def unregister_engine(self, ident, msg):
889 """Unregister an engine that explicitly requested to leave."""
892 """Unregister an engine that explicitly requested to leave."""
890 try:
893 try:
891 eid = msg['content']['id']
894 eid = msg['content']['id']
892 except:
895 except:
893 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
896 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
894 return
897 return
895 self.log.info("registration::unregister_engine(%r)"%eid)
898 self.log.info("registration::unregister_engine(%r)"%eid)
896 # print (eid)
899 # print (eid)
897 uuid = self.keytable[eid]
900 uuid = self.keytable[eid]
898 content=dict(id=eid, queue=uuid)
901 content=dict(id=eid, queue=uuid)
899 self.dead_engines.add(uuid)
902 self.dead_engines.add(uuid)
900 # self.ids.remove(eid)
903 # self.ids.remove(eid)
901 # uuid = self.keytable.pop(eid)
904 # uuid = self.keytable.pop(eid)
902 #
905 #
903 # ec = self.engines.pop(eid)
906 # ec = self.engines.pop(eid)
904 # self.hearts.pop(ec.heartbeat)
907 # self.hearts.pop(ec.heartbeat)
905 # self.by_ident.pop(ec.queue)
908 # self.by_ident.pop(ec.queue)
906 # self.completed.pop(eid)
909 # self.completed.pop(eid)
907 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
910 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
908 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
911 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
909 dc.start()
912 dc.start()
910 ############## TODO: HANDLE IT ################
913 ############## TODO: HANDLE IT ################
911
914
912 if self.notifier:
915 if self.notifier:
913 self.session.send(self.notifier, "unregistration_notification", content=content)
916 self.session.send(self.notifier, "unregistration_notification", content=content)
914
917
915 def _handle_stranded_msgs(self, eid, uuid):
918 def _handle_stranded_msgs(self, eid, uuid):
916 """Handle messages known to be on an engine when the engine unregisters.
919 """Handle messages known to be on an engine when the engine unregisters.
917
920
918 It is possible that this will fire prematurely - that is, an engine will
921 It is possible that this will fire prematurely - that is, an engine will
919 go down after completing a result, and the client will be notified
922 go down after completing a result, and the client will be notified
920 that the result failed and later receive the actual result.
923 that the result failed and later receive the actual result.
921 """
924 """
922
925
923 outstanding = self.queues[eid]
926 outstanding = self.queues[eid]
924
927
925 for msg_id in outstanding:
928 for msg_id in outstanding:
926 self.pending.remove(msg_id)
929 self.pending.remove(msg_id)
927 self.all_completed.add(msg_id)
930 self.all_completed.add(msg_id)
928 try:
931 try:
929 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
932 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
930 except:
933 except:
931 content = error.wrap_exception()
934 content = error.wrap_exception()
932 # build a fake header:
935 # build a fake header:
933 header = {}
936 header = {}
934 header['engine'] = uuid
937 header['engine'] = uuid
935 header['date'] = datetime.now()
938 header['date'] = datetime.now()
936 rec = dict(result_content=content, result_header=header, result_buffers=[])
939 rec = dict(result_content=content, result_header=header, result_buffers=[])
937 rec['completed'] = header['date']
940 rec['completed'] = header['date']
938 rec['engine_uuid'] = uuid
941 rec['engine_uuid'] = uuid
939 try:
942 try:
940 self.db.update_record(msg_id, rec)
943 self.db.update_record(msg_id, rec)
941 except Exception:
944 except Exception:
942 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
945 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
943
946
944
947
945 def finish_registration(self, heart):
948 def finish_registration(self, heart):
946 """Second half of engine registration, called after our HeartMonitor
949 """Second half of engine registration, called after our HeartMonitor
947 has received a beat from the Engine's Heart."""
950 has received a beat from the Engine's Heart."""
948 try:
951 try:
949 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
952 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
950 except KeyError:
953 except KeyError:
951 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
954 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
952 return
955 return
953 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
956 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
954 if purge is not None:
957 if purge is not None:
955 purge.stop()
958 purge.stop()
956 control = queue
959 control = queue
957 self.ids.add(eid)
960 self.ids.add(eid)
958 self.keytable[eid] = queue
961 self.keytable[eid] = queue
959 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
962 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
960 control=control, heartbeat=heart)
963 control=control, heartbeat=heart)
961 self.by_ident[queue] = eid
964 self.by_ident[queue] = eid
962 self.queues[eid] = list()
965 self.queues[eid] = list()
963 self.tasks[eid] = list()
966 self.tasks[eid] = list()
964 self.completed[eid] = list()
967 self.completed[eid] = list()
965 self.hearts[heart] = eid
968 self.hearts[heart] = eid
966 content = dict(id=eid, queue=self.engines[eid].queue)
969 content = dict(id=eid, queue=self.engines[eid].queue)
967 if self.notifier:
970 if self.notifier:
968 self.session.send(self.notifier, "registration_notification", content=content)
971 self.session.send(self.notifier, "registration_notification", content=content)
969 self.log.info("engine::Engine Connected: %i"%eid)
972 self.log.info("engine::Engine Connected: %i"%eid)
970
973
971 def _purge_stalled_registration(self, heart):
974 def _purge_stalled_registration(self, heart):
972 if heart in self.incoming_registrations:
975 if heart in self.incoming_registrations:
973 eid = self.incoming_registrations.pop(heart)[0]
976 eid = self.incoming_registrations.pop(heart)[0]
974 self.log.info("registration::purging stalled registration: %i"%eid)
977 self.log.info("registration::purging stalled registration: %i"%eid)
977
980
978 #-------------------------------------------------------------------------
981 #-------------------------------------------------------------------------
979 # Client Requests
982 # Client Requests
980 #-------------------------------------------------------------------------
983 #-------------------------------------------------------------------------
981
984
982 def shutdown_request(self, client_id, msg):
985 def shutdown_request(self, client_id, msg):
983 """handle shutdown request."""
986 """handle shutdown request."""
984 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
987 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
985 # also notify other clients of shutdown
988 # also notify other clients of shutdown
986 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
989 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
987 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
990 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
988 dc.start()
991 dc.start()
989
992
990 def _shutdown(self):
993 def _shutdown(self):
991 self.log.info("hub::hub shutting down.")
994 self.log.info("hub::hub shutting down.")
992 time.sleep(0.1)
995 time.sleep(0.1)
993 sys.exit(0)
996 sys.exit(0)
994
997
995
998
996 def check_load(self, client_id, msg):
999 def check_load(self, client_id, msg):
997 content = msg['content']
1000 content = msg['content']
998 try:
1001 try:
999 targets = content['targets']
1002 targets = content['targets']
1000 targets = self._validate_targets(targets)
1003 targets = self._validate_targets(targets)
1001 except:
1004 except:
1002 content = error.wrap_exception()
1005 content = error.wrap_exception()
1003 self.session.send(self.query, "hub_error",
1006 self.session.send(self.query, "hub_error",
1004 content=content, ident=client_id)
1007 content=content, ident=client_id)
1005 return
1008 return
1006
1009
1007 content = dict(status='ok')
1010 content = dict(status='ok')
1008 # loads = {}
1011 # loads = {}
1009 for t in targets:
1012 for t in targets:
1010 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1013 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1011 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1014 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1012
1015
1013
1016
1014 def queue_status(self, client_id, msg):
1017 def queue_status(self, client_id, msg):
1015 """Return the Queue status of one or more targets.
1018 """Return the Queue status of one or more targets.
1016 if verbose: return the msg_ids
1019 if verbose: return the msg_ids
1017 else: return the number of each type.
1020 else: return the number of each type.
1018 keys: queue (pending MUX jobs)
1021 keys: queue (pending MUX jobs)
1019 tasks (pending Task jobs)
1022 tasks (pending Task jobs)
1020 completed (finished jobs from both queues)"""
1023 completed (finished jobs from both queues)"""
1021 content = msg['content']
1024 content = msg['content']
1022 targets = content['targets']
1025 targets = content['targets']
1023 try:
1026 try:
1024 targets = self._validate_targets(targets)
1027 targets = self._validate_targets(targets)
1025 except:
1028 except:
1026 content = error.wrap_exception()
1029 content = error.wrap_exception()
1027 self.session.send(self.query, "hub_error",
1030 self.session.send(self.query, "hub_error",
1028 content=content, ident=client_id)
1031 content=content, ident=client_id)
1029 return
1032 return
1030 verbose = content.get('verbose', False)
1033 verbose = content.get('verbose', False)
1031 content = dict(status='ok')
1034 content = dict(status='ok')
1032 for t in targets:
1035 for t in targets:
1033 queue = self.queues[t]
1036 queue = self.queues[t]
1034 completed = self.completed[t]
1037 completed = self.completed[t]
1035 tasks = self.tasks[t]
1038 tasks = self.tasks[t]
1036 if not verbose:
1039 if not verbose:
1037 queue = len(queue)
1040 queue = len(queue)
1038 completed = len(completed)
1041 completed = len(completed)
1039 tasks = len(tasks)
1042 tasks = len(tasks)
1040 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1043 content[bytes(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
1041 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1044 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1042
1045
1043 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1046 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1044
1047
1045 def purge_results(self, client_id, msg):
1048 def purge_results(self, client_id, msg):
1046 """Purge results from memory. This method is more valuable before we move
1049 """Purge results from memory. This method is more valuable before we move
1047 to a DB based message storage mechanism."""
1050 to a DB based message storage mechanism."""
1048 content = msg['content']
1051 content = msg['content']
1049 msg_ids = content.get('msg_ids', [])
1052 msg_ids = content.get('msg_ids', [])
1050 reply = dict(status='ok')
1053 reply = dict(status='ok')
1051 if msg_ids == 'all':
1054 if msg_ids == 'all':
1052 try:
1055 try:
1053 self.db.drop_matching_records(dict(completed={'$ne':None}))
1056 self.db.drop_matching_records(dict(completed={'$ne':None}))
1054 except Exception:
1057 except Exception:
1055 reply = error.wrap_exception()
1058 reply = error.wrap_exception()
1056 else:
1059 else:
1057 pending = filter(lambda m: m in self.pending, msg_ids)
1060 pending = filter(lambda m: m in self.pending, msg_ids)
1058 if pending:
1061 if pending:
1059 try:
1062 try:
1060 raise IndexError("msg pending: %r"%pending[0])
1063 raise IndexError("msg pending: %r"%pending[0])
1061 except:
1064 except:
1062 reply = error.wrap_exception()
1065 reply = error.wrap_exception()
1063 else:
1066 else:
1064 try:
1067 try:
1065 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1068 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1066 except Exception:
1069 except Exception:
1067 reply = error.wrap_exception()
1070 reply = error.wrap_exception()
1068
1071
1069 if reply['status'] == 'ok':
1072 if reply['status'] == 'ok':
1070 eids = content.get('engine_ids', [])
1073 eids = content.get('engine_ids', [])
1071 for eid in eids:
1074 for eid in eids:
1072 if eid not in self.engines:
1075 if eid not in self.engines:
1073 try:
1076 try:
1074 raise IndexError("No such engine: %i"%eid)
1077 raise IndexError("No such engine: %i"%eid)
1075 except:
1078 except:
1076 reply = error.wrap_exception()
1079 reply = error.wrap_exception()
1077 break
1080 break
1078 msg_ids = self.completed.pop(eid)
1081 msg_ids = self.completed.pop(eid)
1079 uid = self.engines[eid].queue
1082 uid = self.engines[eid].queue
1080 try:
1083 try:
1081 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1084 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1082 except Exception:
1085 except Exception:
1083 reply = error.wrap_exception()
1086 reply = error.wrap_exception()
1084 break
1087 break
1085
1088
1086 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1089 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1087
1090
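The purge calls above use mongo-style filters such as {'$ne': None} and {'$in': msg_ids}. The DB backends interpret these, but a tiny matcher shows what the two operators mean:

def matches(rec, query):
    """minimal interpreter for the two operators used above"""
    for key, test in query.items():
        value = rec.get(key)
        if isinstance(test, dict):
            if '$ne' in test and value == test['$ne']:
                return False
            if '$in' in test and value not in test['$in']:
                return False
        elif value != test:
            return False
    return True

rec = {'msg_id': 'a', 'completed': None}
print(matches(rec, {'completed': {'$ne': None}}))    # False: still pending
print(matches(rec, {'msg_id': {'$in': ['a', 'b']}})) # True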
1088 def resubmit_task(self, client_id, msg):
1091 def resubmit_task(self, client_id, msg):
1089 """Resubmit one or more tasks."""
1092 """Resubmit one or more tasks."""
1090 def finish(reply):
1093 def finish(reply):
1091 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1094 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1092
1095
1093 content = msg['content']
1096 content = msg['content']
1094 msg_ids = content['msg_ids']
1097 msg_ids = content['msg_ids']
1095 reply = dict(status='ok')
1098 reply = dict(status='ok')
1096 try:
1099 try:
1097 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1100 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1098 'header', 'content', 'buffers'])
1101 'header', 'content', 'buffers'])
1099 except Exception:
1102 except Exception:
1100 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1103 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1101 return finish(error.wrap_exception())
1104 return finish(error.wrap_exception())
1102
1105
1103 # validate msg_ids
1106 # validate msg_ids
1104 found_ids = [ rec['msg_id'] for rec in records ]
1107 found_ids = [ rec['msg_id'] for rec in records ]
1105 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1108 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1106 if len(records) > len(msg_ids):
1109 if len(records) > len(msg_ids):
1107 try:
1110 try:
1108 raise RuntimeError("DB appears to be in an inconsistent state."
1111 raise RuntimeError("DB appears to be in an inconsistent state."
1109 "More matching records were found than should exist")
1112 "More matching records were found than should exist")
1110 except Exception:
1113 except Exception:
1111 return finish(error.wrap_exception())
1114 return finish(error.wrap_exception())
1112 elif len(records) < len(msg_ids):
1115 elif len(records) < len(msg_ids):
1113 missing = [ m for m in msg_ids if m not in found_ids ]
1116 missing = [ m for m in msg_ids if m not in found_ids ]
1114 try:
1117 try:
1115 raise KeyError("No such msg(s): %r"%missing)
1118 raise KeyError("No such msg(s): %r"%missing)
1116 except KeyError:
1119 except KeyError:
1117 return finish(error.wrap_exception())
1120 return finish(error.wrap_exception())
1118 elif invalid_ids:
1121 elif invalid_ids:
1119 msg_id = invalid_ids[0]
1122 msg_id = invalid_ids[0]
1120 try:
1123 try:
1121 raise ValueError("Task %r appears to be in flight"%(msg_id))
1124 raise ValueError("Task %r appears to be in flight"%(msg_id))
1122 except Exception:
1125 except Exception:
1123 return finish(error.wrap_exception())
1126 return finish(error.wrap_exception())
1124
1127
1125 # clear the existing records
1128 # clear the existing records
1129 now = datetime.now()
1126 rec = empty_record()
1130 rec = empty_record()
1127 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1131 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1128 rec['resubmitted'] = datetime.now()
1132 rec['resubmitted'] = now
1129 rec['queue'] = 'task'
1133 rec['queue'] = 'task'
1130 rec['client_uuid'] = client_id[0]
1134 rec['client_uuid'] = client_id[0]
1131 try:
1135 try:
1132 for msg_id in msg_ids:
1136 for msg_id in msg_ids:
1133 self.all_completed.discard(msg_id)
1137 self.all_completed.discard(msg_id)
1134 self.db.update_record(msg_id, rec)
1138 self.db.update_record(msg_id, rec)
1135 except Exception:
1139 except Exception:
1136 self.log.error('db::db error updating record', exc_info=True)
1140 self.log.error('db::db error updating record', exc_info=True)
1137 reply = error.wrap_exception()
1141 reply = error.wrap_exception()
1138 else:
1142 else:
1139 # send the messages
1143 # send the messages
1144 now_s = now.strftime(util.ISO8601)
1140 for rec in records:
1145 for rec in records:
1141 header = rec['header']
1146 header = rec['header']
1147 # include resubmitted in header to prevent digest collision
1148 header['resubmitted'] = now_s
1142 msg = self.session.msg(header['msg_type'])
1149 msg = self.session.msg(header['msg_type'])
1143 msg['content'] = rec['content']
1150 msg['content'] = rec['content']
1144 msg['header'] = header
1151 msg['header'] = header
1145 msg['msg_id'] = rec['msg_id']
1152 msg['msg_id'] = rec['msg_id']
1146 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1153 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1147
1154
1148 finish(dict(status='ok'))
1155 finish(dict(status='ok'))
1149
1156
1150
1157
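The resubmitted field is folded into the header above because, with HMAC-signed messages, resending byte-identical header+content would reproduce the exact same digest, and a receiver that tracks seen digests could reject the resubmission as a replay. A toy illustration; the key and serialization here are assumptions, not the session's actual signing scheme:

import hashlib
import hmac
import json

key = b'0123456789abcdef'                 # illustrative shared key

def sign(header, content):
    payload = json.dumps([header, content], sort_keys=True).encode('utf-8')
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

header = {'msg_id': '42', 'msg_type': 'apply_request'}
content = {'a': 1}
first = sign(header, content)
header['resubmitted'] = '2011-01-01T00:00:00.000000'
second = sign(header, content)
print(first == second)                    # False: the new field changes the digest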
1151 def _extract_record(self, rec):
1158 def _extract_record(self, rec):
1152 """decompose a TaskRecord dict into subsection of reply for get_result"""
1159 """decompose a TaskRecord dict into subsection of reply for get_result"""
1153 io_dict = {}
1160 io_dict = {}
1154 for key in 'pyin pyout pyerr stdout stderr'.split():
1161 for key in 'pyin pyout pyerr stdout stderr'.split():
1155 io_dict[key] = rec[key]
1162 io_dict[key] = rec[key]
1156 content = { 'result_content': rec['result_content'],
1163 content = { 'result_content': rec['result_content'],
1157 'header': rec['header'],
1164 'header': rec['header'],
1158 'result_header' : rec['result_header'],
1165 'result_header' : rec['result_header'],
1159 'io' : io_dict,
1166 'io' : io_dict,
1160 }
1167 }
1161 if rec['result_buffers']:
1168 if rec['result_buffers']:
1162 buffers = map(str, rec['result_buffers'])
1169 buffers = map(str, rec['result_buffers'])
1163 else:
1170 else:
1164 buffers = []
1171 buffers = []
1165
1172
1166 return content, buffers
1173 return content, buffers
1167
1174
1168 def get_results(self, client_id, msg):
1175 def get_results(self, client_id, msg):
1169 """Get the result of 1 or more messages."""
1176 """Get the result of 1 or more messages."""
1170 content = msg['content']
1177 content = msg['content']
1171 msg_ids = sorted(set(content['msg_ids']))
1178 msg_ids = sorted(set(content['msg_ids']))
1172 statusonly = content.get('status_only', False)
1179 statusonly = content.get('status_only', False)
1173 pending = []
1180 pending = []
1174 completed = []
1181 completed = []
1175 content = dict(status='ok')
1182 content = dict(status='ok')
1176 content['pending'] = pending
1183 content['pending'] = pending
1177 content['completed'] = completed
1184 content['completed'] = completed
1178 buffers = []
1185 buffers = []
1179 if not statusonly:
1186 if not statusonly:
1180 try:
1187 try:
1181 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1188 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1182 # turn match list into dict, for faster lookup
1189 # turn match list into dict, for faster lookup
1183 records = {}
1190 records = {}
1184 for rec in matches:
1191 for rec in matches:
1185 records[rec['msg_id']] = rec
1192 records[rec['msg_id']] = rec
1186 except Exception:
1193 except Exception:
1187 content = error.wrap_exception()
1194 content = error.wrap_exception()
1188 self.session.send(self.query, "result_reply", content=content,
1195 self.session.send(self.query, "result_reply", content=content,
1189 parent=msg, ident=client_id)
1196 parent=msg, ident=client_id)
1190 return
1197 return
1191 else:
1198 else:
1192 records = {}
1199 records = {}
1193 for msg_id in msg_ids:
1200 for msg_id in msg_ids:
1194 if msg_id in self.pending:
1201 if msg_id in self.pending:
1195 pending.append(msg_id)
1202 pending.append(msg_id)
1196 elif msg_id in self.all_completed:
1203 elif msg_id in self.all_completed:
1197 completed.append(msg_id)
1204 completed.append(msg_id)
1198 if not statusonly:
1205 if not statusonly:
1199 c,bufs = self._extract_record(records[msg_id])
1206 c,bufs = self._extract_record(records[msg_id])
1200 content[msg_id] = c
1207 content[msg_id] = c
1201 buffers.extend(bufs)
1208 buffers.extend(bufs)
1202 elif msg_id in records:
1209 elif msg_id in records:
1203 if records[msg_id]['completed']:
1210 if records[msg_id]['completed']:
1204 completed.append(msg_id)
1211 completed.append(msg_id)
1205 c,bufs = self._extract_record(records[msg_id])
1212 c,bufs = self._extract_record(records[msg_id])
1206 content[msg_id] = c
1213 content[msg_id] = c
1207 buffers.extend(bufs)
1214 buffers.extend(bufs)
1208 else:
1215 else:
1209 pending.append(msg_id)
1216 pending.append(msg_id)
1210 else:
1217 else:
1211 try:
1218 try:
1212 raise KeyError('No such message: '+msg_id)
1219 raise KeyError('No such message: '+msg_id)
1213 except:
1220 except:
1214 content = error.wrap_exception()
1221 content = error.wrap_exception()
1215 break
1222 break
1216 self.session.send(self.query, "result_reply", content=content,
1223 self.session.send(self.query, "result_reply", content=content,
1217 parent=msg, ident=client_id,
1224 parent=msg, ident=client_id,
1218 buffers=buffers)
1225 buffers=buffers)
1219
1226
1220 def get_history(self, client_id, msg):
1227 def get_history(self, client_id, msg):
1221 """Get a list of all msg_ids in our DB records"""
1228 """Get a list of all msg_ids in our DB records"""
1222 try:
1229 try:
1223 msg_ids = self.db.get_history()
1230 msg_ids = self.db.get_history()
1224 except Exception as e:
1231 except Exception as e:
1225 content = error.wrap_exception()
1232 content = error.wrap_exception()
1226 else:
1233 else:
1227 content = dict(status='ok', history=msg_ids)
1234 content = dict(status='ok', history=msg_ids)
1228
1235
1229 self.session.send(self.query, "history_reply", content=content,
1236 self.session.send(self.query, "history_reply", content=content,
1230 parent=msg, ident=client_id)
1237 parent=msg, ident=client_id)
1231
1238
1232 def db_query(self, client_id, msg):
1239 def db_query(self, client_id, msg):
1233 """Perform a raw query on the task record database."""
1240 """Perform a raw query on the task record database."""
1234 content = msg['content']
1241 content = msg['content']
1235 query = content.get('query', {})
1242 query = content.get('query', {})
1236 keys = content.get('keys', None)
1243 keys = content.get('keys', None)
1237 query = util.extract_dates(query)
1244 query = util.extract_dates(query)
1238 buffers = []
1245 buffers = []
1239 empty = list()
1246 empty = list()
1240
1247
1241 try:
1248 try:
1242 records = self.db.find_records(query, keys)
1249 records = self.db.find_records(query, keys)
1243 except Exception as e:
1250 except Exception as e:
1244 content = error.wrap_exception()
1251 content = error.wrap_exception()
1245 else:
1252 else:
1246 # extract buffers from reply content:
1253 # extract buffers from reply content:
1247 if keys is not None:
1254 if keys is not None:
1248 buffer_lens = [] if 'buffers' in keys else None
1255 buffer_lens = [] if 'buffers' in keys else None
1249 result_buffer_lens = [] if 'result_buffers' in keys else None
1256 result_buffer_lens = [] if 'result_buffers' in keys else None
1250 else:
1257 else:
1251 buffer_lens = []
1258 buffer_lens = []
1252 result_buffer_lens = []
1259 result_buffer_lens = []
1253
1260
1254 for rec in records:
1261 for rec in records:
1255 # buffers may be None, so double check
1262 # buffers may be None, so double check
1256 if buffer_lens is not None:
1263 if buffer_lens is not None:
1257 b = rec.pop('buffers', empty) or empty
1264 b = rec.pop('buffers', empty) or empty
1258 buffer_lens.append(len(b))
1265 buffer_lens.append(len(b))
1259 buffers.extend(b)
1266 buffers.extend(b)
1260 if result_buffer_lens is not None:
1267 if result_buffer_lens is not None:
1261 rb = rec.pop('result_buffers', empty) or empty
1268 rb = rec.pop('result_buffers', empty) or empty
1262 result_buffer_lens.append(len(rb))
1269 result_buffer_lens.append(len(rb))
1263 buffers.extend(rb)
1270 buffers.extend(rb)
1264 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1271 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1265 result_buffer_lens=result_buffer_lens)
1272 result_buffer_lens=result_buffer_lens)
1266
1273
1267 self.session.send(self.query, "db_reply", content=content,
1274 self.session.send(self.query, "db_reply", content=content,
1268 parent=msg, ident=client_id,
1275 parent=msg, ident=client_id,
1269 buffers=buffers)
1276 buffers=buffers)
1270
1277
@@ -1,679 +1,687 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
15 # Imports
15 # Imports
16 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import logging
20 import logging
21 import sys
21 import sys
22
22
23 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
24 from random import randint, random
24 from random import randint, random
25 from types import FunctionType
25 from types import FunctionType
26
26
27 try:
27 try:
28 import numpy
28 import numpy
29 except ImportError:
29 except ImportError:
30 numpy = None
30 numpy = None
31
31
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
34
34
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39
39
40 from IPython.parallel import error
40 from IPython.parallel import error
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.util import connect_logger, local_logger
42 from IPython.parallel.util import connect_logger, local_logger
43
43
44 from .dependency import Dependency
44 from .dependency import Dependency
45
45
46 @decorator
46 @decorator
47 def logged(f,self,*args,**kwargs):
47 def logged(f,self,*args,**kwargs):
48 # print ("#--------------------")
48 # print ("#--------------------")
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
50 # print ("#--")
50 # print ("#--")
51 return f(self,*args, **kwargs)
51 return f(self,*args, **kwargs)
52
52
53 #----------------------------------------------------------------------
53 #----------------------------------------------------------------------
54 # Chooser functions
54 # Chooser functions
55 #----------------------------------------------------------------------
55 #----------------------------------------------------------------------
56
56
57 def plainrandom(loads):
57 def plainrandom(loads):
58 """Plain random pick."""
58 """Plain random pick."""
59 n = len(loads)
59 n = len(loads)
60 return randint(0,n-1)
60 return randint(0,n-1)
61
61
62 def lru(loads):
62 def lru(loads):
63 """Always pick the front of the line.
63 """Always pick the front of the line.
64
64
65 The content of `loads` is ignored.
65 The content of `loads` is ignored.
66
66
67 Assumes LRU ordering of loads, with oldest first.
67 Assumes LRU ordering of loads, with oldest first.
68 """
68 """
69 return 0
69 return 0
70
70
71 def twobin(loads):
71 def twobin(loads):
72 """Pick two at random, use the LRU of the two.
72 """Pick two at random, use the LRU of the two.
73
73
74 The content of loads is ignored.
74 The content of loads is ignored.
75
75
76 Assumes LRU ordering of loads, with oldest first.
76 Assumes LRU ordering of loads, with oldest first.
77 """
77 """
78 n = len(loads)
78 n = len(loads)
79 a = randint(0,n-1)
79 a = randint(0,n-1)
80 b = randint(0,n-1)
80 b = randint(0,n-1)
81 return min(a,b)
81 return min(a,b)
82
82
83 def weighted(loads):
83 def weighted(loads):
84 """Pick two at random using inverse load as weight.
84 """Pick two at random using inverse load as weight.
85
85
86 Return the less loaded of the two.
86 Return the less loaded of the two.
87 """
87 """
88 # a load of 0 is weighted ~1e6 times more than a load of 1:
88 # a load of 0 is weighted ~1e6 times more than a load of 1:
89 weights = 1./(1e-6+numpy.array(loads))
89 weights = 1./(1e-6+numpy.array(loads))
90 sums = weights.cumsum()
90 sums = weights.cumsum()
91 t = sums[-1]
91 t = sums[-1]
92 x = random()*t
92 x = random()*t
93 y = random()*t
93 y = random()*t
94 idx = 0
94 idx = 0
95 idy = 0
95 idy = 0
96 while sums[idx] < x:
96 while sums[idx] < x:
97 idx += 1
97 idx += 1
98 while sums[idy] < y:
98 while sums[idy] < y:
99 idy += 1
99 idy += 1
100 if weights[idy] > weights[idx]:
100 if weights[idy] > weights[idx]:
101 return idy
101 return idy
102 else:
102 else:
103 return idx
103 return idx
104
104
105 def leastload(loads):
105 def leastload(loads):
106 """Always choose the lowest load.
106 """Always choose the lowest load.
107
107
108 If the lowest load occurs more than once, the first
108 If the lowest load occurs more than once, the first
109 occurrence will be used. If loads has LRU ordering, this means
109 occurrence will be used. If loads has LRU ordering, this means
110 the LRU of those with the lowest load is chosen.
110 the LRU of those with the lowest load is chosen.
111 """
111 """
112 return loads.index(min(loads))
112 return loads.index(min(loads))
113
113
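# To make the chooser contract concrete, a standalone exercise of the schemes
# above on a sample load list. A numpy-free variant of `weighted` is inlined
# so the sketch runs without dependencies; it is illustrative, not the
# scheduler's implementation.

from random import random

def weighted_pure(loads):
    # inverse-load weights; draw two indices, keep the less loaded one
    weights = [1.0/(1e-6 + load) for load in loads]
    total = sum(weights)
    def draw():
        x = random() * total
        acc = 0.0
        for i, w in enumerate(weights):
            acc += w
            if acc >= x:
                return i
        return len(weights) - 1
    a, b = draw(), draw()
    return a if weights[a] >= weights[b] else b

loads = [3, 0, 5, 1]           # outstanding tasks per engine, LRU-ordered
print(leastload(loads))        # -> 1 (the idle engine)
print(twobin(loads))           # more-LRU of two random picks
print(weighted_pure(loads))    # usually 1; loaded engines are rarely picked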
114 #---------------------------------------------------------------------
114 #---------------------------------------------------------------------
115 # Classes
115 # Classes
116 #---------------------------------------------------------------------
116 #---------------------------------------------------------------------
117 # store empty default dependency:
117 # store empty default dependency:
118 MET = Dependency([])
118 MET = Dependency([])
119
119
120 class TaskScheduler(SessionFactory):
120 class TaskScheduler(SessionFactory):
121 """Python TaskScheduler object.
121 """Python TaskScheduler object.
122
122
123 This is the simplest object that supports msg_id based
123 This is the simplest object that supports msg_id based
124 DAG dependencies. *Only* task msg_ids are checked, not
124 DAG dependencies. *Only* task msg_ids are checked, not
125 msg_ids of jobs submitted via the MUX queue.
125 msg_ids of jobs submitted via the MUX queue.
126
126
127 """
127 """
128
128
129 hwm = Int(0, config=True, shortname='hwm',
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
132 of allowed outstanding tasks on each engine."""
133 )
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: Python LRU]
136 help="""select the task scheduler scheme [default: Python LRU]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
138 )
139 def _scheme_name_changed(self, old, new):
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
141 self.scheme = globals()[new]
142
142
143 # input arguments:
143 # input arguments:
144 scheme = Instance(FunctionType) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
145 def _scheme_default(self):
146 return leastload
146 return leastload
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
151
151
152 # internals:
152 # internals:
153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
157 pending = Dict() # dict by engine_uuid of submitted tasks
157 pending = Dict() # dict by engine_uuid of submitted tasks
158 completed = Dict() # dict by engine_uuid of completed tasks
158 completed = Dict() # dict by engine_uuid of completed tasks
159 failed = Dict() # dict by engine_uuid of failed tasks
159 failed = Dict() # dict by engine_uuid of failed tasks
160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
161 clients = Dict() # dict by msg_id for who submitted the task
161 clients = Dict() # dict by msg_id for who submitted the task
162 targets = List() # list of target IDENTs
162 targets = List() # list of target IDENTs
163 loads = List() # list of engine loads
163 loads = List() # list of engine loads
164 # full = Set() # set of IDENTs that have HWM outstanding tasks
164 # full = Set() # set of IDENTs that have HWM outstanding tasks
165 all_completed = Set() # set of all completed tasks
165 all_completed = Set() # set of all completed tasks
166 all_failed = Set() # set of all failed tasks
166 all_failed = Set() # set of all failed tasks
167 all_done = Set() # set of all finished tasks=union(completed,failed)
167 all_done = Set() # set of all finished tasks=union(completed,failed)
168 all_ids = Set() # set of all submitted task IDs
168 all_ids = Set() # set of all submitted task IDs
169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
171
171
172
172
173 def start(self):
173 def start(self):
174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
175 self._notification_handlers = dict(
175 self._notification_handlers = dict(
176 registration_notification = self._register_engine,
176 registration_notification = self._register_engine,
177 unregistration_notification = self._unregister_engine
177 unregistration_notification = self._unregister_engine
178 )
178 )
179 self.notifier_stream.on_recv(self.dispatch_notification)
179 self.notifier_stream.on_recv(self.dispatch_notification)
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
181 self.auditor.start()
181 self.auditor.start()
182 self.log.info("Scheduler started...%r"%self)
182 self.log.info("Scheduler started...%r"%self)
183
183
184 def resume_receiving(self):
184 def resume_receiving(self):
185 """Resume accepting jobs."""
185 """Resume accepting jobs."""
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
187
187
188 def stop_receiving(self):
188 def stop_receiving(self):
189 """Stop accepting jobs while there are no engines.
189 """Stop accepting jobs while there are no engines.
190 Leave them in the ZMQ queue."""
190 Leave them in the ZMQ queue."""
191 self.client_stream.on_recv(None)
191 self.client_stream.on_recv(None)
192
192
193 #-----------------------------------------------------------------------
193 #-----------------------------------------------------------------------
194 # [Un]Registration Handling
194 # [Un]Registration Handling
195 #-----------------------------------------------------------------------
195 #-----------------------------------------------------------------------
196
196
197 def dispatch_notification(self, msg):
197 def dispatch_notification(self, msg):
198 """dispatch register/unregister events."""
198 """dispatch register/unregister events."""
199 idents,msg = self.session.feed_identities(msg)
199 try:
200 msg = self.session.unpack_message(msg)
200 idents,msg = self.session.feed_identities(msg)
201 except ValueError:
202 self.log.warn("task::Invalid Message: %r"%msg)
203 return
204 try:
205 msg = self.session.unpack_message(msg)
206 except ValueError:
207 self.log.warn("task::Unauthorized message from: %r"%idents)
208 return
209
201 msg_type = msg['msg_type']
210 msg_type = msg['msg_type']
211
202 handler = self._notification_handlers.get(msg_type, None)
212 handler = self._notification_handlers.get(msg_type, None)
203 if handler is None:
213 if handler is None:
204 raise Exception("Unhandled message type: %s"%msg_type)
214 self.log.error("Unhandled message type: %r"%msg_type)
205 else:
215 else:
206 try:
216 try:
207 handler(str(msg['content']['queue']))
217 handler(str(msg['content']['queue']))
208 except KeyError:
218 except KeyError:
209 self.log.error("task::Invalid notification msg: %s"%msg)
219 self.log.error("task::Invalid notification msg: %r"%msg)
210
220
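# The two separate try/excepts above distinguish malformed framing from a
# failed signature check: feed_identities raises on bad wire format, while
# unpack_message raises once the HMAC digest introduced in this changeset
# does not verify. A schematic restatement, with `session` standing in for
# the StreamSession instance:

def classify(session, raw):
    try:
        idents, msg = session.feed_identities(raw)   # framing/delimiter check
    except ValueError:
        return 'invalid'        # not a well-formed message at all
    try:
        msg = session.unpack_message(msg)            # signature verified here
    except ValueError:
        return 'unauthorized'   # wrong or missing HMAC digest from idents
    return msg['msg_type']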
211 @logged
221 @logged
212 def _register_engine(self, uid):
222 def _register_engine(self, uid):
213 """New engine with ident `uid` became available."""
223 """New engine with ident `uid` became available."""
214 # head of the line:
224 # head of the line:
215 self.targets.insert(0,uid)
225 self.targets.insert(0,uid)
216 self.loads.insert(0,0)
226 self.loads.insert(0,0)
217 # initialize sets
227 # initialize sets
218 self.completed[uid] = set()
228 self.completed[uid] = set()
219 self.failed[uid] = set()
229 self.failed[uid] = set()
220 self.pending[uid] = {}
230 self.pending[uid] = {}
221 if len(self.targets) == 1:
231 if len(self.targets) == 1:
222 self.resume_receiving()
232 self.resume_receiving()
223 # rescan the graph:
233 # rescan the graph:
224 self.update_graph(None)
234 self.update_graph(None)
225
235
226 def _unregister_engine(self, uid):
236 def _unregister_engine(self, uid):
227 """Existing engine with ident `uid` became unavailable."""
237 """Existing engine with ident `uid` became unavailable."""
228 if len(self.targets) == 1:
238 if len(self.targets) == 1:
229 # this was our only engine
239 # this was our only engine
230 self.stop_receiving()
240 self.stop_receiving()
231
241
232 # handle any potentially finished tasks:
242 # handle any potentially finished tasks:
233 self.engine_stream.flush()
243 self.engine_stream.flush()
234
244
235 # don't pop destinations, because they might be used later
245 # don't pop destinations, because they might be used later
236 # map(self.destinations.pop, self.completed.pop(uid))
246 # map(self.destinations.pop, self.completed.pop(uid))
237 # map(self.destinations.pop, self.failed.pop(uid))
247 # map(self.destinations.pop, self.failed.pop(uid))
238
248
239 # prevent this engine from receiving work
249 # prevent this engine from receiving work
240 idx = self.targets.index(uid)
250 idx = self.targets.index(uid)
241 self.targets.pop(idx)
251 self.targets.pop(idx)
242 self.loads.pop(idx)
252 self.loads.pop(idx)
243
253
244 # wait 5 seconds before cleaning up pending jobs, since the results might
254 # wait 5 seconds before cleaning up pending jobs, since the results might
245 # still be incoming
255 # still be incoming
246 if self.pending[uid]:
256 if self.pending[uid]:
247 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
257 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
248 dc.start()
258 dc.start()
249 else:
259 else:
250 self.completed.pop(uid)
260 self.completed.pop(uid)
251 self.failed.pop(uid)
261 self.failed.pop(uid)
252
262
253
263
254 @logged
264 @logged
255 def handle_stranded_tasks(self, engine):
265 def handle_stranded_tasks(self, engine):
256 """Deal with jobs resident in an engine that died."""
266 """Deal with jobs resident in an engine that died."""
257 lost = self.pending[engine]
267 lost = self.pending[engine]
258 for msg_id in lost.keys():
268 for msg_id in lost.keys():
259 if msg_id not in self.pending[engine]:
269 if msg_id not in self.pending[engine]:
260 # prevent double-handling of messages
270 # prevent double-handling of messages
261 continue
271 continue
262
272
263 raw_msg = lost[msg_id][0]
273 raw_msg = lost[msg_id][0]
264 idents,msg = self.session.feed_identities(raw_msg, copy=False)
274 idents,msg = self.session.feed_identities(raw_msg, copy=False)
265 msg = self.session.unpack_message(msg, copy=False, content=False)
275 parent = self.session.unpack(msg[1].bytes)
266 parent = msg['header']
267 idents = [engine, idents[0]]
276 idents = [engine, idents[0]]
268
277
269 # build fake error reply
278 # build fake error reply
270 try:
279 try:
271 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
280 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
272 except:
281 except:
273 content = error.wrap_exception()
282 content = error.wrap_exception()
274 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
283 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
275 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
284 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
276 # and dispatch it
285 # and dispatch it
277 self.dispatch_result(raw_reply)
286 self.dispatch_result(raw_reply)
278
287
279 # finally scrub completed/failed lists
288 # finally scrub completed/failed lists
280 self.completed.pop(engine)
289 self.completed.pop(engine)
281 self.failed.pop(engine)
290 self.failed.pop(engine)
282
291
283
292
284 #-----------------------------------------------------------------------
293 #-----------------------------------------------------------------------
285 # Job Submission
294 # Job Submission
286 #-----------------------------------------------------------------------
295 #-----------------------------------------------------------------------
287 @logged
296 @logged
288 def dispatch_submission(self, raw_msg):
297 def dispatch_submission(self, raw_msg):
289 """Dispatch job submission to appropriate handlers."""
298 """Dispatch job submission to appropriate handlers."""
290 # ensure targets up to date:
299 # ensure targets up to date:
291 self.notifier_stream.flush()
300 self.notifier_stream.flush()
292 try:
301 try:
293 idents, msg = self.session.feed_identities(raw_msg, copy=False)
302 idents, msg = self.session.feed_identities(raw_msg, copy=False)
294 msg = self.session.unpack_message(msg, content=False, copy=False)
303 msg = self.session.unpack_message(msg, content=False, copy=False)
295 except Exception:
304 except Exception:
296 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
305 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
297 return
306 return
298
307
299
308
300 # send to monitor
309 # send to monitor
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
310 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302
311
303 header = msg['header']
312 header = msg['header']
304 msg_id = header['msg_id']
313 msg_id = header['msg_id']
305 self.all_ids.add(msg_id)
314 self.all_ids.add(msg_id)
306
315
307 # targets
316 # targets
308 targets = set(header.get('targets', []))
317 targets = set(header.get('targets', []))
309 retries = header.get('retries', 0)
318 retries = header.get('retries', 0)
310 self.retries[msg_id] = retries
319 self.retries[msg_id] = retries
311
320
312 # time dependencies
321 # time dependencies
313 after = Dependency(header.get('after', []))
322 after = Dependency(header.get('after', []))
314 if after.all:
323 if after.all:
315 if after.success:
324 if after.success:
316 after.difference_update(self.all_completed)
325 after.difference_update(self.all_completed)
317 if after.failure:
326 if after.failure:
318 after.difference_update(self.all_failed)
327 after.difference_update(self.all_failed)
319 if after.check(self.all_completed, self.all_failed):
328 if after.check(self.all_completed, self.all_failed):
320 # recast as empty set, if `after` already met,
329 # recast as empty set, if `after` already met,
321 # to prevent unnecessary set comparisons
330 # to prevent unnecessary set comparisons
322 after = MET
331 after = MET
323
332
324 # location dependencies
333 # location dependencies
325 follow = Dependency(header.get('follow', []))
334 follow = Dependency(header.get('follow', []))
326
335
327 # turn timeouts into datetime objects:
336 # turn timeouts into datetime objects:
328 timeout = header.get('timeout', None)
337 timeout = header.get('timeout', None)
329 if timeout:
338 if timeout:
330 timeout = datetime.now() + timedelta(0,timeout,0)
339 timeout = datetime.now() + timedelta(0,timeout,0)
331
340
332 args = [raw_msg, targets, after, follow, timeout]
341 args = [raw_msg, targets, after, follow, timeout]
333
342
334 # validate and reduce dependencies:
343 # validate and reduce dependencies:
335 for dep in after,follow:
344 for dep in after,follow:
336 # check valid:
345 # check valid:
337 if msg_id in dep or dep.difference(self.all_ids):
346 if msg_id in dep or dep.difference(self.all_ids):
338 self.depending[msg_id] = args
347 self.depending[msg_id] = args
339 return self.fail_unreachable(msg_id, error.InvalidDependency)
348 return self.fail_unreachable(msg_id, error.InvalidDependency)
340 # check if unreachable:
349 # check if unreachable:
341 if dep.unreachable(self.all_completed, self.all_failed):
350 if dep.unreachable(self.all_completed, self.all_failed):
342 self.depending[msg_id] = args
351 self.depending[msg_id] = args
343 return self.fail_unreachable(msg_id)
352 return self.fail_unreachable(msg_id)
344
353
345 if after.check(self.all_completed, self.all_failed):
354 if after.check(self.all_completed, self.all_failed):
346 # time deps already met, try to run
355 # time deps already met, try to run
347 if not self.maybe_run(msg_id, *args):
356 if not self.maybe_run(msg_id, *args):
348 # can't run yet
357 # can't run yet
349 if msg_id not in self.all_failed:
358 if msg_id not in self.all_failed:
350 # could have failed as unreachable
359 # could have failed as unreachable
351 self.save_unmet(msg_id, *args)
360 self.save_unmet(msg_id, *args)
352 else:
361 else:
353 self.save_unmet(msg_id, *args)
362 self.save_unmet(msg_id, *args)
354
363
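# For reference, the header fields dispatch_submission reads, shown as a
# hypothetical task header (all values illustrative):

header = {
    'msg_id' : 'abcd-1234',
    'targets': [],            # empty -> any engine may run the task
    'retries': 2,             # resubmissions allowed after failures
    'after'  : ['job-1'],     # time dependency: run once job-1 finishes
    'follow' : ['job-1'],     # location dependency: run where job-1 ran
    'timeout': 30.0,          # seconds before audit_timeouts fails the task
}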
355 # @logged
364 # @logged
356 def audit_timeouts(self):
365 def audit_timeouts(self):
357 """Audit all waiting tasks for expired timeouts."""
366 """Audit all waiting tasks for expired timeouts."""
358 now = datetime.now()
367 now = datetime.now()
359 for msg_id in self.depending.keys():
368 for msg_id in self.depending.keys():
360 # must recheck, in case one failure cascaded to another:
369 # must recheck, in case one failure cascaded to another:
361 if msg_id in self.depending:
370 if msg_id in self.depending:
362 raw_msg,targets,after,follow,timeout = self.depending[msg_id]
371 raw_msg,targets,after,follow,timeout = self.depending[msg_id]
363 if timeout and timeout < now:
372 if timeout and timeout < now:
364 self.fail_unreachable(msg_id, error.TaskTimeout)
373 self.fail_unreachable(msg_id, error.TaskTimeout)
365
374
366 @logged
375 @logged
367 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
376 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
368 """a task has become unreachable, send a reply with an ImpossibleDependency
377 """a task has become unreachable, send a reply with an ImpossibleDependency
369 error."""
378 error."""
370 if msg_id not in self.depending:
379 if msg_id not in self.depending:
371 self.log.error("msg %r already failed!"%msg_id)
380 self.log.error("msg %r already failed!"%msg_id)
372 return
381 return
373 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
382 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
374 for mid in follow.union(after):
383 for mid in follow.union(after):
375 if mid in self.graph:
384 if mid in self.graph:
376 self.graph[mid].remove(msg_id)
385 self.graph[mid].remove(msg_id)
377
386
378 # FIXME: unpacking a message I've already unpacked, but didn't save:
387 # FIXME: unpacking a message I've already unpacked, but didn't save:
379 idents,msg = self.session.feed_identities(raw_msg, copy=False)
388 idents,msg = self.session.feed_identities(raw_msg, copy=False)
380 msg = self.session.unpack_message(msg, copy=False, content=False)
389 header = self.session.unpack(msg[1].bytes)
381 header = msg['header']
382
390
383 try:
391 try:
384 raise why()
392 raise why()
385 except:
393 except:
386 content = error.wrap_exception()
394 content = error.wrap_exception()
387
395
388 self.all_done.add(msg_id)
396 self.all_done.add(msg_id)
389 self.all_failed.add(msg_id)
397 self.all_failed.add(msg_id)
390
398
391 msg = self.session.send(self.client_stream, 'apply_reply', content,
399 msg = self.session.send(self.client_stream, 'apply_reply', content,
392 parent=header, ident=idents)
400 parent=header, ident=idents)
393 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
401 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
394
402
395 self.update_graph(msg_id, success=False)
403 self.update_graph(msg_id, success=False)
396
404
397 @logged
405 @logged
398 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
406 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
399 """check location dependencies, and run if they are met."""
407 """check location dependencies, and run if they are met."""
400 blacklist = self.blacklist.setdefault(msg_id, set())
408 blacklist = self.blacklist.setdefault(msg_id, set())
401 if follow or targets or blacklist or self.hwm:
409 if follow or targets or blacklist or self.hwm:
402 # we need a can_run filter
410 # we need a can_run filter
403 def can_run(idx):
411 def can_run(idx):
404 # check hwm
412 # check hwm
405 if self.hwm and self.loads[idx] == self.hwm:
413 if self.hwm and self.loads[idx] == self.hwm:
406 return False
414 return False
407 target = self.targets[idx]
415 target = self.targets[idx]
408 # check blacklist
416 # check blacklist
409 if target in blacklist:
417 if target in blacklist:
410 return False
418 return False
411 # check targets
419 # check targets
412 if targets and target not in targets:
420 if targets and target not in targets:
413 return False
421 return False
414 # check follow
422 # check follow
415 return follow.check(self.completed[target], self.failed[target])
423 return follow.check(self.completed[target], self.failed[target])
416
424
417 indices = filter(can_run, range(len(self.targets)))
425 indices = filter(can_run, range(len(self.targets)))
418
426
419 if not indices:
427 if not indices:
420 # couldn't run
428 # couldn't run
421 if follow.all:
429 if follow.all:
422 # check follow for impossibility
430 # check follow for impossibility
423 dests = set()
431 dests = set()
424 relevant = set()
432 relevant = set()
425 if follow.success:
433 if follow.success:
426 relevant = self.all_completed
434 relevant = self.all_completed
427 if follow.failure:
435 if follow.failure:
428 relevant = relevant.union(self.all_failed)
436 relevant = relevant.union(self.all_failed)
429 for m in follow.intersection(relevant):
437 for m in follow.intersection(relevant):
430 dests.add(self.destinations[m])
438 dests.add(self.destinations[m])
431 if len(dests) > 1:
439 if len(dests) > 1:
432 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
440 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
433 self.fail_unreachable(msg_id)
441 self.fail_unreachable(msg_id)
434 return False
442 return False
435 if targets:
443 if targets:
436 # check blacklist+targets for impossibility
444 # check blacklist+targets for impossibility
437 targets.difference_update(blacklist)
445 targets.difference_update(blacklist)
438 if not targets or not targets.intersection(self.targets):
446 if not targets or not targets.intersection(self.targets):
439 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
447 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
440 self.fail_unreachable(msg_id)
448 self.fail_unreachable(msg_id)
441 return False
449 return False
442 return False
450 return False
443 else:
451 else:
444 indices = None
452 indices = None
445
453
446 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
454 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
447 return True
455 return True
448
456
449 @logged
457 @logged
450 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
458 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
451 """Save a message for later submission when its dependencies are met."""
459 """Save a message for later submission when its dependencies are met."""
452 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
460 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
453 # track the ids in follow or after, but not those already finished
461 # track the ids in follow or after, but not those already finished
454 for dep_id in after.union(follow).difference(self.all_done):
462 for dep_id in after.union(follow).difference(self.all_done):
455 if dep_id not in self.graph:
463 if dep_id not in self.graph:
456 self.graph[dep_id] = set()
464 self.graph[dep_id] = set()
457 self.graph[dep_id].add(msg_id)
465 self.graph[dep_id].add(msg_id)
458
466
459 @logged
467 @logged
460 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
468 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
461 """Submit a task to any of a subset of our targets."""
469 """Submit a task to any of a subset of our targets."""
462 if indices:
470 if indices:
463 loads = [self.loads[i] for i in indices]
471 loads = [self.loads[i] for i in indices]
464 else:
472 else:
465 loads = self.loads
473 loads = self.loads
466 idx = self.scheme(loads)
474 idx = self.scheme(loads)
467 if indices:
475 if indices:
468 idx = indices[idx]
476 idx = indices[idx]
469 target = self.targets[idx]
477 target = self.targets[idx]
470 # print (target, map(str, msg[:3]))
478 # print (target, map(str, msg[:3]))
471 # send job to the engine
479 # send job to the engine
472 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
480 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
473 self.engine_stream.send_multipart(raw_msg, copy=False)
481 self.engine_stream.send_multipart(raw_msg, copy=False)
474 # update load
482 # update load
475 self.add_job(idx)
483 self.add_job(idx)
476 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
484 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
477 # notify Hub
485 # notify Hub
478 content = dict(msg_id=msg_id, engine_id=target)
486 content = dict(msg_id=msg_id, engine_id=target)
479 self.session.send(self.mon_stream, 'task_destination', content=content,
487 self.session.send(self.mon_stream, 'task_destination', content=content,
480 ident=['tracktask',self.session.session])
488 ident=['tracktask',self.session.session])
481
489
482
490
483 #-----------------------------------------------------------------------
491 #-----------------------------------------------------------------------
484 # Result Handling
492 # Result Handling
485 #-----------------------------------------------------------------------
493 #-----------------------------------------------------------------------
486 @logged
494 @logged
487 def dispatch_result(self, raw_msg):
495 def dispatch_result(self, raw_msg):
488 """dispatch method for result replies"""
496 """dispatch method for result replies"""
489 try:
497 try:
490 idents,msg = self.session.feed_identities(raw_msg, copy=False)
498 idents,msg = self.session.feed_identities(raw_msg, copy=False)
491 msg = self.session.unpack_message(msg, content=False, copy=False)
499 msg = self.session.unpack_message(msg, content=False, copy=False)
492 engine = idents[0]
500 engine = idents[0]
493 try:
501 try:
494 idx = self.targets.index(engine)
502 idx = self.targets.index(engine)
495 except ValueError:
503 except ValueError:
496 pass # skip load-update for dead engines
504 pass # skip load-update for dead engines
497 else:
505 else:
498 self.finish_job(idx)
506 self.finish_job(idx)
499 except Exception:
507 except Exception:
500 self.log.error("task::Invalid result: %r"%raw_msg, exc_info=True)
508 self.log.error("task::Invalid result: %r"%raw_msg, exc_info=True)
501 return
509 return
502
510
503 header = msg['header']
511 header = msg['header']
504 parent = msg['parent_header']
512 parent = msg['parent_header']
505 if header.get('dependencies_met', True):
513 if header.get('dependencies_met', True):
506 success = (header['status'] == 'ok')
514 success = (header['status'] == 'ok')
507 msg_id = parent['msg_id']
515 msg_id = parent['msg_id']
508 retries = self.retries[msg_id]
516 retries = self.retries[msg_id]
509 if not success and retries > 0:
517 if not success and retries > 0:
510 # failed
518 # failed
511 self.retries[msg_id] = retries - 1
519 self.retries[msg_id] = retries - 1
512 self.handle_unmet_dependency(idents, parent)
520 self.handle_unmet_dependency(idents, parent)
513 else:
521 else:
514 del self.retries[msg_id]
522 del self.retries[msg_id]
515 # relay to client and update graph
523 # relay to client and update graph
516 self.handle_result(idents, parent, raw_msg, success)
524 self.handle_result(idents, parent, raw_msg, success)
517 # send to Hub monitor
525 # send to Hub monitor
518 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
526 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
519 else:
527 else:
520 self.handle_unmet_dependency(idents, parent)
528 self.handle_unmet_dependency(idents, parent)
521
529
522 @logged
530 @logged
523 def handle_result(self, idents, parent, raw_msg, success=True):
531 def handle_result(self, idents, parent, raw_msg, success=True):
524 """handle a real task result, either success or failure"""
532 """handle a real task result, either success or failure"""
525 # first, relay result to client
533 # first, relay result to client
526 engine = idents[0]
534 engine = idents[0]
527 client = idents[1]
535 client = idents[1]
528 # swap_ids for XREP-XREP mirror
536 # swap_ids for XREP-XREP mirror
529 raw_msg[:2] = [client,engine]
537 raw_msg[:2] = [client,engine]
530 # print (map(str, raw_msg[:4]))
538 # print (map(str, raw_msg[:4]))
531 self.client_stream.send_multipart(raw_msg, copy=False)
539 self.client_stream.send_multipart(raw_msg, copy=False)
532 # now, update our data structures
540 # now, update our data structures
533 msg_id = parent['msg_id']
541 msg_id = parent['msg_id']
534 self.blacklist.pop(msg_id, None)
542 self.blacklist.pop(msg_id, None)
535 self.pending[engine].pop(msg_id)
543 self.pending[engine].pop(msg_id)
536 if success:
544 if success:
537 self.completed[engine].add(msg_id)
545 self.completed[engine].add(msg_id)
538 self.all_completed.add(msg_id)
546 self.all_completed.add(msg_id)
539 else:
547 else:
540 self.failed[engine].add(msg_id)
548 self.failed[engine].add(msg_id)
541 self.all_failed.add(msg_id)
549 self.all_failed.add(msg_id)
542 self.all_done.add(msg_id)
550 self.all_done.add(msg_id)
543 self.destinations[msg_id] = engine
551 self.destinations[msg_id] = engine
544
552
545 self.update_graph(msg_id, success)
553 self.update_graph(msg_id, success)
546
554
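# Why raw_msg[:2] is swapped above: XREP routing. The reply arrives from the
# engine-facing socket as
#     [engine_id, client_id, '<IDS|MSG>', signature, header, parent, content]
# and forwarding it through the client-facing XREP socket requires the
# destination identity first:
#     [client_id, engine_id, '<IDS|MSG>', signature, header, parent, content]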
547 @logged
555 @logged
548 def handle_unmet_dependency(self, idents, parent):
556 def handle_unmet_dependency(self, idents, parent):
549 """handle an unmet dependency"""
557 """handle an unmet dependency"""
550 engine = idents[0]
558 engine = idents[0]
551 msg_id = parent['msg_id']
559 msg_id = parent['msg_id']
552
560
553 if msg_id not in self.blacklist:
561 if msg_id not in self.blacklist:
554 self.blacklist[msg_id] = set()
562 self.blacklist[msg_id] = set()
555 self.blacklist[msg_id].add(engine)
563 self.blacklist[msg_id].add(engine)
556
564
557 args = self.pending[engine].pop(msg_id)
565 args = self.pending[engine].pop(msg_id)
558 raw,targets,after,follow,timeout = args
566 raw,targets,after,follow,timeout = args
559
567
560 if self.blacklist[msg_id] == targets:
568 if self.blacklist[msg_id] == targets:
561 self.depending[msg_id] = args
569 self.depending[msg_id] = args
562 self.fail_unreachable(msg_id)
570 self.fail_unreachable(msg_id)
563 elif not self.maybe_run(msg_id, *args):
571 elif not self.maybe_run(msg_id, *args):
564 # resubmit failed
572 # resubmit failed
565 if msg_id not in self.all_failed:
573 if msg_id not in self.all_failed:
566 # put it back in our dependency tree
574 # put it back in our dependency tree
567 self.save_unmet(msg_id, *args)
575 self.save_unmet(msg_id, *args)
568
576
569 if self.hwm:
577 if self.hwm:
570 try:
578 try:
571 idx = self.targets.index(engine)
579 idx = self.targets.index(engine)
572 except ValueError:
580 except ValueError:
573 pass # skip load-update for dead engines
581 pass # skip load-update for dead engines
574 else:
582 else:
575 if self.loads[idx] == self.hwm-1:
583 if self.loads[idx] == self.hwm-1:
576 self.update_graph(None)
584 self.update_graph(None)
577
585
578
586
579
587
580 @logged
588 @logged
581 def update_graph(self, dep_id=None, success=True):
589 def update_graph(self, dep_id=None, success=True):
582 """dep_id just finished. Update our dependency
590 """dep_id just finished. Update our dependency
583 graph and submit any jobs that just became runnable.
591 graph and submit any jobs that just became runnable.
584
592
585 Called with dep_id=None to update entire graph for hwm, but without finishing
593 Called with dep_id=None to update entire graph for hwm, but without finishing
586 a task.
594 a task.
587 """
595 """
588 # print ("\n\n***********")
596 # print ("\n\n***********")
589 # pprint (dep_id)
597 # pprint (dep_id)
590 # pprint (self.graph)
598 # pprint (self.graph)
591 # pprint (self.depending)
599 # pprint (self.depending)
592 # pprint (self.all_completed)
600 # pprint (self.all_completed)
593 # pprint (self.all_failed)
601 # pprint (self.all_failed)
594 # print ("\n\n***********\n\n")
602 # print ("\n\n***********\n\n")
595 # update any jobs that depended on the dependency
603 # update any jobs that depended on the dependency
596 jobs = self.graph.pop(dep_id, [])
604 jobs = self.graph.pop(dep_id, [])
597
605
598 # recheck *all* jobs if
606 # recheck *all* jobs if
599 # a) we have HWM and an engine just became no longer full
607 # a) we have HWM and an engine just became no longer full
600 # or b) dep_id was given as None
608 # or b) dep_id was given as None
601 if dep_id is None or (self.hwm and any([ load == self.hwm-1 for load in self.loads ])):
609 if dep_id is None or (self.hwm and any([ load == self.hwm-1 for load in self.loads ])):
602 jobs = self.depending.keys()
610 jobs = self.depending.keys()
603
611
604 for msg_id in jobs:
612 for msg_id in jobs:
605 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
613 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
606
614
607 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
615 if after.unreachable(self.all_completed, self.all_failed) or follow.unreachable(self.all_completed, self.all_failed):
608 self.fail_unreachable(msg_id)
616 self.fail_unreachable(msg_id)
609
617
610 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
618 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
611 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
619 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
612
620
613 self.depending.pop(msg_id)
621 self.depending.pop(msg_id)
614 for mid in follow.union(after):
622 for mid in follow.union(after):
615 if mid in self.graph:
623 if mid in self.graph:
616 self.graph[mid].remove(msg_id)
624 self.graph[mid].remove(msg_id)
617
625
618 #----------------------------------------------------------------------
626 #----------------------------------------------------------------------
619 # methods to be overridden by subclasses
627 # methods to be overridden by subclasses
620 #----------------------------------------------------------------------
628 #----------------------------------------------------------------------
621
629
622 def add_job(self, idx):
630 def add_job(self, idx):
623 """Called after self.targets[idx] just got the job with header.
631 """Called after self.targets[idx] just got the job with header.
624 Override with subclasses. The default ordering is simple LRU.
632 Override with subclasses. The default ordering is simple LRU.
625 The default loads are the number of outstanding jobs."""
633 The default loads are the number of outstanding jobs."""
626 self.loads[idx] += 1
634 self.loads[idx] += 1
627 for lis in (self.targets, self.loads):
635 for lis in (self.targets, self.loads):
628 lis.append(lis.pop(idx))
636 lis.append(lis.pop(idx))
629
637
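# A quick illustration of the LRU rotation add_job performs: the engine that
# just received work moves to the back of both lists, so index 0 is always
# the least recently used engine.

targets = ['e0', 'e1', 'e2']
loads   = [0, 0, 0]
idx = 1                        # suppose engine e1 just got a job
loads[idx] += 1
for lis in (targets, loads):
    lis.append(lis.pop(idx))
print(targets)                 # -> ['e0', 'e2', 'e1']
print(loads)                   # -> [0, 0, 1]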
630
638
631 def finish_job(self, idx):
639 def finish_job(self, idx):
632 """Called after self.targets[idx] just finished a job.
640 """Called after self.targets[idx] just finished a job.
633 Override in subclasses."""
641 Override in subclasses."""
634 self.loads[idx] -= 1
642 self.loads[idx] -= 1
635
643
636
644
637
645
638 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
646 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
639 log_url=None, loglevel=logging.DEBUG,
647 log_url=None, loglevel=logging.DEBUG,
640 identity=b'task'):
648 identity=b'task'):
641 from zmq.eventloop import ioloop
649 from zmq.eventloop import ioloop
642 from zmq.eventloop.zmqstream import ZMQStream
650 from zmq.eventloop.zmqstream import ZMQStream
643
651
644 if config:
652 if config:
645 # unwrap dict back into Config
653 # unwrap dict back into Config
646 config = Config(config)
654 config = Config(config)
647
655
648 ctx = zmq.Context()
656 ctx = zmq.Context()
649 loop = ioloop.IOLoop()
657 loop = ioloop.IOLoop()
650 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
658 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
651 ins.setsockopt(zmq.IDENTITY, identity)
659 ins.setsockopt(zmq.IDENTITY, identity)
652 ins.bind(in_addr)
660 ins.bind(in_addr)
653
661
654 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
662 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
655 outs.setsockopt(zmq.IDENTITY, identity)
663 outs.setsockopt(zmq.IDENTITY, identity)
656 outs.bind(out_addr)
664 outs.bind(out_addr)
657 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
665 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
658 mons.connect(mon_addr)
666 mons.connect(mon_addr)
659 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
667 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
660 nots.setsockopt(zmq.SUBSCRIBE, '')
668 nots.setsockopt(zmq.SUBSCRIBE, '')
661 nots.connect(not_addr)
669 nots.connect(not_addr)
662
670
663 # setup logging. Note that these will not work in-process, because they clobber
671 # setup logging. Note that these will not work in-process, because they clobber
664 # existing loggers.
672 # existing loggers.
665 if log_url:
673 if log_url:
666 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
674 connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
667 else:
675 else:
668 local_logger(logname, loglevel)
676 local_logger(logname, loglevel)
669
677
670 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
678 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
671 mon_stream=mons, notifier_stream=nots,
679 mon_stream=mons, notifier_stream=nots,
672 loop=loop, logname=logname,
680 loop=loop, logname=logname,
673 config=config)
681 config=config)
674 scheduler.start()
682 scheduler.start()
675 try:
683 try:
676 loop.start()
684 loop.start()
677 except KeyboardInterrupt:
685 except KeyboardInterrupt:
678 print ("interrupted, exiting...", file=sys.__stderr__)
686 print ("interrupted, exiting...", file=sys.__stderr__)
679
687
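# A hedged usage sketch of launch_scheduler, mirroring how the controller
# spawns it in a child process. The addresses are placeholders, not the
# controller's actual defaults.

from multiprocessing import Process

addrs = ('tcp://127.0.0.1:5555',   # in_addr:  client-facing XREP
         'tcp://127.0.0.1:5556',   # out_addr: engine-facing XREP
         'tcp://127.0.0.1:5557',   # mon_addr: PUB into the Hub monitor
         'tcp://127.0.0.1:5558')   # not_addr: SUB for [un]registration events
p = Process(target=launch_scheduler, args=addrs)
p.daemon = True
p.start()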
@@ -1,446 +1,483 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2010-2011 The IPython Development Team
5 # Copyright (C) 2010-2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
11
14
15 import hmac
12 import os
16 import os
13 import pprint
17 import pprint
14 import uuid
18 import uuid
15 from datetime import datetime
19 from datetime import datetime
16
20
17 try:
21 try:
18 import cPickle
22 import cPickle
19 pickle = cPickle
23 pickle = cPickle
20 except:
24 except:
21 cPickle = None
25 cPickle = None
22 import pickle
26 import pickle
23
27
24 import zmq
28 import zmq
25 from zmq.utils import jsonapi
29 from zmq.utils import jsonapi
26 from zmq.eventloop.zmqstream import ZMQStream
30 from zmq.eventloop.zmqstream import ZMQStream
27
31
28 from IPython.config.configurable import Configurable
32 from IPython.config.configurable import Configurable
29 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import CStr, Unicode, Bool, Any
34 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
31
35
32 from .util import ISO8601
36 from .util import ISO8601
33
37
38 #-----------------------------------------------------------------------------
39 # utility functions
40 #-----------------------------------------------------------------------------
34
41
35 def squash_unicode(obj):
42 def squash_unicode(obj):
36 """coerce unicode back to bytestrings."""
43 """coerce unicode back to bytestrings."""
37 if isinstance(obj,dict):
44 if isinstance(obj,dict):
38 for key in obj.keys():
45 for key in obj.keys():
39 obj[key] = squash_unicode(obj[key])
46 obj[key] = squash_unicode(obj[key])
40 if isinstance(key, unicode):
47 if isinstance(key, unicode):
41 obj[squash_unicode(key)] = obj.pop(key)
48 obj[squash_unicode(key)] = obj.pop(key)
42 elif isinstance(obj, list):
49 elif isinstance(obj, list):
43 for i,v in enumerate(obj):
50 for i,v in enumerate(obj):
44 obj[i] = squash_unicode(v)
51 obj[i] = squash_unicode(v)
45 elif isinstance(obj, unicode):
52 elif isinstance(obj, unicode):
46 obj = obj.encode('utf8')
53 obj = obj.encode('utf8')
47 return obj
54 return obj
48
55
49 def _date_default(obj):
56 def _date_default(obj):
50 if isinstance(obj, datetime):
57 if isinstance(obj, datetime):
51 return obj.strftime(ISO8601)
58 return obj.strftime(ISO8601)
52 else:
59 else:
53 raise TypeError("%r is not JSON serializable"%obj)
60 raise TypeError("%r is not JSON serializable"%obj)
54
61
62 #-----------------------------------------------------------------------------
63 # globals and defaults
64 #-----------------------------------------------------------------------------
65
55 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
66 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
56 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default})
67 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default})
57 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
68 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
58
69
59 pickle_packer = lambda o: pickle.dumps(o,-1)
70 pickle_packer = lambda o: pickle.dumps(o,-1)
60 pickle_unpacker = pickle.loads
71 pickle_unpacker = pickle.loads
61
72
62 default_packer = json_packer
73 default_packer = json_packer
63 default_unpacker = json_unpacker
74 default_unpacker = json_unpacker
64
75
65
76
66 DELIM="<IDS|MSG>"
77 DELIM="<IDS|MSG>"
67
78
79 #-----------------------------------------------------------------------------
80 # Classes
81 #-----------------------------------------------------------------------------
82
68 class Message(object):
83 class Message(object):
69 """A simple message object that maps dict keys to attributes.
84 """A simple message object that maps dict keys to attributes.
70
85
71 A Message can be created from a dict and a dict from a Message instance
86 A Message can be created from a dict and a dict from a Message instance
72 simply by calling dict(msg_obj)."""
87 simply by calling dict(msg_obj)."""
73
88
74 def __init__(self, msg_dict):
89 def __init__(self, msg_dict):
75 dct = self.__dict__
90 dct = self.__dict__
76 for k, v in dict(msg_dict).iteritems():
91 for k, v in dict(msg_dict).iteritems():
77 if isinstance(v, dict):
92 if isinstance(v, dict):
78 v = Message(v)
93 v = Message(v)
79 dct[k] = v
94 dct[k] = v
80
95
81 # Having this iterator lets dict(msg_obj) work out of the box.
96 # Having this iterator lets dict(msg_obj) work out of the box.
82 def __iter__(self):
97 def __iter__(self):
83 return iter(self.__dict__.iteritems())
98 return iter(self.__dict__.iteritems())
84
99
85 def __repr__(self):
100 def __repr__(self):
86 return repr(self.__dict__)
101 return repr(self.__dict__)
87
102
88 def __str__(self):
103 def __str__(self):
89 return pprint.pformat(self.__dict__)
104 return pprint.pformat(self.__dict__)
90
105
91 def __contains__(self, k):
106 def __contains__(self, k):
92 return k in self.__dict__
107 return k in self.__dict__
93
108
94 def __getitem__(self, k):
109 def __getitem__(self, k):
95 return self.__dict__[k]
110 return self.__dict__[k]
96
111
97
112
98 def msg_header(msg_id, msg_type, username, session):
113 def msg_header(msg_id, msg_type, username, session):
99 date=datetime.now().strftime(ISO8601)
114 date=datetime.now().strftime(ISO8601)
100 return locals()
115 return locals()
101
116
102 def extract_header(msg_or_header):
117 def extract_header(msg_or_header):
103 """Given a message or header, return the header."""
118 """Given a message or header, return the header."""
104 if not msg_or_header:
119 if not msg_or_header:
105 return {}
120 return {}
106 try:
121 try:
107 # See if msg_or_header is the entire message.
122 # See if msg_or_header is the entire message.
108 h = msg_or_header['header']
123 h = msg_or_header['header']
109 except KeyError:
124 except KeyError:
110 try:
125 try:
111 # See if msg_or_header is just the header
126 # See if msg_or_header is just the header
112 h = msg_or_header['msg_id']
127 h = msg_or_header['msg_id']
113 except KeyError:
128 except KeyError:
114 raise
129 raise
115 else:
130 else:
116 h = msg_or_header
131 h = msg_or_header
117 if not isinstance(h, dict):
132 if not isinstance(h, dict):
118 h = dict(h)
133 h = dict(h)
119 return h
134 return h
120
135
121 class StreamSession(Configurable):
136 class StreamSession(Configurable):
122 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
137 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
123 debug=Bool(False, config=True, help="""Debug output in the StreamSession""")
138 debug=Bool(False, config=True, help="""Debug output in the StreamSession""")
124 packer = Unicode('json',config=True,
139 packer = Unicode('json',config=True,
125 help="""The name of the packer for serializing messages.
140 help="""The name of the packer for serializing messages.
126 Should be one of 'json', 'pickle', or an import name
141 Should be one of 'json', 'pickle', or an import name
127 for a custom serializer.""")
142 for a custom serializer.""")
128 def _packer_changed(self, name, old, new):
143 def _packer_changed(self, name, old, new):
129 if new.lower() == 'json':
144 if new.lower() == 'json':
130 self.pack = json_packer
145 self.pack = json_packer
131 self.unpack = json_unpacker
146 self.unpack = json_unpacker
132 elif new.lower() == 'pickle':
147 elif new.lower() == 'pickle':
133 self.pack = pickle_packer
148 self.pack = pickle_packer
134 self.unpack = pickle_unpacker
149 self.unpack = pickle_unpacker
135 else:
150 else:
136 self.pack = import_item(new)
151 self.pack = import_item(new)
137
152
138 unpacker = Unicode('json',config=True,
153 unpacker = Unicode('json',config=True,
139 help="""The name of the unpacker for unserializing messages.
154 help="""The name of the unpacker for unserializing messages.
140 Only used with custom functions for `packer`.""")
155 Only used with custom functions for `packer`.""")
141 def _unpacker_changed(self, name, old, new):
156 def _unpacker_changed(self, name, old, new):
142 if new.lower() == 'json':
157 if new.lower() == 'json':
143 self.pack = json_packer
158 self.pack = json_packer
144 self.unpack = json_unpacker
159 self.unpack = json_unpacker
145 elif new.lower() == 'pickle':
160 elif new.lower() == 'pickle':
146 self.pack = pickle_packer
161 self.pack = pickle_packer
147 self.unpack = pickle_unpacker
162 self.unpack = pickle_unpacker
148 else:
163 else:
149 self.unpack = import_item(new)
164 self.unpack = import_item(new)
150
165
151 session = CStr('',config=True,
166 session = CStr('',config=True,
152 help="""The UUID identifying this session.""")
167 help="""The UUID identifying this session.""")
153 def _session_default(self):
168 def _session_default(self):
154 return bytes(uuid.uuid4())
169 return bytes(uuid.uuid4())
155 username = Unicode(os.environ.get('USER','username'),config=True,
170 username = Unicode(os.environ.get('USER','username'), config=True,
156 help="""Username for the Session. Default is your system username.""")
171 help="""Username for the Session. Default is your system username.""")
172
173 # message signature related traits:
157 key = CStr('', config=True,
174 key = CStr('', config=True,
158 help="""execution key, for extra authentication.""")
175 help="""execution key, for extra authentication.""")
159
176 def _key_changed(self, name, old, new):
177 if new:
178 self.auth = hmac.HMAC(new)
179 else:
180 self.auth = None
181 auth = Instance(hmac.HMAC)
182 counters = Instance('collections.defaultdict', (int,))
183 digest_history = Set()
184
160 keyfile = Unicode('', config=True,
185 keyfile = Unicode('', config=True,
161 help="""path to file containing execution key.""")
186 help="""path to file containing execution key.""")
162 def _keyfile_changed(self, name, old, new):
187 def _keyfile_changed(self, name, old, new):
163 with open(new, 'rb') as f:
188 with open(new, 'rb') as f:
164 self.key = f.read().strip()
189 self.key = f.read().strip()
165
190
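# A minimal sketch of generating and consuming an execution keyfile (the path
# is illustrative). Assigning `keyfile` fires _keyfile_changed, which sets
# `key`, which in turn builds the HMAC via _key_changed.

import uuid

with open('/tmp/exec.key', 'wb') as f:
    f.write(bytes(uuid.uuid4()))

session = StreamSession(keyfile='/tmp/exec.key')
assert session.auth is not None    # HMAC built from the file's key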
166 pack = Any(default_packer) # the actual packer function
191 pack = Any(default_packer) # the actual packer function
167 def _pack_changed(self, name, old, new):
192 def _pack_changed(self, name, old, new):
168 if not callable(new):
193 if not callable(new):
169 raise TypeError("packer must be callable, not %s"%type(new))
194 raise TypeError("packer must be callable, not %s"%type(new))
170
195
171 unpack = Any(default_unpacker) # the actual unpacker function
196 unpack = Any(default_unpacker) # the actual unpacker function
172 def _unpack_changed(self, name, old, new):
197 def _unpack_changed(self, name, old, new):
173 if not callable(new):
198 if not callable(new):
174 raise TypeError("unpacker must be callable, not %s"%type(new))
199 raise TypeError("unpacker must be callable, not %s"%type(new))
175
200
176 def __init__(self, **kwargs):
201 def __init__(self, **kwargs):
177 super(StreamSession, self).__init__(**kwargs)
202 super(StreamSession, self).__init__(**kwargs)
178 self.none = self.pack({})
203 self.none = self.pack({})
179
204
180 @property
205 @property
181 def msg_id(self):
206 def msg_id(self):
182 """always return new uuid"""
207 """always return new uuid"""
183 return str(uuid.uuid4())
208 return str(uuid.uuid4())
184
209
185 def msg_header(self, msg_type):
210 def msg_header(self, msg_type):
186 return msg_header(self.msg_id, msg_type, self.username, self.session)
211 return msg_header(self.msg_id, msg_type, self.username, self.session)
187
212
188 def msg(self, msg_type, content=None, parent=None, subheader=None):
213 def msg(self, msg_type, content=None, parent=None, subheader=None):
189 msg = {}
214 msg = {}
190 msg['header'] = self.msg_header(msg_type)
215 msg['header'] = self.msg_header(msg_type)
191 msg['msg_id'] = msg['header']['msg_id']
216 msg['msg_id'] = msg['header']['msg_id']
192 msg['parent_header'] = {} if parent is None else extract_header(parent)
217 msg['parent_header'] = {} if parent is None else extract_header(parent)
193 msg['msg_type'] = msg_type
218 msg['msg_type'] = msg_type
194 msg['content'] = {} if content is None else content
219 msg['content'] = {} if content is None else content
195 sub = {} if subheader is None else subheader
220 sub = {} if subheader is None else subheader
196 msg['header'].update(sub)
221 msg['header'].update(sub)
197 return msg
222 return msg
198
223
199 def check_key(self, msg_or_header):
224 def check_key(self, msg_or_header):
200 """Check that a message's header has the right key"""
225 """Check that a message's header has the right key"""
201 if not self.key:
226 if not self.key:
202 return True
227 return True
203 header = extract_header(msg_or_header)
228 header = extract_header(msg_or_header)
204 return header.get('key', '') == self.key
229 return header.get('key', '') == self.key
205
230
231 def sign(self, msg):
232 """Sign a message with HMAC digest. If no auth, return b''."""
233 if self.auth is None:
234 return b''
235 h = self.auth.copy()
236 for m in msg:
237 h.update(m)
238 return h.hexdigest()
206
239
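A quick property check of the `copy()` pattern used in `sign` above (key and frames are hypothetical): copying the already-keyed HMAC object yields the same digest as constructing a fresh one, while skipping the key-setup step on every message. Note that under Python 2, `hmac.HMAC` defaults to MD5.

    import hmac

    key = b'0123456789abcdef'            # hypothetical shared key
    frames = [b'header', b'parent', b'content']

    auth = hmac.HMAC(key)                # keyed once, at Session init

    h = auth.copy()                      # cheap per-message copy
    for f in frames:
        h.update(f)

    fresh = hmac.HMAC(key)               # equivalent, but re-keys every time
    for f in frames:
        fresh.update(f)

    assert h.hexdigest() == fresh.hexdigest()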
207 def serialize(self, msg, ident=None):
240 def serialize(self, msg, ident=None):
208 content = msg.get('content', {})
241 content = msg.get('content', {})
209 if content is None:
242 if content is None:
210 content = self.none
243 content = self.none
211 elif isinstance(content, dict):
244 elif isinstance(content, dict):
212 content = self.pack(content)
245 content = self.pack(content)
213 elif isinstance(content, bytes):
246 elif isinstance(content, bytes):
214 # content is already packed, as in a relayed message
247 # content is already packed, as in a relayed message
215 pass
248 pass
216 elif isinstance(content, unicode):
249 elif isinstance(content, unicode):
217 # should be bytes, but JSON often spits out unicode
250 # should be bytes, but JSON often spits out unicode
218 content = content.encode('utf8')
251 content = content.encode('utf8')
219 else:
252 else:
220 raise TypeError("Content incorrect type: %s"%type(content))
253 raise TypeError("Content incorrect type: %s"%type(content))
221
254
255 real_message = [self.pack(msg['header']),
256 self.pack(msg['parent_header']),
257 content
258 ]
259
222 to_send = []
260 to_send = []
223
261
224 if isinstance(ident, list):
262 if isinstance(ident, list):
225 # accept list of idents
263 # accept list of idents
226 to_send.extend(ident)
264 to_send.extend(ident)
227 elif ident is not None:
265 elif ident is not None:
228 to_send.append(ident)
266 to_send.append(ident)
229 to_send.append(DELIM)
267 to_send.append(DELIM)
230 if self.key:
268
231 to_send.append(self.key)
269 signature = self.sign(real_message)
232 to_send.append(self.pack(msg['header']))
270 to_send.append(signature)
233 to_send.append(self.pack(msg['parent_header']))
271
234 to_send.append(content)
272 to_send.extend(real_message)
235
273
236 return to_send
274 return to_send
237
275
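For reference, the multipart frame layout produced by `serialize` after this change looks like the following (byte values are illustrative only):

    to_send = [
        b'engine-1',          # zero or more zmq routing identities (optional)
        b'<IDS|MSG>',         # DELIM, as defined at the top of this module
        b'1f2a9c...',         # hex digest of the next three frames (b'' when no key is set)
        b'{"msg_id": ...}',   # self.pack(msg['header'])
        b'{}',                # self.pack(msg['parent_header'])
        b'{}',                # content
    ]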
238 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
276 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None, track=False):
239 """Build and send a message via stream or socket.
277 """Build and send a message via stream or socket.
240
278
241 Parameters
279 Parameters
242 ----------
280 ----------
243
281
244 stream : zmq.Socket or ZMQStream
282 stream : zmq.Socket or ZMQStream
245 the socket-like object used to send the data
283 the socket-like object used to send the data
246 msg_or_type : str or Message/dict
284 msg_or_type : str or Message/dict
247 Normally, msg_or_type will be a msg_type unless a message is being sent more
285 Normally, msg_or_type will be a msg_type unless a message is being sent more
248 than once.
286 than once.
249
287
250 content : dict or None
288 content : dict or None
251 the content of the message (ignored if msg_or_type is a message)
289 the content of the message (ignored if msg_or_type is a message)
252 buffers : list or None
290 buffers : list or None
253 the already-serialized buffers to be appended to the message
291 the already-serialized buffers to be appended to the message
254 parent : Message or dict or None
292 parent : Message or dict or None
255 the parent or parent header describing the parent of this message
293 the parent or parent header describing the parent of this message
256 subheader : dict or None
294 subheader : dict or None
257 extra header keys for this message's header
295 extra header keys for this message's header
258 ident : bytes or list of bytes
296 ident : bytes or list of bytes
259 the zmq.IDENTITY routing path
297 the zmq.IDENTITY routing path
260 track : bool
298 track : bool
261 whether to track. Only for use with Sockets, because ZMQStream objects cannot track messages.
299 whether to track. Only for use with Sockets, because ZMQStream objects cannot track messages.
262
300
263 Returns
301 Returns
264 -------
302 -------
265 msg : message dict
303 msg : message dict
266 the constructed message
304 the constructed message
267 (msg,tracker) : (message dict, MessageTracker)
305 (msg,tracker) : (message dict, MessageTracker)
268 if track=True, then a 2-tuple will be returned, the first element being the constructed
306 if track=True, then a 2-tuple will be returned, the first element being the constructed
269 message, and the second being the MessageTracker
307 message, and the second being the MessageTracker
270
308
271 """
309 """
272
310
273 if not isinstance(stream, (zmq.Socket, ZMQStream)):
311 if not isinstance(stream, (zmq.Socket, ZMQStream)):
274 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
312 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
275 elif track and isinstance(stream, ZMQStream):
313 elif track and isinstance(stream, ZMQStream):
276 raise TypeError("ZMQStream cannot track messages")
314 raise TypeError("ZMQStream cannot track messages")
277
315
278 if isinstance(msg_or_type, (Message, dict)):
316 if isinstance(msg_or_type, (Message, dict)):
279 # we got a Message, not a msg_type
317 # we got a Message, not a msg_type
280 # don't build a new Message
318 # don't build a new Message
281 msg = msg_or_type
319 msg = msg_or_type
282 else:
320 else:
283 msg = self.msg(msg_or_type, content, parent, subheader)
321 msg = self.msg(msg_or_type, content, parent, subheader)
284
322
285 buffers = [] if buffers is None else buffers
323 buffers = [] if buffers is None else buffers
286 to_send = self.serialize(msg, ident)
324 to_send = self.serialize(msg, ident)
287 flag = 0
325 flag = 0
288 if buffers:
326 if buffers:
289 flag = zmq.SNDMORE
327 flag = zmq.SNDMORE
290 _track = False
328 _track = False
291 else:
329 else:
292 _track=track
330 _track=track
293 if track:
331 if track:
294 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
332 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
295 else:
333 else:
296 tracker = stream.send_multipart(to_send, flag, copy=False)
334 tracker = stream.send_multipart(to_send, flag, copy=False)
297 for b in buffers[:-1]:
335 for b in buffers[:-1]:
298 stream.send(b, flag, copy=False)
336 stream.send(b, flag, copy=False)
299 if buffers:
337 if buffers:
300 if track:
338 if track:
301 tracker = stream.send(buffers[-1], copy=False, track=track)
339 tracker = stream.send(buffers[-1], copy=False, track=track)
302 else:
340 else:
303 tracker = stream.send(buffers[-1], copy=False)
341 tracker = stream.send(buffers[-1], copy=False)
304
342
305 # omsg = Message(msg)
343 # omsg = Message(msg)
306 if self.debug:
344 if self.debug:
307 pprint.pprint(msg)
345 pprint.pprint(msg)
308 pprint.pprint(to_send)
346 pprint.pprint(to_send)
309 pprint.pprint(buffers)
347 pprint.pprint(buffers)
310
348
311 msg['tracker'] = tracker
349 msg['tracker'] = tracker
312
350
313 return msg
351 return msg
314
352
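A hedged usage sketch of `send` (the endpoint, socket type, and the `execute_request` message type are illustrative, not prescribed by this diff):

    import zmq
    from IPython.parallel.streamsession import StreamSession

    ctx = zmq.Context()
    sock = ctx.socket(zmq.DEALER)
    sock.connect('tcp://127.0.0.1:5555')       # hypothetical endpoint

    session = StreamSession(key=b'my-shared-key')
    msg = session.send(sock, 'execute_request',
                       content=dict(code='a=5'))
    # msg is the constructed message dict; msg['tracker'] holds the send tracker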
315 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
353 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
316 """Send a raw message via ident path.
354 """Send a raw message via ident path.
317
355
318 Parameters
356 Parameters
319 ----------
357 ----------
320 msg : list of sendable buffers"""
358 msg : list of sendable buffers"""
321 to_send = []
359 to_send = []
322 if isinstance(ident, bytes):
360 if isinstance(ident, bytes):
323 ident = [ident]
361 ident = [ident]
324 if ident is not None:
362 if ident is not None:
325 to_send.extend(ident)
363 to_send.extend(ident)
364
326 to_send.append(DELIM)
365 to_send.append(DELIM)
327 if self.key:
366 to_send.append(self.sign(msg))
328 to_send.append(self.key)
329 to_send.extend(msg)
367 to_send.extend(msg)
330 stream.send_multipart(to_send, flags, copy=copy)
368 stream.send_multipart(to_send, flags, copy=copy)
331
369
332 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
370 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
333 """receives and unpacks a message
371 """receives and unpacks a message
334 returns [idents], msg"""
372 returns [idents], msg"""
335 if isinstance(socket, ZMQStream):
373 if isinstance(socket, ZMQStream):
336 socket = socket.socket
374 socket = socket.socket
337 try:
375 try:
338 msg = socket.recv_multipart(mode, copy=copy)
376 msg = socket.recv_multipart(mode, copy=copy)
339 except zmq.ZMQError as e:
377 except zmq.ZMQError as e:
340 if e.errno == zmq.EAGAIN:
378 if e.errno == zmq.EAGAIN:
341 # We can convert EAGAIN to None as we know in this case
379 # We can convert EAGAIN to None as we know in this case
342 # recv_multipart won't return None.
380 # recv_multipart won't return None.
343 return None
381 return None
344 else:
382 else:
345 raise
383 raise
346 # return an actual Message object
384 # return an actual Message object
347 # determine the number of idents by trying to unpack them.
385 # determine the number of idents by trying to unpack them.
348 # this is terrible:
386 # this is terrible:
349 idents, msg = self.feed_identities(msg, copy)
387 idents, msg = self.feed_identities(msg, copy)
350 try:
388 try:
351 return idents, self.unpack_message(msg, content=content, copy=copy)
389 return idents, self.unpack_message(msg, content=content, copy=copy)
352 except Exception as e:
390 except Exception as e:
353 print (idents, msg)
391 print (idents, msg)
354 # TODO: handle it
392 # TODO: handle it
355 raise e
393 raise e
356
394
357 def feed_identities(self, msg, copy=True):
395 def feed_identities(self, msg, copy=True):
358 """feed until DELIM is reached, then return the prefix as idents and remainder as
396 """feed until DELIM is reached, then return the prefix as idents and remainder as
359 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
397 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
360
398
361 Parameters
399 Parameters
362 ----------
400 ----------
363 msg : a list of Message or bytes objects
401 msg : a list of Message or bytes objects
364 the message to be split
402 the message to be split
365 copy : bool
403 copy : bool
366 flag determining whether the arguments are bytes or Messages
404 flag determining whether the arguments are bytes or Messages
367
405
368 Returns
406 Returns
369 -------
407 -------
370 (idents,msg) : two lists
408 (idents,msg) : two lists
371 idents will always be a list of bytes - the identity prefix
409 idents will always be a list of bytes - the identity prefix
372 msg will be a list of bytes or Messages, unchanged from input
410 msg will be a list of bytes or Messages, unchanged from input
373 msg should be unpackable via self.unpack_message at this point.
411 msg should be unpackable via self.unpack_message at this point.
374 """
412 """
375 ikey = int(self.key != '')
413 if copy:
376 minlen = 3 + ikey
414 idx = msg.index(DELIM)
377 msg = list(msg)
415 return msg[:idx], msg[idx+1:]
378 idents = []
416 else:
379 while len(msg) > minlen:
417 failed = True
380 if copy:
418 for idx,m in enumerate(msg):
381 s = msg[0]
419 if m.bytes == DELIM:
382 else:
420 failed = False
383 s = msg[0].bytes
421 break
384 if s == DELIM:
422 if failed:
385 msg.pop(0)
423 raise ValueError("DELIM not in msg")
386 break
424 idents, msg = msg[:idx], msg[idx+1:]
387 else:
425 return [m.bytes for m in idents], msg
388 idents.append(s)
389 msg.pop(0)
390
391 return idents, msg
392
426
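In the `copy=True` path, the rewritten `feed_identities` is now just a list split on the delimiter; a small illustration with made-up frames:

    DELIM = b'<IDS|MSG>'   # assumed module-level delimiter value

    msg = [b'client-ident', DELIM, b'sig', b'header', b'parent', b'content']
    idx = msg.index(DELIM)
    idents, rest = msg[:idx], msg[idx + 1:]
    # idents == [b'client-ident']
    # rest   == [b'sig', b'header', b'parent', b'content']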
393 def unpack_message(self, msg, content=True, copy=True):
427 def unpack_message(self, msg, content=True, copy=True):
394 """Return a message object from the format
428 """Return a message object from the format
395 sent by self.send.
429 sent by self.send.
396
430
397 Parameters
431 Parameters
398 ----------
432 ----------
399
433
400 content : bool (True)
434 content : bool (True)
401 whether to unpack the content dict (True),
435 whether to unpack the content dict (True),
402 or leave it serialized (False)
436 or leave it serialized (False)
403
437
404 copy : bool (True)
438 copy : bool (True)
405 whether to return the bytes (True),
439 whether to return the bytes (True),
406 or the non-copying Message object in each place (False)
440 or the non-copying Message object in each place (False)
407
441
408 """
442 """
409 ikey = int(self.key != '')
443 minlen = 4
410 minlen = 3 + ikey
411 message = {}
444 message = {}
412 if not copy:
445 if not copy:
413 for i in range(minlen):
446 for i in range(minlen):
414 msg[i] = msg[i].bytes
447 msg[i] = msg[i].bytes
415 if ikey:
448 if self.auth is not None:
416 if not self.key == msg[0]:
449 signature = msg[0]
417 raise KeyError("Invalid Session Key: %s"%msg[0])
450 if signature in self.digest_history:
451 raise ValueError("Duplicate Signature: %r"%signature)
452 self.digest_history.add(signature)
453 check = self.sign(msg[1:4])
454 if not signature == check:
455 raise ValueError("Invalid Signature: %r"%signature)
418 if not len(msg) >= minlen:
456 if not len(msg) >= minlen:
419 raise TypeError("malformed message, must have at least %i elements"%minlen)
457 raise TypeError("malformed message, must have at least %i elements"%minlen)
420 message['header'] = self.unpack(msg[ikey+0])
458 message['header'] = self.unpack(msg[1])
421 message['msg_type'] = message['header']['msg_type']
459 message['msg_type'] = message['header']['msg_type']
422 message['parent_header'] = self.unpack(msg[ikey+1])
460 message['parent_header'] = self.unpack(msg[2])
423 if content:
461 if content:
424 message['content'] = self.unpack(msg[ikey+2])
462 message['content'] = self.unpack(msg[3])
425 else:
463 else:
426 message['content'] = msg[ikey+2]
464 message['content'] = msg[3]
427
465
428 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
466 message['buffers'] = msg[4:]
429 return message
467 return message
430
431
468
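The receiver-side checks in `unpack_message` boil down to the following sketch (`session` is a configured StreamSession, `msg` the frame list left over after `feed_identities`; a summary, not the exact code above):

    def verify(session, msg):
        """Recompute the digest and reject replays (sketch)."""
        signature = msg[0]
        if signature in session.digest_history:
            raise ValueError("Duplicate Signature: %r" % signature)
        session.digest_history.add(signature)
        if session.sign(msg[1:4]) != signature:
            raise ValueError("Invalid Signature: %r" % signature)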
432 def test_msg2obj():
469 def test_msg2obj():
433 am = dict(x=1)
470 am = dict(x=1)
434 ao = Message(am)
471 ao = Message(am)
435 assert ao.x == am['x']
472 assert ao.x == am['x']
436
473
437 am['y'] = dict(z=1)
474 am['y'] = dict(z=1)
438 ao = Message(am)
475 ao = Message(am)
439 assert ao.y.z == am['y']['z']
476 assert ao.y.z == am['y']['z']
440
477
441 k1, k2 = 'y', 'z'
478 k1, k2 = 'y', 'z'
442 assert ao[k1][k2] == am[k1][k2]
479 assert ao[k1][k2] == am[k1][k2]
443
480
444 am2 = dict(ao)
481 am2 = dict(ao)
445 assert am['x'] == am2['x']
482 assert am['x'] == am2['x']
446 assert am['y']['z'] == am2['y']['z']
483 assert am['y']['z'] == am2['y']['z']
@@ -1,324 +1,324 b''
1 .. _parallelsecurity:
1 .. _parallelsecurity:
2
2
3 ===========================
3 ===========================
4 Security details of IPython
4 Security details of IPython
5 ===========================
5 ===========================
6
6
7 .. note::
7 .. note::
8
8
9 This section is not thorough, and IPython.zmq needs a thorough security
9 This section is not thorough, and IPython.zmq needs a thorough security
10 audit.
10 audit.
11
11
12 IPython's :mod:`IPython.zmq` package exposes the full power of the
12 IPython's :mod:`IPython.zmq` package exposes the full power of the
13 Python interpreter over a TCP/IP network for the purposes of parallel
13 Python interpreter over a TCP/IP network for the purposes of parallel
14 computing. This feature brings up the important question of IPython's security
14 computing. This feature brings up the important question of IPython's security
15 model. This document gives details about this model and how it is implemented
15 model. This document gives details about this model and how it is implemented
16 in IPython's architecture.
16 in IPython's architecture.
17
17
18 Processs and network topology
18 Process and network topology
19 =============================
19 ============================
20
20
21 To enable parallel computing, IPython has a number of different processes that
21 To enable parallel computing, IPython has a number of different processes that
22 run. These processes are discussed at length in the IPython documentation and
22 run. These processes are discussed at length in the IPython documentation and
23 are summarized here:
23 are summarized here:
24
24
25 * The IPython *engine*. This process is a full blown Python
25 * The IPython *engine*. This process is a full blown Python
26 interpreter in which user code is executed. Multiple
26 interpreter in which user code is executed. Multiple
27 engines are started to make parallel computing possible.
27 engines are started to make parallel computing possible.
28 * The IPython *hub*. This process monitors a set of
28 * The IPython *hub*. This process monitors a set of
29 engines and schedulers, and keeps track of the state of the processes. It listens
29 engines and schedulers, and keeps track of the state of the processes. It listens
30 for registration connections from engines and clients, and for monitoring connections
30 for registration connections from engines and clients, and for monitoring connections
31 from schedulers.
31 from schedulers.
32 * The IPython *schedulers*. This is a set of processes that relay commands and results
32 * The IPython *schedulers*. This is a set of processes that relay commands and results
33 between clients and engines. They are typically on the same machine as the controller,
33 between clients and engines. They are typically on the same machine as the controller,
34 and listen for connections from engines and clients, but connect to the Hub.
34 and listen for connections from engines and clients, but connect to the Hub.
35 * The IPython *client*. This process is typically an
35 * The IPython *client*. This process is typically an
36 interactive Python process that is used to coordinate the
36 interactive Python process that is used to coordinate the
37 engines to get a parallel computation done.
37 engines to get a parallel computation done.
38
38
39 Collectively, these processes are called the IPython *kernel*, and the hub and schedulers
39 Collectively, these processes are called the IPython *cluster*, and the hub and schedulers
40 together are referred to as the *controller*.
40 together are referred to as the *controller*.
41
41
42 .. note::
43
44 Are these really still referred to as the Kernel? It doesn't seem so to me. 'cluster'
45 seems more accurate.
46
47 -MinRK
48
42
49 These processes communicate over any transport supported by ZeroMQ (tcp, pgm, infiniband, ipc)
43 These processes communicate over any transport supported by ZeroMQ (tcp, pgm, infiniband, ipc)
50 with a well defined topology. The IPython hub and schedulers listen on sockets. Upon
44 with a well defined topology. The IPython hub and schedulers listen on sockets. Upon
51 starting, an engine connects to a hub and registers itself, which then informs the engine
45 starting, an engine connects to a hub and registers itself, which then informs the engine
52 of the connection information for the schedulers, and the engine then connects to the
46 of the connection information for the schedulers, and the engine then connects to the
53 schedulers. These engine/hub and engine/scheduler connections persist for the
47 schedulers. These engine/hub and engine/scheduler connections persist for the
54 lifetime of each engine.
48 lifetime of each engine.
55
49
56 The IPython client also connects to the controller processes using a number of socket
50 The IPython client also connects to the controller processes using a number of socket
57 connections. As of writing, this is one socket per scheduler (4), and 3 connections to the
51 connections. As of writing, this is one socket per scheduler (4), and 3 connections to the
58 hub for a total of 7. These connections persist for the lifetime of the client only.
52 hub for a total of 7. These connections persist for the lifetime of the client only.
59
53
60 A given IPython controller and set of engines typically has a relatively
54 A given IPython controller and set of engines typically has a relatively
61 short lifetime. Typically this lifetime corresponds to the duration of a single parallel
55 short lifetime. Typically this lifetime corresponds to the duration of a single parallel
62 simulation performed by a single user. Finally, the hub, schedulers, engines, and client
56 simulation performed by a single user. Finally, the hub, schedulers, engines, and client
63 processes typically execute with the permissions of that same user. More specifically, the
57 processes typically execute with the permissions of that same user. More specifically, the
64 controller and engines are *not* executed as root or with any other superuser permissions.
58 controller and engines are *not* executed as root or with any other superuser permissions.
65
59
66 Application logic
60 Application logic
67 =================
61 =================
68
62
69 When running the IPython kernel to perform a parallel computation, a user
63 When running the IPython kernel to perform a parallel computation, a user
70 utilizes the IPython client to send Python commands and data through the
64 utilizes the IPython client to send Python commands and data through the
71 IPython schedulers to the IPython engines, where those commands are executed
65 IPython schedulers to the IPython engines, where those commands are executed
72 and the data processed. The design of IPython ensures that the client is the
66 and the data processed. The design of IPython ensures that the client is the
73 only access point for the capabilities of the engines. That is, the only way
67 only access point for the capabilities of the engines. That is, the only way
74 of addressing the engines is through a client.
68 of addressing the engines is through a client.
75
69
76 A user can utilize the client to instruct the IPython engines to execute
70 A user can utilize the client to instruct the IPython engines to execute
77 arbitrary Python commands. These Python commands can include calls to the
71 arbitrary Python commands. These Python commands can include calls to the
78 system shell, access the filesystem, etc., as required by the user's
72 system shell, access the filesystem, etc., as required by the user's
79 application code. From this perspective, when a user runs an IPython engine on
73 application code. From this perspective, when a user runs an IPython engine on
80 a host, that engine has the same capabilities and permissions as the user
74 a host, that engine has the same capabilities and permissions as the user
81 themselves (as if they were logged onto the engine's host with a terminal).
75 themselves (as if they were logged onto the engine's host with a terminal).
82
76
83 Secure network connections
77 Secure network connections
84 ==========================
78 ==========================
85
79
86 Overview
80 Overview
87 --------
81 --------
88
82
89 ZeroMQ provides exactly no security. For this reason, users of IPython must be very
83 ZeroMQ provides exactly no security. For this reason, users of IPython must be very
90 careful in managing connections, because an open TCP/IP socket presents access to
84 careful in managing connections, because an open TCP/IP socket presents access to
91 arbitrary execution as the user on the engine machines. As a result, the default behavior
85 arbitrary execution as the user on the engine machines. As a result, the default behavior
92 of controller processes is to only listen for clients on the loopback interface, and the
86 of controller processes is to only listen for clients on the loopback interface, and the
93 client must establish SSH tunnels to connect to the controller processes.
87 client must establish SSH tunnels to connect to the controller processes.
94
88
95 .. warning::
89 .. warning::
96
90
97 If the controller's loopback interface is untrusted, then IPython should be considered
91 If the controller's loopback interface is untrusted, then IPython should be considered
98 vulnerable, and this extends to the loopback of all connected clients, which have
92 vulnerable, and this extends to the loopback of all connected clients, which have
99 opened a loopback port that is redirected to the controller's loopback port.
93 opened a loopback port that is redirected to the controller's loopback port.
100
94
101
95
102 SSH
96 SSH
103 ---
97 ---
104
98
105 Since ZeroMQ provides no security, SSH tunnels are the primary source of secure
99 Since ZeroMQ provides no security, SSH tunnels are the primary source of secure
106 connections. A connector file, such as `ipcontroller-client.json`, will contain
100 connections. A connector file, such as `ipcontroller-client.json`, will contain
107 information for connecting to the controller, possibly including the address of an
101 information for connecting to the controller, possibly including the address of an
108 ssh-server through which the client is to tunnel. The Client object then creates tunnels
102 ssh-server through which the client is to tunnel. The Client object then creates tunnels
109 using either [OpenSSH]_ or [Paramiko]_, depending on the platform. If users do not wish to
103 using either [OpenSSH]_ or [Paramiko]_, depending on the platform. If users do not wish to
110 use OpenSSH or Paramiko, or the tunneling utilities are insufficient, then they may
104 use OpenSSH or Paramiko, or the tunneling utilities are insufficient, then they may
111 construct the tunnels themselves, and simply connect clients and engines as if the
105 construct the tunnels themselves, and simply connect clients and engines as if the
112 controller were on loopback on the connecting machine.
106 controller were on loopback on the connecting machine.
113
107
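If the tunnels are constructed manually, one [OpenSSH]_ forward per controller port suffices; for example (hostnames and port numbers here are illustrative)::

    # forward local port 10101 to the controller's port 10101 on the server
    ssh -f -N -L 10101:127.0.0.1:10101 user@ssh-server.example.com

The client then connects to `tcp://127.0.0.1:10101` as if the controller were running locally.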
114 .. note::
108 .. note::
115
109
116 Tunneling is not currently available for engines.
110 Tunneling is not currently available for engines.
117
111
118 Authentication
112 Authentication
119 --------------
113 --------------
120
114
121 To protect users of shared machines, an execution key is used to authenticate all messages.
115 To protect users of shared machines, [HMAC]_ digests are used to sign messages, using a
116 shared key.
122
117
123 The Session object that handles the message protocol uses a unique key to verify valid
118 The Session object that handles the message protocol uses a unique key to verify valid
124 messages. This can be any value specified by the user, but the default behavior is a
119 messages. This can be any value specified by the user, but the default behavior is a
125 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is checked on every
120 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is used to
126 message everywhere it is unpacked (Controller, Engine, and Client) to ensure that it came
121 initialize an HMAC object, which digests all messages, and includes that digest as a
127 from an authentic user, and no messages that do not contain this key are acted upon in any
122 signature and part of the message. Every message that is unpacked (on Controller, Engine,
128 way.
123 and Client) will also be digested by the receiver, ensuring that the sender's key is the
129
124 same as the receiver's. Messages that do not carry a valid signature are not acted upon in any
130 There is exactly one key per cluster - it must be the same everywhere. Typically, the
125 way. The key itself is never sent over the network.
131 controller creates this key, and stores it in the private connection files
126
127 There is exactly one shared key per cluster - it must be the same everywhere. Typically,
128 the controller creates this key, and stores it in the private connection files
132 `ipython-{engine|client}.json`. These files are typically stored in the
129 `ipython-{engine|client}.json`. These files are typically stored in the
133 `~/.ipython/cluster_<profile>/security` directory, and are maintained as readable only by
130 `~/.ipython/profile_<name>/security` directory, and are maintained as readable only by the
134 the owner, just as is common practice with a user's keys in their `.ssh` directory.
131 owner, just as is common practice with a user's keys in their `.ssh` directory.
135
132
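Concretely, the scheme amounts to the following (key and message bytes are illustrative; note that Python 2's `hmac.HMAC` defaults to MD5)::

    import hmac

    key = b'a1b2c3d4...'                  # shared key from the connection file
    sent = hmac.HMAC(key, b'header|parent|content').hexdigest()
    recv = hmac.HMAC(key, b'header|parent|content').hexdigest()
    assert sent == recv                   # same key + same message: valid

    forged = hmac.HMAC(b'wrong', b'header|parent|content').hexdigest()
    assert forged != sent                 # wrong key cannot forge a signature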
136 .. warning::
133 .. warning::
137
134
138 It is important to note that the key authentication, as emphasized by the use of
135 It is important to note that the key authentication, as emphasized by the use of
139 a uuid rather than generating a key with a cryptographic library, provides a
136 a uuid rather than generating a key with a cryptographic library, provides a
140 defense against *accidental* messages more than it does against malicious attacks.
137 defense against *accidental* messages more than it does against malicious attacks.
141 If loopback is compromised, it would be trivial for an attacker to intercept messages
138 If loopback is compromised, it would be trivial for an attacker to intercept messages
142 and deduce the key, as there is no encryption.
139 and deduce the key, as there is no encryption.
143
140
144
141
145
142
146 Specific security vulnerabilities
143 Specific security vulnerabilities
147 =================================
144 =================================
148
145
149 There are a number of potential security vulnerabilities present in IPython's
146 There are a number of potential security vulnerabilities present in IPython's
150 architecture. In this section we discuss those vulnerabilities and detail how
147 architecture. In this section we discuss those vulnerabilities and detail how
151 the security architecture described above prevents them from being exploited.
148 the security architecture described above prevents them from being exploited.
152
149
153 Unauthorized clients
150 Unauthorized clients
154 --------------------
151 --------------------
155
152
156 The IPython client can instruct the IPython engines to execute arbitrary
153 The IPython client can instruct the IPython engines to execute arbitrary
157 Python code with the permissions of the user who started the engines. If an
154 Python code with the permissions of the user who started the engines. If an
158 attacker were able to connect their own hostile IPython client to the IPython
155 attacker were able to connect their own hostile IPython client to the IPython
159 controller, they could instruct the engines to execute code.
156 controller, they could instruct the engines to execute code.
160
157
161
158
162 On the first level, this attack is prevented by requiring access to the controller's
159 On the first level, this attack is prevented by requiring access to the controller's
163 ports, which are recommended to only be open on loopback if the controller is on an
160 ports, which are recommended to only be open on loopback if the controller is on an
164 untrusted local network. If the attacker does have access to the Controller's ports, then
161 untrusted local network. If the attacker does have access to the Controller's ports, then
165 the attack is prevented by the capabilities based client authentication of the execution
162 the attack is prevented by the capabilities based client authentication of the execution
166 key. The relevant authentication information is encoded into the JSON file that clients
163 key. The relevant authentication information is encoded into the JSON file that clients
167 must present to gain access to the IPython controller. By limiting the distribution of
164 must present to gain access to the IPython controller. By limiting the distribution of
168 those keys, a user can grant access to only authorized persons, just as with SSH keys.
165 those keys, a user can grant access to only authorized persons, just as with SSH keys.
169
166
170 It is highly unlikely that an execution key could be guessed by an attacker
167 It is highly unlikely that an execution key could be guessed by an attacker
171 in a brute force guessing attack. A given instance of the IPython controller
168 in a brute force guessing attack. A given instance of the IPython controller
172 only runs for a relatively short amount of time (on the order of hours). Thus
169 only runs for a relatively short amount of time (on the order of hours). Thus
173 an attacker would have only a limited amount of time to test a search space of
170 an attacker would have only a limited amount of time to test a search space of
174 size 2**128.
171 size 2**128. For added security, users can have arbitrarily long keys.
175
172
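A back-of-the-envelope bound (the guess rate is an assumption)::

    guesses_per_second = 1e9        # assumed attacker throughput
    years = 2**128 / guesses_per_second / (3600 * 24 * 365)
    # years is roughly 1e22 -- far longer than any controller's lifetime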
176 .. warning::
173 .. warning::
177
174
178 If the attacker has gained enough access to intercept loopback connections on
175 If the attacker has gained enough access to intercept loopback connections on *either* the
179 *either* the controller or client, then the key is easily deduced from network
176 controller or client, then a duplicate message can be sent. To protect against this,
180 traffic.
177 recipients only allow each signature once, and consider duplicates invalid. However,
178 the duplicate message could be sent to *another* recipient using the same key,
179 and it would be considered valid.
181
180
182
181
183 Unauthorized engines
182 Unauthorized engines
184 --------------------
183 --------------------
185
184
186 If an attacker were able to connect a hostile engine to a user's controller,
185 If an attacker were able to connect a hostile engine to a user's controller,
187 the user might unknowingly send sensitive code or data to the hostile engine.
186 the user might unknowingly send sensitive code or data to the hostile engine.
188 This attacker's engine would then have full access to that code and data.
187 This attacker's engine would then have full access to that code and data.
189
188
190 This type of attack is prevented in the same way as the unauthorized client
189 This type of attack is prevented in the same way as the unauthorized client
191 attack, through the usage of the capabilities based authentication scheme.
190 attack, through the usage of the capabilities based authentication scheme.
192
191
193 Unauthorized controllers
192 Unauthorized controllers
194 ------------------------
193 ------------------------
195
194
196 It is also possible that an attacker could try to convince a user's IPython
195 It is also possible that an attacker could try to convince a user's IPython
197 client or engine to connect to a hostile IPython controller. That controller
196 client or engine to connect to a hostile IPython controller. That controller
198 would then have full access to the code and data sent between the IPython
197 would then have full access to the code and data sent between the IPython
199 client and the IPython engines.
198 client and the IPython engines.
200
199
201 Again, this attack is prevented through the capabilities in a connection file, which
200 Again, this attack is prevented through the capabilities in a connection file, which
202 ensure that a client or engine connects to the correct controller. It is also important to
201 ensure that a client or engine connects to the correct controller. It is also important to
203 note that the connection files also encode the IP address and port that the controller is
202 note that the connection files also encode the IP address and port that the controller is
204 listening on, so there is little chance of mistakenly connecting to a controller running
203 listening on, so there is little chance of mistakenly connecting to a controller running
205 on a different IP address and port.
204 on a different IP address and port.
206
205
207 When starting an engine or client, a user must specify the key to use
206 When starting an engine or client, a user must specify the key to use
208 for that connection. Thus, in order to introduce a hostile controller, the
207 for that connection. Thus, in order to introduce a hostile controller, the
209 attacker must convince the user to use the key associated with the
208 attacker must convince the user to use the key associated with the
210 hostile controller. As long as a user is diligent in only using keys from
209 hostile controller. As long as a user is diligent in only using keys from
211 trusted sources, this attack is not possible.
210 trusted sources, this attack is not possible.
212
211
213 .. note::
212 .. note::
214
213
215 I may be wrong, the unauthorized controller may be easier to fake than this.
214 I may be wrong, the unauthorized controller may be easier to fake than this.
216
215
217 Other security measures
216 Other security measures
218 =======================
217 =======================
219
218
220 A number of other measures are taken to further limit the security risks
219 A number of other measures are taken to further limit the security risks
221 involved in running the IPython kernel.
220 involved in running the IPython kernel.
222
221
223 First, by default, the IPython controller listens on random port numbers.
222 First, by default, the IPython controller listens on random port numbers.
224 While this can be overridden by the user, in the default configuration, an
223 While this can be overridden by the user, in the default configuration, an
225 attacker would have to do a port scan to even find a controller to attack.
224 attacker would have to do a port scan to even find a controller to attack.
226 When coupled with the relatively short running time of a typical controller
225 When coupled with the relatively short running time of a typical controller
227 (on the order of hours), an attacker would have to work extremely hard and
226 (on the order of hours), an attacker would have to work extremely hard and
228 extremely *fast* to even find a running controller to attack.
227 extremely *fast* to even find a running controller to attack.
229
228
230 Second, much of the time, especially when run on supercomputers or clusters,
229 Second, much of the time, especially when run on supercomputers or clusters,
231 the controller is running behind a firewall. Thus, for engines or client to
230 the controller is running behind a firewall. Thus, for engines or client to
232 connect to the controller:
231 connect to the controller:
233
232
234 * The different processes have to all be behind the firewall.
233 * The different processes have to all be behind the firewall.
235
234
236 or:
235 or:
237
236
238 * The user has to use SSH port forwarding to tunnel the
237 * The user has to use SSH port forwarding to tunnel the
239 connections through the firewall.
238 connections through the firewall.
240
239
241 In either case, an attacker is presented with additional barriers that prevent
240 In either case, an attacker is presented with additional barriers that prevent
242 attacking or even probing the system.
241 attacking or even probing the system.
243
242
244 Summary
243 Summary
245 =======
244 =======
246
245
247 IPython's architecture has been carefully designed with security in mind. The
246 IPython's architecture has been carefully designed with security in mind. The
248 capabilities based authentication model, in conjunction with SSH tunneled
247 capabilities based authentication model, in conjunction with SSH tunneled
249 TCP/IP channels, addresses the core potential vulnerabilities in the system,
248 TCP/IP channels, addresses the core potential vulnerabilities in the system,
250 while still enabling users to use the system in open networks.
249 while still enabling users to use the system in open networks.
251
250
252 Other questions
251 Other questions
253 ===============
252 ===============
254
253
255 .. note::
254 .. note::
256
255
257 this does not apply to ZMQ, but I am sure there will be questions.
256 this does not apply to ZMQ, but I am sure there will be questions.
258
257
259 About keys
258 About keys
260 ----------
259 ----------
261
260
262 Can you clarify the roles of the certificate and its keys versus the FURL,
261 Can you clarify the roles of the certificate and its keys versus the FURL,
263 which is also called a key?
262 which is also called a key?
264
263
265 The certificate created by IPython processes is a standard public key x509
264 The certificate created by IPython processes is a standard public key x509
266 certificate, which is used by the SSL handshake protocol to set up an encrypted
265 certificate, which is used by the SSL handshake protocol to set up an encrypted
267 channel between the controller and the IPython engine or client. The public
266 channel between the controller and the IPython engine or client. The public
268 and private keys associated with this certificate are used only by the SSL
267 and private keys associated with this certificate are used only by the SSL
269 handshake protocol in setting up this encrypted channel.
268 handshake protocol in setting up this encrypted channel.
270
269
271 The FURL serves a completely different and independent purpose from the
270 The FURL serves a completely different and independent purpose from the
272 key pair associated with the certificate. When we refer to a FURL as a
271 key pair associated with the certificate. When we refer to a FURL as a
273 key, we are using the word "key" in the capabilities based security model
272 key, we are using the word "key" in the capabilities based security model
274 sense. This has nothing to do with "key" in the public/private key sense used
273 sense. This has nothing to do with "key" in the public/private key sense used
275 in the SSL protocol.
274 in the SSL protocol.
276
275
277 With that said, the FURL is used as a cryptographic key, to grant
276 With that said, the FURL is used as a cryptographic key, to grant
278 IPython engines and clients access to particular capabilities that the
277 IPython engines and clients access to particular capabilities that the
279 controller offers.
278 controller offers.
280
279
281 Self signed certificates
280 Self signed certificates
282 ------------------------
281 ------------------------
283
282
284 Is the controller creating a self-signed certificate? Is this created per
283 Is the controller creating a self-signed certificate? Is this created per
285 instance/session, as a one-time setup, or each time the controller is started?
284 instance/session, as a one-time setup, or each time the controller is started?
286
285
287 The Foolscap network protocol, which handles the SSL protocol details, creates
286 The Foolscap network protocol, which handles the SSL protocol details, creates
288 a self-signed x509 certificate using OpenSSL for each IPython process. The
287 a self-signed x509 certificate using OpenSSL for each IPython process. The
289 lifetime of the certificate is handled differently for the IPython controller
288 lifetime of the certificate is handled differently for the IPython controller
290 and the engines/client.
289 and the engines/client.
291
290
292 For the IPython engines and client, the certificate is only held in memory for
291 For the IPython engines and client, the certificate is only held in memory for
293 the lifetime of its process. It is never written to disk.
292 the lifetime of its process. It is never written to disk.
294
293
295 For the controller, the certificate can be created anew each time the
294 For the controller, the certificate can be created anew each time the
296 controller starts or it can be created once and reused each time the
295 controller starts or it can be created once and reused each time the
297 controller starts. If at any point, the certificate is deleted, a new one is
296 controller starts. If at any point, the certificate is deleted, a new one is
298 created the next time the controller starts.
297 created the next time the controller starts.
299
298
300 SSL private key
299 SSL private key
301 ---------------
300 ---------------
302
301
303 How is the private key (associated with the certificate) distributed?
302 How is the private key (associated with the certificate) distributed?
304
303
305 In the usual implementation of the SSL protocol, the private key is never
304 In the usual implementation of the SSL protocol, the private key is never
306 distributed. We follow this standard always.
305 distributed. We follow this standard always.
307
306
308 SSL versus Foolscap authentication
307 SSL versus Foolscap authentication
309 ----------------------------------
308 ----------------------------------
310
309
311 Many SSL connections only perform one-sided authentication (the server to the
310 Many SSL connections only perform one-sided authentication (the server to the
312 client). How is the client authentication in IPython's system related to SSL
311 client). How is the client authentication in IPython's system related to SSL
313 authentication?
312 authentication?
314
313
315 We perform a two way SSL handshake in which both parties request and verify
314 We perform a two way SSL handshake in which both parties request and verify
316 the certificate of their peer. This mutual authentication is handled by the
315 the certificate of their peer. This mutual authentication is handled by the
317 SSL handshake and is separate and independent from the additional
316 SSL handshake and is separate and independent from the additional
318 authentication steps that the CLIENT and SERVER perform after an encrypted
317 authentication steps that the CLIENT and SERVER perform after an encrypted
319 channel is established.
318 channel is established.
320
319
321 .. [RFC5246] <http://tools.ietf.org/html/rfc5246>
320 .. [RFC5246] <http://tools.ietf.org/html/rfc5246>
322
321
323 .. [OpenSSH] <http://www.openssh.com/>
322 .. [OpenSSH] <http://www.openssh.com/>
324 .. [Paramiko] <http://www.lag.net/paramiko/>
323 .. [Paramiko] <http://www.lag.net/paramiko/>
324 .. [HMAC] <http://tools.ietf.org/html/rfc2104.html>