set unlimited HWM for all relay devices...
MinRK
IPython/parallel/apps/ipcontrollerapp.py
@@ -1,537 +1,554 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import json
26 import json
27 import os
27 import os
28 import stat
28 import stat
29 import sys
29 import sys
30
30
31 from multiprocessing import Process
31 from multiprocessing import Process
32 from signal import signal, SIGINT, SIGABRT, SIGTERM
32 from signal import signal, SIGINT, SIGABRT, SIGTERM
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37
37
38 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
39
39
40 from IPython.parallel.apps.baseapp import (
40 from IPython.parallel.apps.baseapp import (
41 BaseParallelApplication,
41 BaseParallelApplication,
42 base_aliases,
42 base_aliases,
43 base_flags,
43 base_flags,
44 catch_config_error,
44 catch_config_error,
45 )
45 )
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
47 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49
49
50 from IPython.kernel.zmq.session import (
50 from IPython.kernel.zmq.session import (
51 Session, session_aliases, session_flags, default_secure
51 Session, session_aliases, session_flags, default_secure
52 )
52 )
53
53
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.hub import HubFactory
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 from IPython.parallel.controller.dictdb import DictDB
57 from IPython.parallel.controller.dictdb import DictDB
58
58
59 from IPython.parallel.util import split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
60
60
61 # conditional import of SQLiteDB / MongoDB backend class
61 # conditional import of SQLiteDB / MongoDB backend class
62 real_dbs = []
62 real_dbs = []
63
63
64 try:
64 try:
65 from IPython.parallel.controller.sqlitedb import SQLiteDB
65 from IPython.parallel.controller.sqlitedb import SQLiteDB
66 except ImportError:
66 except ImportError:
67 pass
67 pass
68 else:
68 else:
69 real_dbs.append(SQLiteDB)
69 real_dbs.append(SQLiteDB)
70
70
71 try:
71 try:
72 from IPython.parallel.controller.mongodb import MongoDB
72 from IPython.parallel.controller.mongodb import MongoDB
73 except ImportError:
73 except ImportError:
74 pass
74 pass
75 else:
75 else:
76 real_dbs.append(MongoDB)
76 real_dbs.append(MongoDB)
77
77
78
78
79
79
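The notable change in the imports above is set_hwm, newly pulled in from IPython.parallel.util on line 59. Its job is to apply a high-water mark across pyzmq/libzmq versions: zmq 2.x exposed a single HWM option, while zmq 3.x+ split it into SNDHWM and RCVHWM, and not every option applies to every socket type. A minimal sketch of such a helper, written as an assumption about its behavior rather than a copy of the real implementation:

import zmq

def set_hwm(sock, hwm=0):
    """Sketch: set the high-water mark on a zmq socket across versions.

    zmq 2.x has a single HWM option; zmq 3.x+ has SNDHWM/RCVHWM, and an
    option may not be valid for a given socket type, so failures are ignored.
    """
    for name in ('HWM', 'SNDHWM', 'RCVHWM'):
        opt = getattr(zmq, name, None)
        if opt is None:
            continue  # option does not exist in this zmq version
        try:
            sock.setsockopt(opt, hwm)
        except zmq.ZMQError:
            pass  # option not applicable to this socket type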
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # Module level variables
81 # Module level variables
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84
84
85 #: The default config file name for this application
85 #: The default config file name for this application
86 default_config_file_name = u'ipcontroller_config.py'
86 default_config_file_name = u'ipcontroller_config.py'
87
87
88
88
89 _description = """Start the IPython controller for parallel computing.
89 _description = """Start the IPython controller for parallel computing.
90
90
91 The IPython controller provides a gateway between the IPython engines and
91 The IPython controller provides a gateway between the IPython engines and
92 clients. The controller needs to be started before the engines and can be
92 clients. The controller needs to be started before the engines and can be
93 configured using command line options or using a cluster directory. Cluster
93 configured using command line options or using a cluster directory. Cluster
94 directories contain config, log and security files and are usually located in
94 directories contain config, log and security files and are usually located in
95 your ipython directory and named as "profile_name". See the `profile`
95 your ipython directory and named as "profile_name". See the `profile`
96 and `profile-dir` options for details.
96 and `profile-dir` options for details.
97 """
97 """
98
98
99 _examples = """
99 _examples = """
100 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
100 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
101 ipcontroller --scheme=pure # use the pure zeromq scheduler
101 ipcontroller --scheme=pure # use the pure zeromq scheduler
102 """
102 """
103
103
104
104
105 #-----------------------------------------------------------------------------
105 #-----------------------------------------------------------------------------
106 # The main application
106 # The main application
107 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
108 flags = {}
108 flags = {}
109 flags.update(base_flags)
109 flags.update(base_flags)
110 flags.update({
110 flags.update({
111 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
111 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
112 'Use threads instead of processes for the schedulers'),
112 'Use threads instead of processes for the schedulers'),
113 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
113 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
114 'use the SQLiteDB backend'),
114 'use the SQLiteDB backend'),
115 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
115 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
116 'use the MongoDB backend'),
116 'use the MongoDB backend'),
117 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
117 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
118 'use the in-memory DictDB backend'),
118 'use the in-memory DictDB backend'),
119 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
119 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
120 """use dummy DB backend, which doesn't store any information.
120 """use dummy DB backend, which doesn't store any information.
121
121
122 This is the default as of IPython 0.13.
122 This is the default as of IPython 0.13.
123
123
124 To enable delayed or repeated retrieval of results from the Hub,
124 To enable delayed or repeated retrieval of results from the Hub,
125 select one of the true db backends.
125 select one of the true db backends.
126 """),
126 """),
127 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
127 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
128 'reuse existing json connection files'),
128 'reuse existing json connection files'),
129 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
129 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
130 'Attempt to restore engines from a JSON file. '
130 'Attempt to restore engines from a JSON file. '
131 'For use when resuming a crashed controller'),
131 'For use when resuming a crashed controller'),
132 })
132 })
133
133
134 flags.update(session_flags)
134 flags.update(session_flags)
135
135
136 aliases = dict(
136 aliases = dict(
137 ssh = 'IPControllerApp.ssh_server',
137 ssh = 'IPControllerApp.ssh_server',
138 enginessh = 'IPControllerApp.engine_ssh_server',
138 enginessh = 'IPControllerApp.engine_ssh_server',
139 location = 'IPControllerApp.location',
139 location = 'IPControllerApp.location',
140
140
141 url = 'HubFactory.url',
141 url = 'HubFactory.url',
142 ip = 'HubFactory.ip',
142 ip = 'HubFactory.ip',
143 transport = 'HubFactory.transport',
143 transport = 'HubFactory.transport',
144 port = 'HubFactory.regport',
144 port = 'HubFactory.regport',
145
145
146 ping = 'HeartMonitor.period',
146 ping = 'HeartMonitor.period',
147
147
148 scheme = 'TaskScheduler.scheme_name',
148 scheme = 'TaskScheduler.scheme_name',
149 hwm = 'TaskScheduler.hwm',
149 hwm = 'TaskScheduler.hwm',
150 )
150 )
151 aliases.update(base_aliases)
151 aliases.update(base_aliases)
152 aliases.update(session_aliases)
152 aliases.update(session_aliases)
153
153
154 class IPControllerApp(BaseParallelApplication):
154 class IPControllerApp(BaseParallelApplication):
155
155
156 name = u'ipcontroller'
156 name = u'ipcontroller'
157 description = _description
157 description = _description
158 examples = _examples
158 examples = _examples
159 config_file_name = Unicode(default_config_file_name)
159 config_file_name = Unicode(default_config_file_name)
160 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
160 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
161
161
162 # change default to True
162 # change default to True
163 auto_create = Bool(True, config=True,
163 auto_create = Bool(True, config=True,
164 help="""Whether to create profile dir if it doesn't exist.""")
164 help="""Whether to create profile dir if it doesn't exist.""")
165
165
166 reuse_files = Bool(False, config=True,
166 reuse_files = Bool(False, config=True,
167 help="""Whether to reuse existing json connection files.
167 help="""Whether to reuse existing json connection files.
168 If False, connection files will be removed on a clean exit.
168 If False, connection files will be removed on a clean exit.
169 """
169 """
170 )
170 )
171 restore_engines = Bool(False, config=True,
171 restore_engines = Bool(False, config=True,
172 help="""Reload engine state from JSON file
172 help="""Reload engine state from JSON file
173 """
173 """
174 )
174 )
175 ssh_server = Unicode(u'', config=True,
175 ssh_server = Unicode(u'', config=True,
176 help="""ssh url for clients to use when connecting to the Controller
176 help="""ssh url for clients to use when connecting to the Controller
177 processes. It should be of the form: [user@]server[:port]. The
177 processes. It should be of the form: [user@]server[:port]. The
178 Controller's listening addresses must be accessible from the ssh server""",
178 Controller's listening addresses must be accessible from the ssh server""",
179 )
179 )
180 engine_ssh_server = Unicode(u'', config=True,
180 engine_ssh_server = Unicode(u'', config=True,
181 help="""ssh url for engines to use when connecting to the Controller
181 help="""ssh url for engines to use when connecting to the Controller
182 processes. It should be of the form: [user@]server[:port]. The
182 processes. It should be of the form: [user@]server[:port]. The
183 Controller's listening addresses must be accessible from the ssh server""",
183 Controller's listening addresses must be accessible from the ssh server""",
184 )
184 )
185 location = Unicode(u'', config=True,
185 location = Unicode(u'', config=True,
186 help="""The external IP or domain name of the Controller, used for disambiguating
186 help="""The external IP or domain name of the Controller, used for disambiguating
187 engine and client connections.""",
187 engine and client connections.""",
188 )
188 )
189 import_statements = List([], config=True,
189 import_statements = List([], config=True,
190 help="import statements to be run at startup. Necessary in some environments"
190 help="import statements to be run at startup. Necessary in some environments"
191 )
191 )
192
192
193 use_threads = Bool(False, config=True,
193 use_threads = Bool(False, config=True,
194 help='Use threads instead of processes for the schedulers',
194 help='Use threads instead of processes for the schedulers',
195 )
195 )
196
196
197 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
197 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
198 help="JSON filename where engine connection info will be stored.")
198 help="JSON filename where engine connection info will be stored.")
199 client_json_file = Unicode('ipcontroller-client.json', config=True,
199 client_json_file = Unicode('ipcontroller-client.json', config=True,
200 help="JSON filename where client connection info will be stored.")
200 help="JSON filename where client connection info will be stored.")
201
201
202 def _cluster_id_changed(self, name, old, new):
202 def _cluster_id_changed(self, name, old, new):
203 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
203 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
204 self.engine_json_file = "%s-engine.json" % self.name
204 self.engine_json_file = "%s-engine.json" % self.name
205 self.client_json_file = "%s-client.json" % self.name
205 self.client_json_file = "%s-client.json" % self.name
206
206
207
207
208 # internal
208 # internal
209 children = List()
209 children = List()
210 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
210 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
211
211
212 def _use_threads_changed(self, name, old, new):
212 def _use_threads_changed(self, name, old, new):
213 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
213 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
214
214
215 write_connection_files = Bool(True,
215 write_connection_files = Bool(True,
216 help="""Whether to write connection files to disk.
216 help="""Whether to write connection files to disk.
217 True in all cases other than runs with `reuse_files=True` *after the first*
217 True in all cases other than runs with `reuse_files=True` *after the first*
218 """
218 """
219 )
219 )
220
220
221 aliases = Dict(aliases)
221 aliases = Dict(aliases)
222 flags = Dict(flags)
222 flags = Dict(flags)
223
223
224
224
225 def save_connection_dict(self, fname, cdict):
225 def save_connection_dict(self, fname, cdict):
226 """save a connection dict to json file."""
226 """save a connection dict to json file."""
227 c = self.config
227 c = self.config
228 url = cdict['registration']
228 url = cdict['registration']
229 location = cdict['location']
229 location = cdict['location']
230
230
231 if not location:
231 if not location:
232 if PUBLIC_IPS:
232 if PUBLIC_IPS:
233 location = PUBLIC_IPS[-1]
233 location = PUBLIC_IPS[-1]
234 else:
234 else:
235 self.log.warn("Could not identify this machine's IP, assuming %s."
235 self.log.warn("Could not identify this machine's IP, assuming %s."
236 " You may need to specify '--location=<external_ip_address>' to help"
236 " You may need to specify '--location=<external_ip_address>' to help"
237 " IPython decide when to connect via loopback." % LOCALHOST)
237 " IPython decide when to connect via loopback." % LOCALHOST)
238 location = LOCALHOST
238 location = LOCALHOST
239 cdict['location'] = location
239 cdict['location'] = location
240 fname = os.path.join(self.profile_dir.security_dir, fname)
240 fname = os.path.join(self.profile_dir.security_dir, fname)
241 self.log.info("writing connection info to %s", fname)
241 self.log.info("writing connection info to %s", fname)
242 with open(fname, 'w') as f:
242 with open(fname, 'w') as f:
243 f.write(json.dumps(cdict, indent=2))
243 f.write(json.dumps(cdict, indent=2))
244 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
244 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
245
245
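For reference, the connection dict saved here (and read back by load_config_from_json below) combines the base, client, and engine dicts assembled in init_hub. A hypothetical ipcontroller-engine.json, with key names taken from the code and purely illustrative values:

# hypothetical contents of ipcontroller-engine.json; the key names come from
# init_hub/load_config_from_json, the values shown here are made up
ecfg = {
    'exec_key': 'a0d766eb-...',  # Session auth key (a full UUID in practice)
    'location': '192.168.0.1',   # external IP or hostname of the controller
    'pack': 'json',              # Session packer
    'unpack': 'json',            # Session unpacker
    'ssh': '',                   # optional SSH gateway for engines
    'interface': 'tcp://*',      # transport://ip
    'registration': 10101,       # the remaining keys are port numbers
    'control': 10102,
    'mux': 10103,
    'task': 10104,
    'iopub': 10105,
    'hb_ping': 10106,
    'hb_pong': 10107,
}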
246 def load_config_from_json(self):
246 def load_config_from_json(self):
247 """load config from existing json connector files."""
247 """load config from existing json connector files."""
248 c = self.config
248 c = self.config
249 self.log.debug("loading config from JSON")
249 self.log.debug("loading config from JSON")
250
250
251 # load engine config
251 # load engine config
252
252
253 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
253 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
254 self.log.info("loading connection info from %s", fname)
254 self.log.info("loading connection info from %s", fname)
255 with open(fname) as f:
255 with open(fname) as f:
256 ecfg = json.loads(f.read())
256 ecfg = json.loads(f.read())
257
257
258 # json gives unicode, Session.key wants bytes
258 # json gives unicode, Session.key wants bytes
259 c.Session.key = ecfg['exec_key'].encode('ascii')
259 c.Session.key = ecfg['exec_key'].encode('ascii')
260
260
261 xport,ip = ecfg['interface'].split('://')
261 xport,ip = ecfg['interface'].split('://')
262
262
263 c.HubFactory.engine_ip = ip
263 c.HubFactory.engine_ip = ip
264 c.HubFactory.engine_transport = xport
264 c.HubFactory.engine_transport = xport
265
265
266 self.location = ecfg['location']
266 self.location = ecfg['location']
267 if not self.engine_ssh_server:
267 if not self.engine_ssh_server:
268 self.engine_ssh_server = ecfg['ssh']
268 self.engine_ssh_server = ecfg['ssh']
269
269
270 # load client config
270 # load client config
271
271
272 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
272 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
273 self.log.info("loading connection info from %s", fname)
273 self.log.info("loading connection info from %s", fname)
274 with open(fname) as f:
274 with open(fname) as f:
275 ccfg = json.loads(f.read())
275 ccfg = json.loads(f.read())
276
276
277 for key in ('exec_key', 'registration', 'pack', 'unpack'):
277 for key in ('exec_key', 'registration', 'pack', 'unpack'):
278 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
278 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
279
279
280 xport,addr = ccfg['interface'].split('://')
280 xport,addr = ccfg['interface'].split('://')
281
281
282 c.HubFactory.client_transport = xport
282 c.HubFactory.client_transport = xport
283 c.HubFactory.client_ip = addr
283 c.HubFactory.client_ip = addr
284 if not self.ssh_server:
284 if not self.ssh_server:
285 self.ssh_server = ccfg['ssh']
285 self.ssh_server = ccfg['ssh']
286
286
287 # load port config:
287 # load port config:
288 c.HubFactory.regport = ecfg['registration']
288 c.HubFactory.regport = ecfg['registration']
289 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
289 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
290 c.HubFactory.control = (ccfg['control'], ecfg['control'])
290 c.HubFactory.control = (ccfg['control'], ecfg['control'])
291 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
291 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
292 c.HubFactory.task = (ccfg['task'], ecfg['task'])
292 c.HubFactory.task = (ccfg['task'], ecfg['task'])
293 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
293 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
294 c.HubFactory.notifier_port = ccfg['notification']
294 c.HubFactory.notifier_port = ccfg['notification']
295
295
296 def cleanup_connection_files(self):
296 def cleanup_connection_files(self):
297 if self.reuse_files:
297 if self.reuse_files:
298 self.log.debug("leaving JSON connection files for reuse")
298 self.log.debug("leaving JSON connection files for reuse")
299 return
299 return
300 self.log.debug("cleaning up JSON connection files")
300 self.log.debug("cleaning up JSON connection files")
301 for f in (self.client_json_file, self.engine_json_file):
301 for f in (self.client_json_file, self.engine_json_file):
302 f = os.path.join(self.profile_dir.security_dir, f)
302 f = os.path.join(self.profile_dir.security_dir, f)
303 try:
303 try:
304 os.remove(f)
304 os.remove(f)
305 except Exception as e:
305 except Exception as e:
306 self.log.error("Failed to cleanup connection file: %s", e)
306 self.log.error("Failed to cleanup connection file: %s", e)
307 else:
307 else:
308 self.log.debug(u"removed %s", f)
308 self.log.debug(u"removed %s", f)
309
309
310 def load_secondary_config(self):
310 def load_secondary_config(self):
311 """secondary config, loading from JSON and setting defaults"""
311 """secondary config, loading from JSON and setting defaults"""
312 if self.reuse_files:
312 if self.reuse_files:
313 try:
313 try:
314 self.load_config_from_json()
314 self.load_config_from_json()
315 except (AssertionError,IOError) as e:
315 except (AssertionError,IOError) as e:
316 self.log.error("Could not load config from JSON: %s" % e)
316 self.log.error("Could not load config from JSON: %s" % e)
317 else:
317 else:
318 # successfully loaded config from JSON, and reuse=True
318 # successfully loaded config from JSON, and reuse=True
319 # no need to write back the same file
319 # no need to write back the same file
320 self.write_connection_files = False
320 self.write_connection_files = False
321
321
322 # switch Session.key default to secure
322 # switch Session.key default to secure
323 default_secure(self.config)
323 default_secure(self.config)
324 self.log.debug("Config changed")
324 self.log.debug("Config changed")
325 self.log.debug(repr(self.config))
325 self.log.debug(repr(self.config))
326
326
327 def init_hub(self):
327 def init_hub(self):
328 c = self.config
328 c = self.config
329
329
330 self.do_import_statements()
330 self.do_import_statements()
331
331
332 try:
332 try:
333 self.factory = HubFactory(config=c, log=self.log)
333 self.factory = HubFactory(config=c, log=self.log)
334 # self.start_logging()
334 # self.start_logging()
335 self.factory.init_hub()
335 self.factory.init_hub()
336 except TraitError:
336 except TraitError:
337 raise
337 raise
338 except Exception:
338 except Exception:
339 self.log.error("Couldn't construct the Controller", exc_info=True)
339 self.log.error("Couldn't construct the Controller", exc_info=True)
340 self.exit(1)
340 self.exit(1)
341
341
342 if self.write_connection_files:
342 if self.write_connection_files:
343 # save to new json config files
343 # save to new json config files
344 f = self.factory
344 f = self.factory
345 base = {
345 base = {
346 'exec_key' : f.session.key.decode('ascii'),
346 'exec_key' : f.session.key.decode('ascii'),
347 'location' : self.location,
347 'location' : self.location,
348 'pack' : f.session.packer,
348 'pack' : f.session.packer,
349 'unpack' : f.session.unpacker,
349 'unpack' : f.session.unpacker,
350 }
350 }
351
351
352 cdict = {'ssh' : self.ssh_server}
352 cdict = {'ssh' : self.ssh_server}
353 cdict.update(f.client_info)
353 cdict.update(f.client_info)
354 cdict.update(base)
354 cdict.update(base)
355 self.save_connection_dict(self.client_json_file, cdict)
355 self.save_connection_dict(self.client_json_file, cdict)
356
356
357 edict = {'ssh' : self.engine_ssh_server}
357 edict = {'ssh' : self.engine_ssh_server}
358 edict.update(f.engine_info)
358 edict.update(f.engine_info)
359 edict.update(base)
359 edict.update(base)
360 self.save_connection_dict(self.engine_json_file, edict)
360 self.save_connection_dict(self.engine_json_file, edict)
361
361
362 fname = "engines%s.json" % self.cluster_id
362 fname = "engines%s.json" % self.cluster_id
363 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
363 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
364 if self.restore_engines:
364 if self.restore_engines:
365 self.factory.hub._load_engine_state()
365 self.factory.hub._load_engine_state()
366
366
367 def init_schedulers(self):
367 def init_schedulers(self):
368 children = self.children
368 children = self.children
369 mq = import_item(str(self.mq_class))
369 mq = import_item(str(self.mq_class))
370
370
371 f = self.factory
371 f = self.factory
372 ident = f.session.bsession
372 ident = f.session.bsession
373 # disambiguate url, in case of *
373 # disambiguate url, in case of *
374 monitor_url = disambiguate_url(f.monitor_url)
374 monitor_url = disambiguate_url(f.monitor_url)
375 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
375 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
376 # IOPub relay (in a Process)
376 # IOPub relay (in a Process)
377 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
377 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
378 q.bind_in(f.client_url('iopub'))
378 q.bind_in(f.client_url('iopub'))
379 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
379 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
380 q.bind_out(f.engine_url('iopub'))
380 q.bind_out(f.engine_url('iopub'))
381 q.setsockopt_out(zmq.SUBSCRIBE, b'')
381 q.setsockopt_out(zmq.SUBSCRIBE, b'')
382 q.connect_mon(monitor_url)
382 q.connect_mon(monitor_url)
383 q.daemon=True
383 q.daemon=True
384 children.append(q)
384 children.append(q)
385
385
386 # Multiplexer Queue (in a Process)
386 # Multiplexer Queue (in a Process)
387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
387 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
388
388 q.bind_in(f.client_url('mux'))
389 q.bind_in(f.client_url('mux'))
389 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
390 q.bind_out(f.engine_url('mux'))
391 q.bind_out(f.engine_url('mux'))
391 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
392 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
392 q.connect_mon(monitor_url)
393 q.connect_mon(monitor_url)
393 q.daemon=True
394 q.daemon=True
394 children.append(q)
395 children.append(q)
395
396
396 # Control Queue (in a Process)
397 # Control Queue (in a Process)
397 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
398 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
398 q.bind_in(f.client_url('control'))
399 q.bind_in(f.client_url('control'))
399 q.setsockopt_in(zmq.IDENTITY, b'control_in')
400 q.setsockopt_in(zmq.IDENTITY, b'control_in')
400 q.bind_out(f.engine_url('control'))
401 q.bind_out(f.engine_url('control'))
401 q.setsockopt_out(zmq.IDENTITY, b'control_out')
402 q.setsockopt_out(zmq.IDENTITY, b'control_out')
402 q.connect_mon(monitor_url)
403 q.connect_mon(monitor_url)
403 q.daemon=True
404 q.daemon=True
404 children.append(q)
405 children.append(q)
405 try:
406 try:
406 scheme = self.config.TaskScheduler.scheme_name
407 scheme = self.config.TaskScheduler.scheme_name
407 except AttributeError:
408 except AttributeError:
408 scheme = TaskScheduler.scheme_name.get_default_value()
409 scheme = TaskScheduler.scheme_name.get_default_value()
409 # Task Queue (in a Process)
410 # Task Queue (in a Process)
410 if scheme == 'pure':
411 if scheme == 'pure':
411 self.log.warn("task::using pure DEALER Task scheduler")
412 self.log.warn("task::using pure DEALER Task scheduler")
412 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
413 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
413 # q.setsockopt_out(zmq.HWM, hub.hwm)
414 # q.setsockopt_out(zmq.HWM, hub.hwm)
414 q.bind_in(f.client_url('task'))
415 q.bind_in(f.client_url('task'))
415 q.setsockopt_in(zmq.IDENTITY, b'task_in')
416 q.setsockopt_in(zmq.IDENTITY, b'task_in')
416 q.bind_out(f.engine_url('task'))
417 q.bind_out(f.engine_url('task'))
417 q.setsockopt_out(zmq.IDENTITY, b'task_out')
418 q.setsockopt_out(zmq.IDENTITY, b'task_out')
418 q.connect_mon(monitor_url)
419 q.connect_mon(monitor_url)
419 q.daemon=True
420 q.daemon=True
420 children.append(q)
421 children.append(q)
421 elif scheme == 'none':
422 elif scheme == 'none':
422 self.log.warn("task::using no Task scheduler")
423 self.log.warn("task::using no Task scheduler")
423
424
424 else:
425 else:
425 self.log.info("task::using Python %s Task scheduler"%scheme)
426 self.log.info("task::using Python %s Task scheduler"%scheme)
426 sargs = (f.client_url('task'), f.engine_url('task'),
427 sargs = (f.client_url('task'), f.engine_url('task'),
427 monitor_url, disambiguate_url(f.client_url('notification')),
428 monitor_url, disambiguate_url(f.client_url('notification')),
428 disambiguate_url(f.client_url('registration')),
429 disambiguate_url(f.client_url('registration')),
429 )
430 )
430 kwargs = dict(logname='scheduler', loglevel=self.log_level,
431 kwargs = dict(logname='scheduler', loglevel=self.log_level,
431 log_url = self.log_url, config=dict(self.config))
432 log_url = self.log_url, config=dict(self.config))
432 if 'Process' in self.mq_class:
433 if 'Process' in self.mq_class:
433 # run the Python scheduler in a Process
434 # run the Python scheduler in a Process
434 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
435 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
435 q.daemon=True
436 q.daemon=True
436 children.append(q)
437 children.append(q)
437 else:
438 else:
438 # single-threaded Controller
439 # single-threaded Controller
439 kwargs['in_thread'] = True
440 kwargs['in_thread'] = True
440 launch_scheduler(*sargs, **kwargs)
441 launch_scheduler(*sargs, **kwargs)
442
443 # set unlimited HWM for all relay devices
444 if hasattr(zmq, 'SNDHWM'):
445 q = children[0]
446 q.setsockopt_in(zmq.RCVHWM, 0)
447 q.setsockopt_out(zmq.SNDHWM, 0)
448
449 for q in children[1:]:
450 if not hasattr(q, 'setsockopt_in'):
451 continue
452 q.setsockopt_in(zmq.SNDHWM, 0)
453 q.setsockopt_in(zmq.RCVHWM, 0)
454 q.setsockopt_out(zmq.SNDHWM, 0)
455 q.setsockopt_out(zmq.RCVHWM, 0)
456 q.setsockopt_mon(zmq.SNDHWM, 0)
457
441
458
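This hunk is the commit itself: a high-water mark of 0 tells libzmq to queue messages without bound, so bursts of traffic through the relay devices are buffered rather than dropped (PUB/SUB) or blocked on (ROUTER/DEALER). The hasattr(zmq, 'SNDHWM') guard restricts this to zmq >= 3, where the split send/receive options exist and the default HWM is 1000 rather than unlimited. A standalone sketch of the same pattern on a single monitored queue, with hypothetical addresses:

import zmq
from zmq.devices import ThreadMonitoredQueue

# sketch: one relay device with unlimited HWMs, assuming zmq >= 3
q = ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
q.bind_in('tcp://127.0.0.1:5555')    # hypothetical ports
q.bind_out('tcp://127.0.0.1:5556')
q.connect_mon('tcp://127.0.0.1:5557')
if hasattr(zmq, 'SNDHWM'):
    for opt in (zmq.SNDHWM, zmq.RCVHWM):
        q.setsockopt_in(opt, 0)      # 0 means "no limit"
        q.setsockopt_out(opt, 0)
    q.setsockopt_mon(zmq.SNDHWM, 0)  # the monitor PUB socket only sends
q.daemon = True
q.start()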
442 def terminate_children(self):
459 def terminate_children(self):
443 child_procs = []
460 child_procs = []
444 for child in self.children:
461 for child in self.children:
445 if isinstance(child, ProcessMonitoredQueue):
462 if isinstance(child, ProcessMonitoredQueue):
446 child_procs.append(child.launcher)
463 child_procs.append(child.launcher)
447 elif isinstance(child, Process):
464 elif isinstance(child, Process):
448 child_procs.append(child)
465 child_procs.append(child)
449 if child_procs:
466 if child_procs:
450 self.log.critical("terminating children...")
467 self.log.critical("terminating children...")
451 for child in child_procs:
468 for child in child_procs:
452 try:
469 try:
453 child.terminate()
470 child.terminate()
454 except OSError:
471 except OSError:
455 # already dead
472 # already dead
456 pass
473 pass
457
474
458 def handle_signal(self, sig, frame):
475 def handle_signal(self, sig, frame):
459 self.log.critical("Received signal %i, shutting down", sig)
476 self.log.critical("Received signal %i, shutting down", sig)
460 self.terminate_children()
477 self.terminate_children()
461 self.loop.stop()
478 self.loop.stop()
462
479
463 def init_signal(self):
480 def init_signal(self):
464 for sig in (SIGINT, SIGABRT, SIGTERM):
481 for sig in (SIGINT, SIGABRT, SIGTERM):
465 signal(sig, self.handle_signal)
482 signal(sig, self.handle_signal)
466
483
467 def do_import_statements(self):
484 def do_import_statements(self):
468 statements = self.import_statements
485 statements = self.import_statements
469 for s in statements:
486 for s in statements:
470 try:
487 try:
471 self.log.msg("Executing statement: '%s'" % s)
488 self.log.msg("Executing statement: '%s'" % s)
472 exec s in globals(), locals()
489 exec s in globals(), locals()
473 except:
490 except:
474 self.log.msg("Error running statement: %s" % s)
491 self.log.msg("Error running statement: %s" % s)
475
492
476 def forward_logging(self):
493 def forward_logging(self):
477 if self.log_url:
494 if self.log_url:
478 self.log.info("Forwarding logging to %s"%self.log_url)
495 self.log.info("Forwarding logging to %s"%self.log_url)
479 context = zmq.Context.instance()
496 context = zmq.Context.instance()
480 lsock = context.socket(zmq.PUB)
497 lsock = context.socket(zmq.PUB)
481 lsock.connect(self.log_url)
498 lsock.connect(self.log_url)
482 handler = PUBHandler(lsock)
499 handler = PUBHandler(lsock)
483 handler.root_topic = 'controller'
500 handler.root_topic = 'controller'
484 handler.setLevel(self.log_level)
501 handler.setLevel(self.log_level)
485 self.log.addHandler(handler)
502 self.log.addHandler(handler)
486
503
487 @catch_config_error
504 @catch_config_error
488 def initialize(self, argv=None):
505 def initialize(self, argv=None):
489 super(IPControllerApp, self).initialize(argv)
506 super(IPControllerApp, self).initialize(argv)
490 self.forward_logging()
507 self.forward_logging()
491 self.load_secondary_config()
508 self.load_secondary_config()
492 self.init_hub()
509 self.init_hub()
493 self.init_schedulers()
510 self.init_schedulers()
494
511
495 def start(self):
512 def start(self):
496 # Start the subprocesses:
513 # Start the subprocesses:
497 self.factory.start()
514 self.factory.start()
498 # children must be started before signals are setup,
515 # children must be started before signals are setup,
499 # otherwise signal-handling will fire multiple times
516 # otherwise signal-handling will fire multiple times
500 for child in self.children:
517 for child in self.children:
501 child.start()
518 child.start()
502 self.init_signal()
519 self.init_signal()
503
520
504 self.write_pid_file(overwrite=True)
521 self.write_pid_file(overwrite=True)
505
522
506 try:
523 try:
507 self.factory.loop.start()
524 self.factory.loop.start()
508 except KeyboardInterrupt:
525 except KeyboardInterrupt:
509 self.log.critical("Interrupted, Exiting...\n")
526 self.log.critical("Interrupted, Exiting...\n")
510 finally:
527 finally:
511 self.cleanup_connection_files()
528 self.cleanup_connection_files()
512
529
513
530
514
531
515 def launch_new_instance():
532 def launch_new_instance():
516 """Create and run the IPython controller"""
533 """Create and run the IPython controller"""
517 if sys.platform == 'win32':
534 if sys.platform == 'win32':
518 # make sure we don't get called from a multiprocessing subprocess
535 # make sure we don't get called from a multiprocessing subprocess
519 # this can result in infinite Controllers being started on Windows
536 # this can result in infinite Controllers being started on Windows
520 # which doesn't have a proper fork, so multiprocessing is wonky
537 # which doesn't have a proper fork, so multiprocessing is wonky
521
538
522 # this only comes up when IPython has been installed using vanilla
539 # this only comes up when IPython has been installed using vanilla
523 # setuptools, and *not* distribute.
540 # setuptools, and *not* distribute.
524 import multiprocessing
541 import multiprocessing
525 p = multiprocessing.current_process()
542 p = multiprocessing.current_process()
526 # the main process has name 'MainProcess'
543 # the main process has name 'MainProcess'
527 # subprocesses will have names like 'Process-1'
544 # subprocesses will have names like 'Process-1'
528 if p.name != 'MainProcess':
545 if p.name != 'MainProcess':
529 # we are a subprocess, don't start another Controller!
546 # we are a subprocess, don't start another Controller!
530 return
547 return
531 app = IPControllerApp.instance()
548 app = IPControllerApp.instance()
532 app.initialize()
549 app.initialize()
533 app.start()
550 app.start()
534
551
535
552
536 if __name__ == '__main__':
553 if __name__ == '__main__':
537 launch_new_instance()
554 launch_new_instance()
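That ends ipcontrollerapp.py. The flags and aliases defined at the top of the file map onto plain traitlet assignments, so the same options can also be set in ipcontroller_config.py. A hypothetical config sketch (get_config is provided by IPython inside config files; the values are illustrative):

# hypothetical ipcontroller_config.py
c = get_config()

c.IPControllerApp.reuse_files = True       # same as --reuse
c.HubFactory.ip = '192.168.0.1'            # same as --ip
c.HubFactory.regport = 1000                # same as --port
c.TaskScheduler.scheme_name = 'leastload'  # same as --scheme
c.TaskScheduler.hwm = 1                    # same as --hwm
c.HubFactory.db_class = 'SQLiteDB'         # same as --sqlitedb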
IPython/parallel/controller/hub.py
@@ -1,1415 +1,1417 @@
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
21 import json
22 import os
22 import os
23 import sys
23 import sys
24 import time
24 import time
25 from datetime import datetime
25 from datetime import datetime
26
26
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop
28 from zmq.eventloop import ioloop
29 from zmq.eventloop.zmqstream import ZMQStream
29 from zmq.eventloop.zmqstream import ZMQStream
30
30
31 # internal:
31 # internal:
32 from IPython.utils.importstring import import_item
32 from IPython.utils.importstring import import_item
33 from IPython.utils.localinterfaces import LOCALHOST
33 from IPython.utils.localinterfaces import LOCALHOST
34 from IPython.utils.py3compat import cast_bytes
34 from IPython.utils.py3compat import cast_bytes
35 from IPython.utils.traitlets import (
35 from IPython.utils.traitlets import (
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
37 )
37 )
38
38
39 from IPython.parallel import error, util
39 from IPython.parallel import error, util
40 from IPython.parallel.factory import RegistrationFactory
40 from IPython.parallel.factory import RegistrationFactory
41
41
42 from IPython.kernel.zmq.session import SessionFactory
42 from IPython.kernel.zmq.session import SessionFactory
43
43
44 from .heartmonitor import HeartMonitor
44 from .heartmonitor import HeartMonitor
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Code
47 # Code
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 def _passer(*args, **kwargs):
50 def _passer(*args, **kwargs):
51 return
51 return
52
52
53 def _printer(*args, **kwargs):
53 def _printer(*args, **kwargs):
54 print (args)
54 print (args)
55 print (kwargs)
55 print (kwargs)
56
56
57 def empty_record():
57 def empty_record():
58 """Return an empty dict with all record keys."""
58 """Return an empty dict with all record keys."""
59 return {
59 return {
60 'msg_id' : None,
60 'msg_id' : None,
61 'header' : None,
61 'header' : None,
62 'metadata' : None,
62 'metadata' : None,
63 'content': None,
63 'content': None,
64 'buffers': None,
64 'buffers': None,
65 'submitted': None,
65 'submitted': None,
66 'client_uuid' : None,
66 'client_uuid' : None,
67 'engine_uuid' : None,
67 'engine_uuid' : None,
68 'started': None,
68 'started': None,
69 'completed': None,
69 'completed': None,
70 'resubmitted': None,
70 'resubmitted': None,
71 'received': None,
71 'received': None,
72 'result_header' : None,
72 'result_header' : None,
73 'result_metadata' : None,
73 'result_metadata' : None,
74 'result_content' : None,
74 'result_content' : None,
75 'result_buffers' : None,
75 'result_buffers' : None,
76 'queue' : None,
76 'queue' : None,
77 'pyin' : None,
77 'pyin' : None,
78 'pyout': None,
78 'pyout': None,
79 'pyerr': None,
79 'pyerr': None,
80 'stdout': '',
80 'stdout': '',
81 'stderr': '',
81 'stderr': '',
82 }
82 }
83
83
84 def init_record(msg):
84 def init_record(msg):
85 """Initialize a TaskRecord based on a request."""
85 """Initialize a TaskRecord based on a request."""
86 header = msg['header']
86 header = msg['header']
87 return {
87 return {
88 'msg_id' : header['msg_id'],
88 'msg_id' : header['msg_id'],
89 'header' : header,
89 'header' : header,
90 'content': msg['content'],
90 'content': msg['content'],
91 'metadata': msg['metadata'],
91 'metadata': msg['metadata'],
92 'buffers': msg['buffers'],
92 'buffers': msg['buffers'],
93 'submitted': header['date'],
93 'submitted': header['date'],
94 'client_uuid' : None,
94 'client_uuid' : None,
95 'engine_uuid' : None,
95 'engine_uuid' : None,
96 'started': None,
96 'started': None,
97 'completed': None,
97 'completed': None,
98 'resubmitted': None,
98 'resubmitted': None,
99 'received': None,
99 'received': None,
100 'result_header' : None,
100 'result_header' : None,
101 'result_metadata': None,
101 'result_metadata': None,
102 'result_content' : None,
102 'result_content' : None,
103 'result_buffers' : None,
103 'result_buffers' : None,
104 'queue' : None,
104 'queue' : None,
105 'pyin' : None,
105 'pyin' : None,
106 'pyout': None,
106 'pyout': None,
107 'pyerr': None,
107 'pyerr': None,
108 'stdout': '',
108 'stdout': '',
109 'stderr': '',
109 'stderr': '',
110 }
110 }
111
111
112
112
113 class EngineConnector(HasTraits):
113 class EngineConnector(HasTraits):
114 """A simple object for accessing the various zmq connections of an object.
114 """A simple object for accessing the various zmq connections of an object.
115 Attributes are:
115 Attributes are:
116 id (int): engine ID
116 id (int): engine ID
117 uuid (unicode): engine UUID
117 uuid (unicode): engine UUID
118 pending: set of msg_ids
118 pending: set of msg_ids
119 stallback: DelayedCallback for stalled registration
119 stallback: DelayedCallback for stalled registration
120 """
120 """
121
121
122 id = Integer(0)
122 id = Integer(0)
123 uuid = Unicode()
123 uuid = Unicode()
124 pending = Set()
124 pending = Set()
125 stallback = Instance(ioloop.DelayedCallback)
125 stallback = Instance(ioloop.DelayedCallback)
126
126
127
127
128 _db_shortcuts = {
128 _db_shortcuts = {
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
129 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
130 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
131 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
132 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
133 }
133 }
134
134
135 class HubFactory(RegistrationFactory):
135 class HubFactory(RegistrationFactory):
136 """The Configurable for setting up a Hub."""
136 """The Configurable for setting up a Hub."""
137
137
138 # port-pairs for monitoredqueues:
138 # port-pairs for monitoredqueues:
139 hb = Tuple(Integer,Integer,config=True,
139 hb = Tuple(Integer,Integer,config=True,
140 help="""PUB/ROUTER Port pair for Engine heartbeats""")
140 help="""PUB/ROUTER Port pair for Engine heartbeats""")
141 def _hb_default(self):
141 def _hb_default(self):
142 return tuple(util.select_random_ports(2))
142 return tuple(util.select_random_ports(2))
143
143
144 mux = Tuple(Integer,Integer,config=True,
144 mux = Tuple(Integer,Integer,config=True,
145 help="""Client/Engine Port pair for MUX queue""")
145 help="""Client/Engine Port pair for MUX queue""")
146
146
147 def _mux_default(self):
147 def _mux_default(self):
148 return tuple(util.select_random_ports(2))
148 return tuple(util.select_random_ports(2))
149
149
150 task = Tuple(Integer,Integer,config=True,
150 task = Tuple(Integer,Integer,config=True,
151 help="""Client/Engine Port pair for Task queue""")
151 help="""Client/Engine Port pair for Task queue""")
152 def _task_default(self):
152 def _task_default(self):
153 return tuple(util.select_random_ports(2))
153 return tuple(util.select_random_ports(2))
154
154
155 control = Tuple(Integer,Integer,config=True,
155 control = Tuple(Integer,Integer,config=True,
156 help="""Client/Engine Port pair for Control queue""")
156 help="""Client/Engine Port pair for Control queue""")
157
157
158 def _control_default(self):
158 def _control_default(self):
159 return tuple(util.select_random_ports(2))
159 return tuple(util.select_random_ports(2))
160
160
161 iopub = Tuple(Integer,Integer,config=True,
161 iopub = Tuple(Integer,Integer,config=True,
162 help="""Client/Engine Port pair for IOPub relay""")
162 help="""Client/Engine Port pair for IOPub relay""")
163
163
164 def _iopub_default(self):
164 def _iopub_default(self):
165 return tuple(util.select_random_ports(2))
165 return tuple(util.select_random_ports(2))
166
166
167 # single ports:
167 # single ports:
168 mon_port = Integer(config=True,
168 mon_port = Integer(config=True,
169 help="""Monitor (SUB) port for queue traffic""")
169 help="""Monitor (SUB) port for queue traffic""")
170
170
171 def _mon_port_default(self):
171 def _mon_port_default(self):
172 return util.select_random_ports(1)[0]
172 return util.select_random_ports(1)[0]
173
173
174 notifier_port = Integer(config=True,
174 notifier_port = Integer(config=True,
175 help="""PUB port for sending engine status notifications""")
175 help="""PUB port for sending engine status notifications""")
176
176
177 def _notifier_port_default(self):
177 def _notifier_port_default(self):
178 return util.select_random_ports(1)[0]
178 return util.select_random_ports(1)[0]
179
179
180 engine_ip = Unicode(LOCALHOST, config=True,
180 engine_ip = Unicode(LOCALHOST, config=True,
181 help="IP on which to listen for engine connections. [default: loopback]")
181 help="IP on which to listen for engine connections. [default: loopback]")
182 engine_transport = Unicode('tcp', config=True,
182 engine_transport = Unicode('tcp', config=True,
183 help="0MQ transport for engine connections. [default: tcp]")
183 help="0MQ transport for engine connections. [default: tcp]")
184
184
185 client_ip = Unicode(LOCALHOST, config=True,
185 client_ip = Unicode(LOCALHOST, config=True,
186 help="IP on which to listen for client connections. [default: loopback]")
186 help="IP on which to listen for client connections. [default: loopback]")
187 client_transport = Unicode('tcp', config=True,
187 client_transport = Unicode('tcp', config=True,
188 help="0MQ transport for client connections. [default : tcp]")
188 help="0MQ transport for client connections. [default : tcp]")
189
189
190 monitor_ip = Unicode(LOCALHOST, config=True,
190 monitor_ip = Unicode(LOCALHOST, config=True,
191 help="IP on which to listen for monitor messages. [default: loopback]")
191 help="IP on which to listen for monitor messages. [default: loopback]")
192 monitor_transport = Unicode('tcp', config=True,
192 monitor_transport = Unicode('tcp', config=True,
193 help="0MQ transport for monitor messages. [default : tcp]")
193 help="0MQ transport for monitor messages. [default : tcp]")
194
194
195 monitor_url = Unicode('')
195 monitor_url = Unicode('')
196
196
197 db_class = DottedObjectName('NoDB',
197 db_class = DottedObjectName('NoDB',
198 config=True, help="""The class to use for the DB backend
198 config=True, help="""The class to use for the DB backend
199
199
200 Options include:
200 Options include:
201
201
202 SQLiteDB: SQLite
202 SQLiteDB: SQLite
203 MongoDB : use MongoDB
203 MongoDB : use MongoDB
204 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
204 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
205 NoDB : disable database altogether (default)
205 NoDB : disable database altogether (default)
206
206
207 """)
207 """)
208
208
209 # not configurable
209 # not configurable
210 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
210 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
211 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
211 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
212
212
213 def _ip_changed(self, name, old, new):
213 def _ip_changed(self, name, old, new):
214 self.engine_ip = new
214 self.engine_ip = new
215 self.client_ip = new
215 self.client_ip = new
216 self.monitor_ip = new
216 self.monitor_ip = new
217 self._update_monitor_url()
217 self._update_monitor_url()
218
218
219 def _update_monitor_url(self):
219 def _update_monitor_url(self):
220 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
220 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
221
221
222 def _transport_changed(self, name, old, new):
222 def _transport_changed(self, name, old, new):
223 self.engine_transport = new
223 self.engine_transport = new
224 self.client_transport = new
224 self.client_transport = new
225 self.monitor_transport = new
225 self.monitor_transport = new
226 self._update_monitor_url()
226 self._update_monitor_url()
227
227
228 def __init__(self, **kwargs):
228 def __init__(self, **kwargs):
229 super(HubFactory, self).__init__(**kwargs)
229 super(HubFactory, self).__init__(**kwargs)
230 self._update_monitor_url()
230 self._update_monitor_url()
231
231
232
232
233 def construct(self):
233 def construct(self):
234 self.init_hub()
234 self.init_hub()
235
235
236 def start(self):
236 def start(self):
237 self.heartmonitor.start()
237 self.heartmonitor.start()
238 self.log.info("Heartmonitor started")
238 self.log.info("Heartmonitor started")
239
239
240 def client_url(self, channel):
240 def client_url(self, channel):
241 """return full zmq url for a named client channel"""
241 """return full zmq url for a named client channel"""
242 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
242 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
243
243
244 def engine_url(self, channel):
244 def engine_url(self, channel):
245 """return full zmq url for a named engine channel"""
245 """return full zmq url for a named engine channel"""
246 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
246 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
247
247
248 def init_hub(self):
248 def init_hub(self):
249 """construct Hub object"""
249 """construct Hub object"""
250
250
251 ctx = self.context
251 ctx = self.context
252 loop = self.loop
252 loop = self.loop
253
253
254 try:
254 try:
255 scheme = self.config.TaskScheduler.scheme_name
255 scheme = self.config.TaskScheduler.scheme_name
256 except AttributeError:
256 except AttributeError:
257 from .scheduler import TaskScheduler
257 from .scheduler import TaskScheduler
258 scheme = TaskScheduler.scheme_name.get_default_value()
258 scheme = TaskScheduler.scheme_name.get_default_value()
259
259
260 # build connection dicts
260 # build connection dicts
261 engine = self.engine_info = {
261 engine = self.engine_info = {
262 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
262 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
263 'registration' : self.regport,
263 'registration' : self.regport,
264 'control' : self.control[1],
264 'control' : self.control[1],
265 'mux' : self.mux[1],
265 'mux' : self.mux[1],
266 'hb_ping' : self.hb[0],
266 'hb_ping' : self.hb[0],
267 'hb_pong' : self.hb[1],
267 'hb_pong' : self.hb[1],
268 'task' : self.task[1],
268 'task' : self.task[1],
269 'iopub' : self.iopub[1],
269 'iopub' : self.iopub[1],
270 }
270 }
271
271
272 client = self.client_info = {
272 client = self.client_info = {
273 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
273 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
274 'registration' : self.regport,
274 'registration' : self.regport,
275 'control' : self.control[0],
275 'control' : self.control[0],
276 'mux' : self.mux[0],
276 'mux' : self.mux[0],
277 'task' : self.task[0],
277 'task' : self.task[0],
278 'task_scheme' : scheme,
278 'task_scheme' : scheme,
279 'iopub' : self.iopub[0],
279 'iopub' : self.iopub[0],
280 'notification' : self.notifier_port,
280 'notification' : self.notifier_port,
281 }
281 }
282
282
283 self.log.debug("Hub engine addrs: %s", self.engine_info)
283 self.log.debug("Hub engine addrs: %s", self.engine_info)
284 self.log.debug("Hub client addrs: %s", self.client_info)
284 self.log.debug("Hub client addrs: %s", self.client_info)
285
285
286 # Registrar socket
286 # Registrar socket
287 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
287 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
288 util.set_hwm(q, 0)
288 q.bind(self.client_url('registration'))
289 q.bind(self.client_url('registration'))
289 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
290 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
290 if self.client_ip != self.engine_ip:
291 if self.client_ip != self.engine_ip:
291 q.bind(self.engine_url('registration'))
292 q.bind(self.engine_url('registration'))
292 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
293 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
293
294
294 ### Engine connections ###
295 ### Engine connections ###
295
296
296 # heartbeat
297 # heartbeat
297 hpub = ctx.socket(zmq.PUB)
298 hpub = ctx.socket(zmq.PUB)
298 hpub.bind(self.engine_url('hb_ping'))
299 hpub.bind(self.engine_url('hb_ping'))
299 hrep = ctx.socket(zmq.ROUTER)
300 hrep = ctx.socket(zmq.ROUTER)
301 util.set_hwm(hrep, 0)
300 hrep.bind(self.engine_url('hb_pong'))
302 hrep.bind(self.engine_url('hb_pong'))
301 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
303 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
302 pingstream=ZMQStream(hpub,loop),
304 pingstream=ZMQStream(hpub,loop),
303 pongstream=ZMQStream(hrep,loop)
305 pongstream=ZMQStream(hrep,loop)
304 )
306 )
305
307
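Both util.set_hwm(q, 0) calls added in this file target ROUTER sockets that every engine talks to at once: the registration socket above and the heartbeat pong socket here. Presumably the motivation matches the commit message: with many engines, registration replies and heartbeat pongs arrive in bursts, and the zmq >= 3 default HWM of 1000 could drop pongs and cause live engines to be marked dead.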
306 ### Client connections ###
308 ### Client connections ###
307
309
308 # Notifier socket
310 # Notifier socket
309 n = ZMQStream(ctx.socket(zmq.PUB), loop)
311 n = ZMQStream(ctx.socket(zmq.PUB), loop)
310 n.bind(self.client_url('notification'))
312 n.bind(self.client_url('notification'))
311
313
312 ### build and launch the queues ###
314 ### build and launch the queues ###
313
315
314 # monitor socket
316 # monitor socket
315 sub = ctx.socket(zmq.SUB)
317 sub = ctx.socket(zmq.SUB)
316 sub.setsockopt(zmq.SUBSCRIBE, b"")
318 sub.setsockopt(zmq.SUBSCRIBE, b"")
317 sub.bind(self.monitor_url)
319 sub.bind(self.monitor_url)
318 sub.bind('inproc://monitor')
320 sub.bind('inproc://monitor')
319 sub = ZMQStream(sub, loop)
321 sub = ZMQStream(sub, loop)
320
322
321 # connect the db
323 # connect the db
322 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
324 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
323 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
325 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
324 self.db = import_item(str(db_class))(session=self.session.session,
326 self.db = import_item(str(db_class))(session=self.session.session,
325 config=self.config, log=self.log)
327 config=self.config, log=self.log)
326 time.sleep(.25)
328 time.sleep(.25)
327
329
328 # resubmit stream
330 # resubmit stream
329 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
331 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
330 url = util.disambiguate_url(self.client_url('task'))
332 url = util.disambiguate_url(self.client_url('task'))
331 r.connect(url)
333 r.connect(url)
332
334
333 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
335 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
334 query=q, notifier=n, resubmit=r, db=self.db,
336 query=q, notifier=n, resubmit=r, db=self.db,
335 engine_info=self.engine_info, client_info=self.client_info,
337 engine_info=self.engine_info, client_info=self.client_info,
336 log=self.log)
338 log=self.log)
337
339
338
340
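# Illustrative only (hypothetical port numbers and scheme): with the default
# tcp transport, the client_info mapping assembled above resembles
#
#     {'interface': 'tcp://127.0.0.1', 'registration': 55811,
#      'control': 55821, 'mux': 55822, 'task': 55823,
#      'task_scheme': 'leastload', 'iopub': 55824, 'notification': 55812}
#
# Engines receive the analogous engine_info dict, so both sides learn every
# address they must connect to from a single JSON-able structure.
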
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client query requests (ROUTER)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for clients to connect
        to the queues.
    """

    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()
    by_ident=Dict()
    engines=Dict()
    clients=Dict()
    hearts=Dict()
    pending=Set()
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids, across all engines
    dead_engines=Set() # uuids of engines that have unregistered or died
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict()
    registration_timeout=Integer()
    _idcounter=Integer(0)

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()

    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        self.registration_timeout = max(10000, 5*self.heartmonitor.period)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
        }

        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
        }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
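
    # Illustrative sketch (not part of the original source): dispatch_monitor_traffic
    # below uses the first frame of each monitored message as a topic, looked up
    # in monitor_handlers.  For example, a MUX submission relayed by the monitor
    # arrives roughly as
    #
    #     [b'in', <engine identity>, <client identity>, <serialized frames>...]
    #
    # so save_queue_request is invoked with idents=[engine, client].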

    @property
    def _next_id(self):
        """generate a new ID.

        No longer reuse old ids, just count from 0."""
        newid = self._idcounter
        self._idcounter += 1
        return newid
        # newid = 0
        # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
        # # print newid, self.ids, self.incoming_registrations
        # while newid in self.ids or newid in incoming:
        #     newid += 1
        # return newid

    #-----------------------------------------------------------------------------
    # message validation
    #-----------------------------------------------------------------------------

    def _validate_targets(self, targets):
        """turn any valid targets argument into a list of integer ids"""
        if targets is None:
            # default to all
            return self.ids

        if isinstance(targets, (int,str,unicode)):
            # only one target specified
            targets = [targets]
        _targets = []
        for t in targets:
            # map raw identities to ids
            if isinstance(t, (str,unicode)):
                t = self.by_ident.get(cast_bytes(t), t)
            _targets.append(t)
        targets = _targets
        bad_targets = [ t for t in targets if t not in self.ids ]
        if bad_targets:
            raise IndexError("No Such Engine: %r" % bad_targets)
        if not targets:
            raise IndexError("No Engines Registered")
        return targets

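    # For illustration (hypothetical values): with engines 0 and 1 registered,
    # _validate_targets(None) returns all ids, _validate_targets(1) returns [1],
    # and a raw engine identity string is mapped through by_ident to its integer
    # id; anything not in self.ids raises IndexError as above.
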
    #-----------------------------------------------------------------------------
    # dispatch methods (1 per stream)
    #-----------------------------------------------------------------------------


    @util.log_errors
    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic."""
        self.log.debug("monitor traffic: %r", msg[0])
        switch = msg[0]
        try:
            idents, msg = self.session.feed_identities(msg[1:])
        except ValueError:
            idents=[]
        if not idents:
            self.log.error("Monitor message without topic: %r", msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
            self.log.error("Unrecognized monitor topic: %r", switch)


    @util.log_errors
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        # switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)

    def dispatch_db(self, msg):
        """"""
        raise NotImplementedError
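
    # Illustrative sketch (handler names are from query_handlers above, values
    # are hypothetical): a client queue-status query deserializes to roughly
    #
    #     {'header': {'msg_type': 'queue_request', ...},
    #      'content': {'targets': [0, 1], 'verbose': False}}
    #
    # and dispatch_query routes it to queue_status, which replies on the same
    # ROUTER stream with a 'queue_reply' message.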

    #---------------------------------------------------------------------------
    # handler methods (1 per event)
    #---------------------------------------------------------------------------

    #----------------------- Heartbeat --------------------------------------

    def handle_new_heart(self, heart):
        """handler to attach to heartbeater.
        Called when a new heart starts to beat.
        Triggers completion of registration."""
        self.log.debug("heartbeat::handle_new_heart(%r)", heart)
        if heart not in self.incoming_registrations:
            self.log.info("heartbeat::ignoring new heart: %r", heart)
        else:
            self.finish_registration(heart)


    def handle_heart_failure(self, heart):
        """handler to attach to heartbeater.
        called when a previously registered heart fails to respond to beat request.
        triggers unregistration"""
        self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
        eid = self.hearts.get(heart, None)
        if eid is None or self.keytable[eid] in self.dead_engines:
            self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
        else:
            # only look up the uuid once we know the engine is registered,
            # otherwise self.engines[None] would raise
            uuid = self.engines[eid].uuid
            self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))

    #----------------------- MUX Queue Traffic ------------------------------

    def save_queue_request(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in existing.iteritems():
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)

        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)

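    # Bookkeeping sketch (illustrative, not from the original): after a MUX
    # submission, the msg_id lives in self.pending and in self.queues[eid];
    # save_queue_result below moves it to self.completed[eid] and
    # self.all_completed once the engine replies, so
    #
    #     len(self.queues[eid]) + len(self.tasks[eid])
    #
    # is the engine's current load, as reported by check_load.
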
    def save_queue_result(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        md = msg['metadata']
        completed = rheader['date']
        started = md.get('started', None)
        result = {
            'result_header' : rheader,
            'result_metadata': md,
            'result_content': msg['content'],
            'received': datetime.now(),
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)

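    # Illustrative only: the result dict stored above gives every completed
    # request a uniform DB schema, e.g. (hypothetical values)
    #
    #     {'result_header': {'date': <datetime>, ...},
    #      'result_content': {'status': 'ok', ...},
    #      'started': <datetime>, 'completed': <datetime>,
    #      'received': <datetime>, 'result_buffers': [...]}
    #
    # which is what later result_request/history_request queries read back.
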
    #--------------------- Task Queue Traffic ------------------------------

    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content,header which should not change
                # but are not as expensive to compare as buffers

            for key,evalue in existing.iteritems():
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::invalid task result message sent to %r: %r",
                    client_id, msg, exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            # print msg
            self.log.warn("Task %r had no parent!", msg)
            return
        msg_id = parent['msg_id']
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)

        header = msg['header']
        md = msg['metadata']
        engine_uuid = md.get('engine', u'')
        eid = self.by_ident.get(cast_bytes(engine_uuid), None)

        status = md.get('status', None)

        if msg_id in self.pending:
            self.log.info("task::task %r finished on %s", msg_id, eid)
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                if status != 'aborted':
                    self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = header['date']
            started = md.get('started', None)
            result = {
                'result_header' : header,
                'result_metadata': msg['metadata'],
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'received' : datetime.now(),
                'engine_uuid': engine_uuid,
            }

            result['result_buffers'] = msg['buffers']
            try:
                self.db.update_record(msg_id, result)
            except Exception:
                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

        else:
            self.log.debug("task::unknown task %r finished", msg_id)

    def save_task_destination(self, idents, msg):
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[cast_bytes(engine_uuid)]

        self.log.info("task::task %r arrived on %r", msg_id, eid)
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)
        # else:
        #     self.log.debug("task::task %r not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        try:
            self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
        except Exception:
            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)


    def mia_task_request(self, idents, msg):
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)


    #--------------------- IOPub Traffic ------------------------------

    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        # print (topics)
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.warn("iopub::IOPub message lacks parent: %r", msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['header']['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
        # stream
        d = {}
        if msg_type == 'stream':
            name = content['name']
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        elif msg_type == 'pyin':
            d['pyin'] = content['code']
        elif msg_type in ('display_data', 'pyout'):
            d[msg_type] = content
        elif msg_type == 'status':
            pass
        elif msg_type == 'data_pub':
            self.log.info("ignored data_pub message for %s" % msg_id)
        else:
            self.log.warn("unhandled iopub msg_type: %r", msg_type)

        if not d:
            return

        try:
            self.db.update_record(msg_id, d)
        except Exception:
            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)


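    # Illustrative only: stream output is accumulated, not replaced.  Two
    # hypothetical iopub messages for the same request,
    #
    #     {'name': 'stdout', 'data': 'hello '} and {'name': 'stdout', 'data': 'world'}
    #
    # leave rec['stdout'] == 'hello world' after the concatenation above, while
    # pyerr/pyin/pyout/display_data each overwrite a single dedicated field.
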
    #-------------------------------------------------------------------------
    # Registration requests
    #-------------------------------------------------------------------------

    def connection_request(self, client_id, msg):
        """Reply with connection addresses for clients."""
        self.log.info("client::client %r connected", client_id)
        content = dict(status='ok')
        jsonable = {}
        for k,v in self.keytable.iteritems():
            if v not in self.dead_engines:
                jsonable[str(k)] = v
        content['engines'] = jsonable
        self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)

    def register_engine(self, reg, msg):
        """Register a new engine."""
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::uuid not specified", exc_info=True)
            return

        eid = self._next_id

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
        # check if requesting available IDs:
        if cast_bytes(uuid) in self.by_ident:
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            for h, ec in self.incoming_registrations.iteritems():
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid

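    # Registration flow, sketched (illustrative): registration completes in two
    # halves.  register_engine above reserves an id and replies immediately;
    # finish_registration below runs only once the HeartMonitor has seen the
    # engine's heart beat (or right away, if the heart was already beating).
    # If no beat arrives within registration_timeout milliseconds
    # (max(10000, 5 * heartbeat period)), the stalled registration is purged.
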
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%r)", eid)
        # print (eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, uuid=uuid)
        self.dead_engines.add(uuid)
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        self._save_engine_state()

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)

    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        that the result failed and later receive the actual result.
        """

        outstanding = self.queues[eid]

        for msg_id in outstanding:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            try:
                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake header:
            header = {}
            header['engine'] = uuid
            header['date'] = datetime.now()
            rec = dict(result_content=content, result_header=header, result_buffers=[])
            rec['completed'] = header['date']
            rec['engine_uuid'] = uuid
            try:
                self.db.update_record(msg_id, rec)
            except Exception:
                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)

    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart."""
        try:
            ec = self.incoming_registrations.pop(heart)
        except KeyError:
            self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
            return
        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
        if ec.stallback is not None:
            ec.stallback.stop()
        eid = ec.id
        self.ids.add(eid)
        self.keytable[eid] = ec.uuid
        self.engines[eid] = ec
        self.by_ident[cast_bytes(ec.uuid)] = ec.id
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, uuid=self.engines[eid].uuid)
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
        self.log.info("engine::Engine Connected: %i", eid)

        self._save_engine_state()

    def _purge_stalled_registration(self, heart):
        if heart in self.incoming_registrations:
            ec = self.incoming_registrations.pop(heart)
            self.log.info("registration::purging stalled registration: %i", ec.id)
        else:
            pass

    #-------------------------------------------------------------------------
    # Engine State
    #-------------------------------------------------------------------------


    def _cleanup_engine_state_file(self):
        """cleanup engine state mapping"""

        if os.path.exists(self.engine_state_file):
            self.log.debug("cleaning up engine state: %s", self.engine_state_file)
            try:
                os.remove(self.engine_state_file)
            except IOError:
                self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)


    def _save_engine_state(self):
        """save engine mapping to JSON file"""
        if not self.engine_state_file:
            return
        self.log.debug("save engine state to %s" % self.engine_state_file)
        state = {}
        engines = {}
        for eid, ec in self.engines.iteritems():
            if ec.uuid not in self.dead_engines:
                engines[eid] = ec.uuid

        state['engines'] = engines

        state['next_id'] = self._idcounter

        with open(self.engine_state_file, 'w') as f:
            json.dump(state, f)

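    # Illustrative only (hypothetical ids/uuids): the JSON written above is a
    # flat mapping that _load_engine_state can replay after a hub restart, e.g.
    #
    #     {"engines": {"0": "a1b2...", "1": "c3d4..."}, "next_id": 2}
    #
    # so surviving engines keep their ids and new registrations continue
    # counting from next_id.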

    def _load_engine_state(self):
        """load engine mapping from JSON file"""
        if not os.path.exists(self.engine_state_file):
            return

        self.log.info("loading engine state from %s" % self.engine_state_file)

        with open(self.engine_state_file) as f:
            state = json.load(f)

        save_notifier = self.notifier
        self.notifier = None
        for eid, uuid in state['engines'].iteritems():
            heart = uuid.encode('ascii')
            # start with this heart as current and beating:
            self.heartmonitor.responses.add(heart)
            self.heartmonitor.hearts.add(heart)

            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
            self.finish_registration(heart)

        self.notifier = save_notifier

        self._idcounter = state['next_id']

1107 #-------------------------------------------------------------------------
1109 #-------------------------------------------------------------------------
1108 # Client Requests
1110 # Client Requests
1109 #-------------------------------------------------------------------------
1111 #-------------------------------------------------------------------------
1110
1112
1111 def shutdown_request(self, client_id, msg):
1113 def shutdown_request(self, client_id, msg):
1112 """handle shutdown request."""
1114 """handle shutdown request."""
1113 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1115 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1114 # also notify other clients of shutdown
1116 # also notify other clients of shutdown
1115 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1117 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1116 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1118 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1117 dc.start()
1119 dc.start()
1118
1120
1119 def _shutdown(self):
1121 def _shutdown(self):
1120 self.log.info("hub::hub shutting down.")
1122 self.log.info("hub::hub shutting down.")
1121 time.sleep(0.1)
1123 time.sleep(0.1)
1122 sys.exit(0)
1124 sys.exit(0)
1123
1125
1124
1126
1125 def check_load(self, client_id, msg):
1127 def check_load(self, client_id, msg):
1126 content = msg['content']
1128 content = msg['content']
1127 try:
1129 try:
1128 targets = content['targets']
1130 targets = content['targets']
1129 targets = self._validate_targets(targets)
1131 targets = self._validate_targets(targets)
1130 except:
1132 except:
1131 content = error.wrap_exception()
1133 content = error.wrap_exception()
1132 self.session.send(self.query, "hub_error",
1134 self.session.send(self.query, "hub_error",
1133 content=content, ident=client_id)
1135 content=content, ident=client_id)
1134 return
1136 return
1135
1137
1136 content = dict(status='ok')
1138 content = dict(status='ok')
1137 # loads = {}
1139 # loads = {}
1138 for t in targets:
1140 for t in targets:
1139 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1141 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1140 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1142 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1141
1143
1142
1144
1143 def queue_status(self, client_id, msg):
1145 def queue_status(self, client_id, msg):
1144 """Return the Queue status of one or more targets.
1146 """Return the Queue status of one or more targets.
1145 if verbose: return the msg_ids
1147 if verbose: return the msg_ids
1146 else: return len of each type.
1148 else: return len of each type.
1147 keys: queue (pending MUX jobs)
1149 keys: queue (pending MUX jobs)
1148 tasks (pending Task jobs)
1150 tasks (pending Task jobs)
1149 completed (finished jobs from both queues)"""
1151 completed (finished jobs from both queues)"""
1150 content = msg['content']
1152 content = msg['content']
1151 targets = content['targets']
1153 targets = content['targets']
1152 try:
1154 try:
1153 targets = self._validate_targets(targets)
1155 targets = self._validate_targets(targets)
1154 except:
1156 except:
1155 content = error.wrap_exception()
1157 content = error.wrap_exception()
1156 self.session.send(self.query, "hub_error",
1158 self.session.send(self.query, "hub_error",
1157 content=content, ident=client_id)
1159 content=content, ident=client_id)
1158 return
1160 return
1159 verbose = content.get('verbose', False)
1161 verbose = content.get('verbose', False)
1160 content = dict(status='ok')
1162 content = dict(status='ok')
1161 for t in targets:
1163 for t in targets:
1162 queue = self.queues[t]
1164 queue = self.queues[t]
1163 completed = self.completed[t]
1165 completed = self.completed[t]
1164 tasks = self.tasks[t]
1166 tasks = self.tasks[t]
1165 if not verbose:
1167 if not verbose:
1166 queue = len(queue)
1168 queue = len(queue)
1167 completed = len(completed)
1169 completed = len(completed)
1168 tasks = len(tasks)
1170 tasks = len(tasks)
1169 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1171 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1170 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1172 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1171 # print (content)
1173 # print (content)
1172 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1174 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
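# A minimal sketch of a non-verbose queue_reply content dict (engine ids and
# counts illustrative): one entry per target, keyed by str(t), plus the
# count of tasks not yet assigned to any engine:
#   {'status': 'ok',
#    '0': {'queue': 2, 'completed': 10, 'tasks': 1},
#    'unassigned': 0}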
1173
1175
1174 def purge_results(self, client_id, msg):
1176 def purge_results(self, client_id, msg):
1175 """Purge results from memory. This method is more valuable before we move
1177 """Purge results from memory. This method is more valuable before we move
1176 to a DB-based message storage mechanism."""
1178 to a DB-based message storage mechanism."""
1177 content = msg['content']
1179 content = msg['content']
1178 self.log.info("Dropping records with %s", content)
1180 self.log.info("Dropping records with %s", content)
1179 msg_ids = content.get('msg_ids', [])
1181 msg_ids = content.get('msg_ids', [])
1180 reply = dict(status='ok')
1182 reply = dict(status='ok')
1181 if msg_ids == 'all':
1183 if msg_ids == 'all':
1182 try:
1184 try:
1183 self.db.drop_matching_records(dict(completed={'$ne':None}))
1185 self.db.drop_matching_records(dict(completed={'$ne':None}))
1184 except Exception:
1186 except Exception:
1185 reply = error.wrap_exception()
1187 reply = error.wrap_exception()
1186 else:
1188 else:
1187 pending = filter(lambda m: m in self.pending, msg_ids)
1189 pending = filter(lambda m: m in self.pending, msg_ids)
1188 if pending:
1190 if pending:
1189 try:
1191 try:
1190 raise IndexError("msg pending: %r" % pending[0])
1192 raise IndexError("msg pending: %r" % pending[0])
1191 except:
1193 except:
1192 reply = error.wrap_exception()
1194 reply = error.wrap_exception()
1193 else:
1195 else:
1194 try:
1196 try:
1195 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1197 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1196 except Exception:
1198 except Exception:
1197 reply = error.wrap_exception()
1199 reply = error.wrap_exception()
1198
1200
1199 if reply['status'] == 'ok':
1201 if reply['status'] == 'ok':
1200 eids = content.get('engine_ids', [])
1202 eids = content.get('engine_ids', [])
1201 for eid in eids:
1203 for eid in eids:
1202 if eid not in self.engines:
1204 if eid not in self.engines:
1203 try:
1205 try:
1204 raise IndexError("No such engine: %i" % eid)
1206 raise IndexError("No such engine: %i" % eid)
1205 except:
1207 except:
1206 reply = error.wrap_exception()
1208 reply = error.wrap_exception()
1207 break
1209 break
1208 uid = self.engines[eid].uuid
1210 uid = self.engines[eid].uuid
1209 try:
1211 try:
1210 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1212 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1211 except Exception:
1213 except Exception:
1212 reply = error.wrap_exception()
1214 reply = error.wrap_exception()
1213 break
1215 break
1214
1216
1215 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1217 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
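# A minimal sketch of the request content purge_results accepts (ids
# illustrative): 'msg_ids' is either the literal string 'all' or a list of
# completed msg_ids, and 'engine_ids' drops completed records per engine:
#   {'msg_ids': 'all'}
#   {'msg_ids': ['msg-a', 'msg-b'], 'engine_ids': [0, 1]}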
1216
1218
1217 def resubmit_task(self, client_id, msg):
1219 def resubmit_task(self, client_id, msg):
1218 """Resubmit one or more tasks."""
1220 """Resubmit one or more tasks."""
1219 def finish(reply):
1221 def finish(reply):
1220 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1222 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1221
1223
1222 content = msg['content']
1224 content = msg['content']
1223 msg_ids = content['msg_ids']
1225 msg_ids = content['msg_ids']
1224 reply = dict(status='ok')
1226 reply = dict(status='ok')
1225 try:
1227 try:
1226 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1228 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1227 'header', 'content', 'buffers'])
1229 'header', 'content', 'buffers'])
1228 except Exception:
1230 except Exception:
1229 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1231 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1230 return finish(error.wrap_exception())
1232 return finish(error.wrap_exception())
1231
1233
1232 # validate msg_ids
1234 # validate msg_ids
1233 found_ids = [ rec['msg_id'] for rec in records ]
1235 found_ids = [ rec['msg_id'] for rec in records ]
1234 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1236 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1235 if len(records) > len(msg_ids):
1237 if len(records) > len(msg_ids):
1236 try:
1238 try:
1237 raise RuntimeError("DB appears to be in an inconsistent state."
1239 raise RuntimeError("DB appears to be in an inconsistent state."
1238 " More matching records were found than should exist")
1240 " More matching records were found than should exist")
1239 except Exception:
1241 except Exception:
1240 return finish(error.wrap_exception())
1242 return finish(error.wrap_exception())
1241 elif len(records) < len(msg_ids):
1243 elif len(records) < len(msg_ids):
1242 missing = [ m for m in msg_ids if m not in found_ids ]
1244 missing = [ m for m in msg_ids if m not in found_ids ]
1243 try:
1245 try:
1244 raise KeyError("No such msg(s): %r" % missing)
1246 raise KeyError("No such msg(s): %r" % missing)
1245 except KeyError:
1247 except KeyError:
1246 return finish(error.wrap_exception())
1248 return finish(error.wrap_exception())
1247 elif pending_ids:
1249 elif pending_ids:
1248 pass
1250 pass
1249 # no need to raise on resubmit of pending task, now that we
1251 # no need to raise on resubmit of pending task, now that we
1250 # resubmit under new ID, but do we want to raise anyway?
1252 # resubmit under new ID, but do we want to raise anyway?
1251 # msg_id = invalid_ids[0]
1253 # msg_id = invalid_ids[0]
1252 # try:
1254 # try:
1253 # raise ValueError("Task(s) %r appears to be inflight" % )
1255 # raise ValueError("Task(s) %r appears to be inflight" % )
1254 # except Exception:
1256 # except Exception:
1255 # return finish(error.wrap_exception())
1257 # return finish(error.wrap_exception())
1256
1258
1257 # mapping of original IDs to resubmitted IDs
1259 # mapping of original IDs to resubmitted IDs
1258 resubmitted = {}
1260 resubmitted = {}
1259
1261
1260 # send the messages
1262 # send the messages
1261 for rec in records:
1263 for rec in records:
1262 header = rec['header']
1264 header = rec['header']
1263 msg = self.session.msg(header['msg_type'], parent=header)
1265 msg = self.session.msg(header['msg_type'], parent=header)
1264 msg_id = msg['msg_id']
1266 msg_id = msg['msg_id']
1265 msg['content'] = rec['content']
1267 msg['content'] = rec['content']
1266
1268
1267 # use the old header, but update msg_id and timestamp
1269 # use the old header, but update msg_id and timestamp
1268 fresh = msg['header']
1270 fresh = msg['header']
1269 header['msg_id'] = fresh['msg_id']
1271 header['msg_id'] = fresh['msg_id']
1270 header['date'] = fresh['date']
1272 header['date'] = fresh['date']
1271 msg['header'] = header
1273 msg['header'] = header
1272
1274
1273 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1275 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1274
1276
1275 resubmitted[rec['msg_id']] = msg_id
1277 resubmitted[rec['msg_id']] = msg_id
1276 self.pending.add(msg_id)
1278 self.pending.add(msg_id)
1277 msg['buffers'] = rec['buffers']
1279 msg['buffers'] = rec['buffers']
1278 try:
1280 try:
1279 self.db.add_record(msg_id, init_record(msg))
1281 self.db.add_record(msg_id, init_record(msg))
1280 except Exception:
1282 except Exception:
1281 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1283 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1282 return finish(error.wrap_exception())
1284 return finish(error.wrap_exception())
1283
1285
1284 finish(dict(status='ok', resubmitted=resubmitted))
1286 finish(dict(status='ok', resubmitted=resubmitted))
1285
1287
1286 # store the new IDs in the Task DB
1288 # store the new IDs in the Task DB
1287 for msg_id, resubmit_id in resubmitted.iteritems():
1289 for msg_id, resubmit_id in resubmitted.iteritems():
1288 try:
1290 try:
1289 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1291 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1290 except Exception:
1292 except Exception:
1291 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1293 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
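# A minimal sketch of a successful resubmit_reply content dict (ids
# illustrative): 'resubmitted' maps each original msg_id to the fresh id the
# task was resubmitted under:
#   {'status': 'ok', 'resubmitted': {'msg-old': 'msg-new'}}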
1292
1294
1293
1295
1294 def _extract_record(self, rec):
1296 def _extract_record(self, rec):
1295 """decompose a TaskRecord dict into subsection of reply for get_result"""
1297 """decompose a TaskRecord dict into subsection of reply for get_result"""
1296 io_dict = {}
1298 io_dict = {}
1297 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1299 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1298 io_dict[key] = rec[key]
1300 io_dict[key] = rec[key]
1299 content = {
1301 content = {
1300 'header': rec['header'],
1302 'header': rec['header'],
1301 'metadata': rec['metadata'],
1303 'metadata': rec['metadata'],
1302 'result_metadata': rec['result_metadata'],
1304 'result_metadata': rec['result_metadata'],
1303 'result_header' : rec['result_header'],
1305 'result_header' : rec['result_header'],
1304 'result_content': rec['result_content'],
1306 'result_content': rec['result_content'],
1305 'received' : rec['received'],
1307 'received' : rec['received'],
1306 'io' : io_dict,
1308 'io' : io_dict,
1307 }
1309 }
1308 if rec['result_buffers']:
1310 if rec['result_buffers']:
1309 buffers = map(bytes, rec['result_buffers'])
1311 buffers = map(bytes, rec['result_buffers'])
1310 else:
1312 else:
1311 buffers = []
1313 buffers = []
1312
1314
1313 return content, buffers
1315 return content, buffers
1314
1316
1315 def get_results(self, client_id, msg):
1317 def get_results(self, client_id, msg):
1316 """Get the result of 1 or more messages."""
1318 """Get the result of 1 or more messages."""
1317 content = msg['content']
1319 content = msg['content']
1318 msg_ids = sorted(set(content['msg_ids']))
1320 msg_ids = sorted(set(content['msg_ids']))
1319 statusonly = content.get('status_only', False)
1321 statusonly = content.get('status_only', False)
1320 pending = []
1322 pending = []
1321 completed = []
1323 completed = []
1322 content = dict(status='ok')
1324 content = dict(status='ok')
1323 content['pending'] = pending
1325 content['pending'] = pending
1324 content['completed'] = completed
1326 content['completed'] = completed
1325 buffers = []
1327 buffers = []
1326 if not statusonly:
1328 if not statusonly:
1327 try:
1329 try:
1328 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1330 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1329 # turn match list into dict, for faster lookup
1331 # turn match list into dict, for faster lookup
1330 records = {}
1332 records = {}
1331 for rec in matches:
1333 for rec in matches:
1332 records[rec['msg_id']] = rec
1334 records[rec['msg_id']] = rec
1333 except Exception:
1335 except Exception:
1334 content = error.wrap_exception()
1336 content = error.wrap_exception()
1335 self.session.send(self.query, "result_reply", content=content,
1337 self.session.send(self.query, "result_reply", content=content,
1336 parent=msg, ident=client_id)
1338 parent=msg, ident=client_id)
1337 return
1339 return
1338 else:
1340 else:
1339 records = {}
1341 records = {}
1340 for msg_id in msg_ids:
1342 for msg_id in msg_ids:
1341 if msg_id in self.pending:
1343 if msg_id in self.pending:
1342 pending.append(msg_id)
1344 pending.append(msg_id)
1343 elif msg_id in self.all_completed:
1345 elif msg_id in self.all_completed:
1344 completed.append(msg_id)
1346 completed.append(msg_id)
1345 if not statusonly:
1347 if not statusonly:
1346 c,bufs = self._extract_record(records[msg_id])
1348 c,bufs = self._extract_record(records[msg_id])
1347 content[msg_id] = c
1349 content[msg_id] = c
1348 buffers.extend(bufs)
1350 buffers.extend(bufs)
1349 elif msg_id in records:
1351 elif msg_id in records:
1350 if records[msg_id]['completed']:
1352 if records[msg_id]['completed']:
1351 completed.append(msg_id)
1353 completed.append(msg_id)
1352 c,bufs = self._extract_record(records[msg_id])
1354 c,bufs = self._extract_record(records[msg_id])
1353 content[msg_id] = c
1355 content[msg_id] = c
1354 buffers.extend(bufs)
1356 buffers.extend(bufs)
1355 else:
1357 else:
1356 pending.append(msg_id)
1358 pending.append(msg_id)
1357 else:
1359 else:
1358 try:
1360 try:
1359 raise KeyError('No such message: '+msg_id)
1361 raise KeyError('No such message: '+msg_id)
1360 except:
1362 except:
1361 content = error.wrap_exception()
1363 content = error.wrap_exception()
1362 break
1364 break
1363 self.session.send(self.query, "result_reply", content=content,
1365 self.session.send(self.query, "result_reply", content=content,
1364 parent=msg, ident=client_id,
1366 parent=msg, ident=client_id,
1365 buffers=buffers)
1367 buffers=buffers)
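# A minimal sketch of a result_reply content dict with status_only=False
# (ids illustrative): finished ids carry a record built by _extract_record,
# while unfinished ids appear only in 'pending':
#   {'status': 'ok',
#    'pending': ['msg-b'],
#    'completed': ['msg-a'],
#    'msg-a': {'header': {...}, 'result_content': {...}, 'io': {...}}}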
1366
1368
1367 def get_history(self, client_id, msg):
1369 def get_history(self, client_id, msg):
1368 """Get a list of all msg_ids in our DB records"""
1370 """Get a list of all msg_ids in our DB records"""
1369 try:
1371 try:
1370 msg_ids = self.db.get_history()
1372 msg_ids = self.db.get_history()
1371 except Exception as e:
1373 except Exception as e:
1372 content = error.wrap_exception()
1374 content = error.wrap_exception()
1373 else:
1375 else:
1374 content = dict(status='ok', history=msg_ids)
1376 content = dict(status='ok', history=msg_ids)
1375
1377
1376 self.session.send(self.query, "history_reply", content=content,
1378 self.session.send(self.query, "history_reply", content=content,
1377 parent=msg, ident=client_id)
1379 parent=msg, ident=client_id)
1378
1380
1379 def db_query(self, client_id, msg):
1381 def db_query(self, client_id, msg):
1380 """Perform a raw query on the task record database."""
1382 """Perform a raw query on the task record database."""
1381 content = msg['content']
1383 content = msg['content']
1382 query = content.get('query', {})
1384 query = content.get('query', {})
1383 keys = content.get('keys', None)
1385 keys = content.get('keys', None)
1384 buffers = []
1386 buffers = []
1385 empty = list()
1387 empty = list()
1386 try:
1388 try:
1387 records = self.db.find_records(query, keys)
1389 records = self.db.find_records(query, keys)
1388 except Exception as e:
1390 except Exception as e:
1389 content = error.wrap_exception()
1391 content = error.wrap_exception()
1390 else:
1392 else:
1391 # extract buffers from reply content:
1393 # extract buffers from reply content:
1392 if keys is not None:
1394 if keys is not None:
1393 buffer_lens = [] if 'buffers' in keys else None
1395 buffer_lens = [] if 'buffers' in keys else None
1394 result_buffer_lens = [] if 'result_buffers' in keys else None
1396 result_buffer_lens = [] if 'result_buffers' in keys else None
1395 else:
1397 else:
1396 buffer_lens = None
1398 buffer_lens = None
1397 result_buffer_lens = None
1399 result_buffer_lens = None
1398
1400
1399 for rec in records:
1401 for rec in records:
1400 # buffers may be None, so double check
1402 # buffers may be None, so double check
1401 b = rec.pop('buffers', empty) or empty
1403 b = rec.pop('buffers', empty) or empty
1402 if buffer_lens is not None:
1404 if buffer_lens is not None:
1403 buffer_lens.append(len(b))
1405 buffer_lens.append(len(b))
1404 buffers.extend(b)
1406 buffers.extend(b)
1405 rb = rec.pop('result_buffers', empty) or empty
1407 rb = rec.pop('result_buffers', empty) or empty
1406 if result_buffer_lens is not None:
1408 if result_buffer_lens is not None:
1407 result_buffer_lens.append(len(rb))
1409 result_buffer_lens.append(len(rb))
1408 buffers.extend(rb)
1410 buffers.extend(rb)
1409 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1411 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1410 result_buffer_lens=result_buffer_lens)
1412 result_buffer_lens=result_buffer_lens)
1411 # self.log.debug (content)
1413 # self.log.debug (content)
1412 self.session.send(self.query, "db_reply", content=content,
1414 self.session.send(self.query, "db_reply", content=content,
1413 parent=msg, ident=client_id,
1415 parent=msg, ident=client_id,
1414 buffers=buffers)
1416 buffers=buffers)
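# A minimal sketch of a db_query request content dict, using the same
# Mongo-style operators the hub itself uses above ('$in', '$ne'); 'keys'
# restricts which record fields come back (values illustrative):
#   {'query': {'completed': {'$ne': None}},
#    'keys': ['msg_id', 'completed', 'engine_uuid']}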
1415
1417
@@ -1,849 +1,852 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 import logging
22 import logging
23 import sys
23 import sys
24 import time
24 import time
25
25
26 from collections import deque
26 from collections import deque
27 from datetime import datetime
27 from datetime import datetime
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.py3compat import cast_bytes
44 from IPython.utils.py3compat import cast_bytes
45
45
46 from IPython.parallel import error, util
46 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger
48 from IPython.parallel.util import connect_logger, local_logger
49
49
50 from .dependency import Dependency
50 from .dependency import Dependency
51
51
52 @decorator
52 @decorator
53 def logged(f,self,*args,**kwargs):
53 def logged(f,self,*args,**kwargs):
54 # print ("#--------------------")
54 # print ("#--------------------")
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
56 # print ("#--")
56 # print ("#--")
57 return f(self,*args, **kwargs)
57 return f(self,*args, **kwargs)
58
58
59 #----------------------------------------------------------------------
59 #----------------------------------------------------------------------
60 # Chooser functions
60 # Chooser functions
61 #----------------------------------------------------------------------
61 #----------------------------------------------------------------------
62
62
63 def plainrandom(loads):
63 def plainrandom(loads):
64 """Plain random pick."""
64 """Plain random pick."""
65 n = len(loads)
65 n = len(loads)
66 return randint(0,n-1)
66 return randint(0,n-1)
67
67
68 def lru(loads):
68 def lru(loads):
69 """Always pick the front of the line.
69 """Always pick the front of the line.
70
70
71 The content of `loads` is ignored.
71 The content of `loads` is ignored.
72
72
73 Assumes LRU ordering of loads, with oldest first.
73 Assumes LRU ordering of loads, with oldest first.
74 """
74 """
75 return 0
75 return 0
76
76
77 def twobin(loads):
77 def twobin(loads):
78 """Pick two at random, use the LRU of the two.
78 """Pick two at random, use the LRU of the two.
79
79
80 The content of loads is ignored.
80 The content of loads is ignored.
81
81
82 Assumes LRU ordering of loads, with oldest first.
82 Assumes LRU ordering of loads, with oldest first.
83 """
83 """
84 n = len(loads)
84 n = len(loads)
85 a = randint(0,n-1)
85 a = randint(0,n-1)
86 b = randint(0,n-1)
86 b = randint(0,n-1)
87 return min(a,b)
87 return min(a,b)
88
88
89 def weighted(loads):
89 def weighted(loads):
90 """Pick two at random using inverse load as weight.
90 """Pick two at random using inverse load as weight.
91
91
92 Return the less loaded of the two.
92 Return the less loaded of the two.
93 """
93 """
94 # weight 0 a million times more than 1:
94 # weight 0 a million times more than 1:
95 weights = 1./(1e-6+numpy.array(loads))
95 weights = 1./(1e-6+numpy.array(loads))
96 sums = weights.cumsum()
96 sums = weights.cumsum()
97 t = sums[-1]
97 t = sums[-1]
98 x = random()*t
98 x = random()*t
99 y = random()*t
99 y = random()*t
100 idx = 0
100 idx = 0
101 idy = 0
101 idy = 0
102 while sums[idx] < x:
102 while sums[idx] < x:
103 idx += 1
103 idx += 1
104 while sums[idy] < y:
104 while sums[idy] < y:
105 idy += 1
105 idy += 1
106 if weights[idy] > weights[idx]:
106 if weights[idy] > weights[idx]:
107 return idy
107 return idy
108 else:
108 else:
109 return idx
109 return idx
110
110
111 def leastload(loads):
111 def leastload(loads):
112 """Always choose the lowest load.
112 """Always choose the lowest load.
113
113
114 If the lowest load occurs more than once, the first
114 If the lowest load occurs more than once, the first
115 occurrence will be used. If loads has LRU ordering, this means
115 occurrence will be used. If loads has LRU ordering, this means
116 the LRU of those with the lowest load is chosen.
116 the LRU of those with the lowest load is chosen.
117 """
117 """
118 return loads.index(min(loads))
118 return loads.index(min(loads))
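# A minimal sketch (not part of this module) of the chooser functions above:
# each takes a list of per-engine loads, LRU-ordered with the oldest engine
# first, and returns the index of the chosen engine. Sample loads made up.
#
# sample_loads = [2, 0, 1, 0]
# lru(sample_loads)        # -> 0: always the head of the line
# leastload(sample_loads)  # -> 1: first index holding the minimum load
# twobin(sample_loads)     # min of two random indices, e.g. min(3, 1) -> 1
# weighted(sample_loads)   # usually 1 or 3, as zero load gets a huge weight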
119
119
120 #---------------------------------------------------------------------
120 #---------------------------------------------------------------------
121 # Classes
121 # Classes
122 #---------------------------------------------------------------------
122 #---------------------------------------------------------------------
123
123
124
124
125 # store empty default dependency:
125 # store empty default dependency:
126 MET = Dependency([])
126 MET = Dependency([])
127
127
128
128
129 class Job(object):
129 class Job(object):
130 """Simple container for a job"""
130 """Simple container for a job"""
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
131 def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
132 targets, after, follow, timeout):
132 targets, after, follow, timeout):
133 self.msg_id = msg_id
133 self.msg_id = msg_id
134 self.raw_msg = raw_msg
134 self.raw_msg = raw_msg
135 self.idents = idents
135 self.idents = idents
136 self.msg = msg
136 self.msg = msg
137 self.header = header
137 self.header = header
138 self.metadata = metadata
138 self.metadata = metadata
139 self.targets = targets
139 self.targets = targets
140 self.after = after
140 self.after = after
141 self.follow = follow
141 self.follow = follow
142 self.timeout = timeout
142 self.timeout = timeout
143 self.removed = False # used for lazy-delete from sorted queue
143 self.removed = False # used for lazy-delete from sorted queue
144
144
145 self.timestamp = time.time()
145 self.timestamp = time.time()
146 self.blacklist = set()
146 self.blacklist = set()
147
147
148 def __lt__(self, other):
148 def __lt__(self, other):
149 return self.timestamp < other.timestamp
149 return self.timestamp < other.timestamp
150
150
151 def __cmp__(self, other):
151 def __cmp__(self, other):
152 return cmp(self.timestamp, other.timestamp)
152 return cmp(self.timestamp, other.timestamp)
153
153
154 @property
154 @property
155 def dependents(self):
155 def dependents(self):
156 return self.follow.union(self.after)
156 return self.follow.union(self.after)
157
157
158 class TaskScheduler(SessionFactory):
158 class TaskScheduler(SessionFactory):
159 """Python TaskScheduler object.
159 """Python TaskScheduler object.
160
160
161 This is the simplest object that supports msg_id based
161 This is the simplest object that supports msg_id based
162 DAG dependencies. *Only* task msg_ids are checked, not
162 DAG dependencies. *Only* task msg_ids are checked, not
163 msg_ids of jobs submitted via the MUX queue.
163 msg_ids of jobs submitted via the MUX queue.
164
164
165 """
165 """
166
166
167 hwm = Integer(1, config=True,
167 hwm = Integer(1, config=True,
168 help="""specify the High Water Mark (HWM) for the downstream
168 help="""specify the High Water Mark (HWM) for the downstream
169 socket in the Task scheduler. This is the maximum number
169 socket in the Task scheduler. This is the maximum number
170 of allowed outstanding tasks on each engine.
170 of allowed outstanding tasks on each engine.
171
171
172 The default (1) means that only one task can be outstanding on each
172 The default (1) means that only one task can be outstanding on each
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
173 engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
174 engines continue to be assigned tasks while they are working,
174 engines continue to be assigned tasks while they are working,
175 effectively hiding network latency behind computation, but can result
175 effectively hiding network latency behind computation, but can result
176 in an imbalance of work when submitting many heterogeneous tasks all at
176 in an imbalance of work when submitting many heterogeneous tasks all at
177 once. Any positive value greater than one is a compromise between the
177 once. Any positive value greater than one is a compromise between the
178 two.
178 two.
179
179
180 """
180 """
181 )
181 )
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
182 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
183 'leastload', config=True, allow_none=False,
183 'leastload', config=True, allow_none=False,
184 help="""select the task scheduler scheme [default: leastload]
184 help="""select the task scheduler scheme [default: leastload]
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
185 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
186 )
186 )
187 def _scheme_name_changed(self, old, new):
187 def _scheme_name_changed(self, old, new):
188 self.log.debug("Using scheme %r"%new)
188 self.log.debug("Using scheme %r"%new)
189 self.scheme = globals()[new]
189 self.scheme = globals()[new]
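# A minimal sketch of tuning these traits from an ipcontroller profile's
# config file (e.g. ipcontroller_config.py; values illustrative):
#
# c = get_config()
# c.TaskScheduler.hwm = 0                 # no limit on outstanding tasks
# c.TaskScheduler.scheme_name = 'twobin'  # two-bin random scheme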
190
190
191 # input arguments:
191 # input arguments:
192 scheme = Instance(FunctionType) # function for determining the destination
192 scheme = Instance(FunctionType) # function for determining the destination
193 def _scheme_default(self):
193 def _scheme_default(self):
194 return leastload
194 return leastload
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
195 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
196 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
197 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
198 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
200
200
201 # internals:
201 # internals:
202 queue = Instance(deque) # sorted list of Jobs
202 queue = Instance(deque) # sorted list of Jobs
203 def _queue_default(self):
203 def _queue_default(self):
204 return deque()
204 return deque()
205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
208 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
208 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
209 pending = Dict() # dict by engine_uuid of submitted tasks
209 pending = Dict() # dict by engine_uuid of submitted tasks
210 completed = Dict() # dict by engine_uuid of completed tasks
210 completed = Dict() # dict by engine_uuid of completed tasks
211 failed = Dict() # dict by engine_uuid of failed tasks
211 failed = Dict() # dict by engine_uuid of failed tasks
212 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
212 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
213 clients = Dict() # dict by msg_id for who submitted the task
213 clients = Dict() # dict by msg_id for who submitted the task
214 targets = List() # list of target IDENTs
214 targets = List() # list of target IDENTs
215 loads = List() # list of engine loads
215 loads = List() # list of engine loads
216 # full = Set() # set of IDENTs that have HWM outstanding tasks
216 # full = Set() # set of IDENTs that have HWM outstanding tasks
217 all_completed = Set() # set of all completed tasks
217 all_completed = Set() # set of all completed tasks
218 all_failed = Set() # set of all failed tasks
218 all_failed = Set() # set of all failed tasks
219 all_done = Set() # set of all finished tasks=union(completed,failed)
219 all_done = Set() # set of all finished tasks=union(completed,failed)
220 all_ids = Set() # set of all submitted task IDs
220 all_ids = Set() # set of all submitted task IDs
221
221
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
222 ident = CBytes() # ZMQ identity. This should just be self.session.session
223 # but ensure Bytes
223 # but ensure Bytes
224 def _ident_default(self):
224 def _ident_default(self):
225 return self.session.bsession
225 return self.session.bsession
226
226
227 def start(self):
227 def start(self):
228 self.query_stream.on_recv(self.dispatch_query_reply)
228 self.query_stream.on_recv(self.dispatch_query_reply)
229 self.session.send(self.query_stream, "connection_request", {})
229 self.session.send(self.query_stream, "connection_request", {})
230
230
231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
231 self.engine_stream.on_recv(self.dispatch_result, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
233
233
234 self._notification_handlers = dict(
234 self._notification_handlers = dict(
235 registration_notification = self._register_engine,
235 registration_notification = self._register_engine,
236 unregistration_notification = self._unregister_engine
236 unregistration_notification = self._unregister_engine
237 )
237 )
238 self.notifier_stream.on_recv(self.dispatch_notification)
238 self.notifier_stream.on_recv(self.dispatch_notification)
239 self.log.info("Scheduler started [%s]" % self.scheme_name)
239 self.log.info("Scheduler started [%s]" % self.scheme_name)
240
240
241 def resume_receiving(self):
241 def resume_receiving(self):
242 """Resume accepting jobs."""
242 """Resume accepting jobs."""
243 self.client_stream.on_recv(self.dispatch_submission, copy=False)
243 self.client_stream.on_recv(self.dispatch_submission, copy=False)
244
244
245 def stop_receiving(self):
245 def stop_receiving(self):
246 """Stop accepting jobs while there are no engines.
246 """Stop accepting jobs while there are no engines.
247 Leave them in the ZMQ queue."""
247 Leave them in the ZMQ queue."""
248 self.client_stream.on_recv(None)
248 self.client_stream.on_recv(None)
249
249
250 #-----------------------------------------------------------------------
250 #-----------------------------------------------------------------------
251 # [Un]Registration Handling
251 # [Un]Registration Handling
252 #-----------------------------------------------------------------------
252 #-----------------------------------------------------------------------
253
253
254
254
255 def dispatch_query_reply(self, msg):
255 def dispatch_query_reply(self, msg):
256 """handle reply to our initial connection request"""
256 """handle reply to our initial connection request"""
257 try:
257 try:
258 idents,msg = self.session.feed_identities(msg)
258 idents,msg = self.session.feed_identities(msg)
259 except ValueError:
259 except ValueError:
260 self.log.warn("task::Invalid Message: %r",msg)
260 self.log.warn("task::Invalid Message: %r",msg)
261 return
261 return
262 try:
262 try:
263 msg = self.session.unserialize(msg)
263 msg = self.session.unserialize(msg)
264 except ValueError:
264 except ValueError:
265 self.log.warn("task::Unauthorized message from: %r"%idents)
265 self.log.warn("task::Unauthorized message from: %r"%idents)
266 return
266 return
267
267
268 content = msg['content']
268 content = msg['content']
269 for uuid in content.get('engines', {}).values():
269 for uuid in content.get('engines', {}).values():
270 self._register_engine(cast_bytes(uuid))
270 self._register_engine(cast_bytes(uuid))
271
271
272
272
273 @util.log_errors
273 @util.log_errors
274 def dispatch_notification(self, msg):
274 def dispatch_notification(self, msg):
275 """dispatch register/unregister events."""
275 """dispatch register/unregister events."""
276 try:
276 try:
277 idents,msg = self.session.feed_identities(msg)
277 idents,msg = self.session.feed_identities(msg)
278 except ValueError:
278 except ValueError:
279 self.log.warn("task::Invalid Message: %r",msg)
279 self.log.warn("task::Invalid Message: %r",msg)
280 return
280 return
281 try:
281 try:
282 msg = self.session.unserialize(msg)
282 msg = self.session.unserialize(msg)
283 except ValueError:
283 except ValueError:
284 self.log.warn("task::Unauthorized message from: %r"%idents)
284 self.log.warn("task::Unauthorized message from: %r"%idents)
285 return
285 return
286
286
287 msg_type = msg['header']['msg_type']
287 msg_type = msg['header']['msg_type']
288
288
289 handler = self._notification_handlers.get(msg_type, None)
289 handler = self._notification_handlers.get(msg_type, None)
290 if handler is None:
290 if handler is None:
291 self.log.error("Unhandled message type: %r"%msg_type)
291 self.log.error("Unhandled message type: %r"%msg_type)
292 else:
292 else:
293 try:
293 try:
294 handler(cast_bytes(msg['content']['uuid']))
294 handler(cast_bytes(msg['content']['uuid']))
295 except Exception:
295 except Exception:
296 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
296 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
297
297
298 def _register_engine(self, uid):
298 def _register_engine(self, uid):
299 """New engine with ident `uid` became available."""
299 """New engine with ident `uid` became available."""
300 # head of the line:
300 # head of the line:
301 self.targets.insert(0,uid)
301 self.targets.insert(0,uid)
302 self.loads.insert(0,0)
302 self.loads.insert(0,0)
303
303
304 # initialize sets
304 # initialize sets
305 self.completed[uid] = set()
305 self.completed[uid] = set()
306 self.failed[uid] = set()
306 self.failed[uid] = set()
307 self.pending[uid] = {}
307 self.pending[uid] = {}
308
308
309 # rescan the graph:
309 # rescan the graph:
310 self.update_graph(None)
310 self.update_graph(None)
311
311
312 def _unregister_engine(self, uid):
312 def _unregister_engine(self, uid):
313 """Existing engine with ident `uid` became unavailable."""
313 """Existing engine with ident `uid` became unavailable."""
314 if len(self.targets) == 1:
314 if len(self.targets) == 1:
315 # this was our only engine
315 # this was our only engine
316 pass
316 pass
317
317
318 # handle any potentially finished tasks:
318 # handle any potentially finished tasks:
319 self.engine_stream.flush()
319 self.engine_stream.flush()
320
320
321 # don't pop destinations, because they might be used later
321 # don't pop destinations, because they might be used later
322 # map(self.destinations.pop, self.completed.pop(uid))
322 # map(self.destinations.pop, self.completed.pop(uid))
323 # map(self.destinations.pop, self.failed.pop(uid))
323 # map(self.destinations.pop, self.failed.pop(uid))
324
324
325 # prevent this engine from receiving work
325 # prevent this engine from receiving work
326 idx = self.targets.index(uid)
326 idx = self.targets.index(uid)
327 self.targets.pop(idx)
327 self.targets.pop(idx)
328 self.loads.pop(idx)
328 self.loads.pop(idx)
329
329
330 # wait 5 seconds before cleaning up pending jobs, since the results might
330 # wait 5 seconds before cleaning up pending jobs, since the results might
331 # still be incoming
331 # still be incoming
332 if self.pending[uid]:
332 if self.pending[uid]:
333 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
333 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
334 dc.start()
334 dc.start()
335 else:
335 else:
336 self.completed.pop(uid)
336 self.completed.pop(uid)
337 self.failed.pop(uid)
337 self.failed.pop(uid)
338
338
339
339
340 def handle_stranded_tasks(self, engine):
340 def handle_stranded_tasks(self, engine):
341 """Deal with jobs resident in an engine that died."""
341 """Deal with jobs resident in an engine that died."""
342 lost = self.pending[engine]
342 lost = self.pending[engine]
343 for msg_id in lost.keys():
343 for msg_id in lost.keys():
344 if msg_id not in self.pending[engine]:
344 if msg_id not in self.pending[engine]:
345 # prevent double-handling of messages
345 # prevent double-handling of messages
346 continue
346 continue
347
347
348 raw_msg = lost[msg_id].raw_msg
348 raw_msg = lost[msg_id].raw_msg
349 idents,msg = self.session.feed_identities(raw_msg, copy=False)
349 idents,msg = self.session.feed_identities(raw_msg, copy=False)
350 parent = self.session.unpack(msg[1].bytes)
350 parent = self.session.unpack(msg[1].bytes)
351 idents = [engine, idents[0]]
351 idents = [engine, idents[0]]
352
352
353 # build fake error reply
353 # build fake error reply
354 try:
354 try:
355 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
355 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
356 except:
356 except:
357 content = error.wrap_exception()
357 content = error.wrap_exception()
358 # build fake metadata
358 # build fake metadata
359 md = dict(
359 md = dict(
360 status=u'error',
360 status=u'error',
361 engine=engine,
361 engine=engine,
362 date=datetime.now(),
362 date=datetime.now(),
363 )
363 )
364 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
364 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
365 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
365 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
366 # and dispatch it
366 # and dispatch it
367 self.dispatch_result(raw_reply)
367 self.dispatch_result(raw_reply)
368
368
369 # finally scrub completed/failed lists
369 # finally scrub completed/failed lists
370 self.completed.pop(engine)
370 self.completed.pop(engine)
371 self.failed.pop(engine)
371 self.failed.pop(engine)
372
372
373
373
374 #-----------------------------------------------------------------------
374 #-----------------------------------------------------------------------
375 # Job Submission
375 # Job Submission
376 #-----------------------------------------------------------------------
376 #-----------------------------------------------------------------------
377
377
378
378
379 @util.log_errors
379 @util.log_errors
380 def dispatch_submission(self, raw_msg):
380 def dispatch_submission(self, raw_msg):
381 """Dispatch job submission to appropriate handlers."""
381 """Dispatch job submission to appropriate handlers."""
382 # ensure targets up to date:
382 # ensure targets up to date:
383 self.notifier_stream.flush()
383 self.notifier_stream.flush()
384 try:
384 try:
385 idents, msg = self.session.feed_identities(raw_msg, copy=False)
385 idents, msg = self.session.feed_identities(raw_msg, copy=False)
386 msg = self.session.unserialize(msg, content=False, copy=False)
386 msg = self.session.unserialize(msg, content=False, copy=False)
387 except Exception:
387 except Exception:
388 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
388 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
389 return
389 return
390
390
391
391
392 # send to monitor
392 # send to monitor
393 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
393 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
394
394
395 header = msg['header']
395 header = msg['header']
396 md = msg['metadata']
396 md = msg['metadata']
397 msg_id = header['msg_id']
397 msg_id = header['msg_id']
398 self.all_ids.add(msg_id)
398 self.all_ids.add(msg_id)
399
399
400 # get targets as a set of bytes objects
400 # get targets as a set of bytes objects
401 # from a list of unicode objects
401 # from a list of unicode objects
402 targets = md.get('targets', [])
402 targets = md.get('targets', [])
403 targets = map(cast_bytes, targets)
403 targets = map(cast_bytes, targets)
404 targets = set(targets)
404 targets = set(targets)
405
405
406 retries = md.get('retries', 0)
406 retries = md.get('retries', 0)
407 self.retries[msg_id] = retries
407 self.retries[msg_id] = retries
408
408
409 # time dependencies
409 # time dependencies
410 after = md.get('after', None)
410 after = md.get('after', None)
411 if after:
411 if after:
412 after = Dependency(after)
412 after = Dependency(after)
413 if after.all:
413 if after.all:
414 if after.success:
414 if after.success:
415 after = Dependency(after.difference(self.all_completed),
415 after = Dependency(after.difference(self.all_completed),
416 success=after.success,
416 success=after.success,
417 failure=after.failure,
417 failure=after.failure,
418 all=after.all,
418 all=after.all,
419 )
419 )
420 if after.failure:
420 if after.failure:
421 after = Dependency(after.difference(self.all_failed),
421 after = Dependency(after.difference(self.all_failed),
422 success=after.success,
422 success=after.success,
423 failure=after.failure,
423 failure=after.failure,
424 all=after.all,
424 all=after.all,
425 )
425 )
426 if after.check(self.all_completed, self.all_failed):
426 if after.check(self.all_completed, self.all_failed):
427 # recast as empty set, if `after` already met,
427 # recast as empty set, if `after` already met,
428 # to prevent unnecessary set comparisons
428 # to prevent unnecessary set comparisons
429 after = MET
429 after = MET
430 else:
430 else:
431 after = MET
431 after = MET
432
432
433 # location dependencies
433 # location dependencies
434 follow = Dependency(md.get('follow', []))
434 follow = Dependency(md.get('follow', []))
435
435
436 # turn timeouts into datetime objects:
436 # turn timeouts into datetime objects:
437 timeout = md.get('timeout', None)
437 timeout = md.get('timeout', None)
438 if timeout:
438 if timeout:
439 timeout = time.time() + float(timeout)
439 timeout = time.time() + float(timeout)
440
440
441 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
441 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
442 header=header, targets=targets, after=after, follow=follow,
442 header=header, targets=targets, after=after, follow=follow,
443 timeout=timeout, metadata=md,
443 timeout=timeout, metadata=md,
444 )
444 )
445 if timeout:
445 if timeout:
446 # schedule timeout callback
446 # schedule timeout callback
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
448
448
449 # validate and reduce dependencies:
449 # validate and reduce dependencies:
450 for dep in after,follow:
450 for dep in after,follow:
451 if not dep: # empty dependency
451 if not dep: # empty dependency
452 continue
452 continue
453 # check valid:
453 # check valid:
454 if msg_id in dep or dep.difference(self.all_ids):
454 if msg_id in dep or dep.difference(self.all_ids):
455 self.queue_map[msg_id] = job
455 self.queue_map[msg_id] = job
456 return self.fail_unreachable(msg_id, error.InvalidDependency)
456 return self.fail_unreachable(msg_id, error.InvalidDependency)
457 # check if unreachable:
457 # check if unreachable:
458 if dep.unreachable(self.all_completed, self.all_failed):
458 if dep.unreachable(self.all_completed, self.all_failed):
459 self.queue_map[msg_id] = job
459 self.queue_map[msg_id] = job
460 return self.fail_unreachable(msg_id)
460 return self.fail_unreachable(msg_id)
461
461
462 if after.check(self.all_completed, self.all_failed):
462 if after.check(self.all_completed, self.all_failed):
463 # time deps already met, try to run
463 # time deps already met, try to run
464 if not self.maybe_run(job):
464 if not self.maybe_run(job):
465 # can't run yet
465 # can't run yet
466 if msg_id not in self.all_failed:
466 if msg_id not in self.all_failed:
467 # could have failed as unreachable
467 # could have failed as unreachable
468 self.save_unmet(job)
468 self.save_unmet(job)
469 else:
469 else:
470 self.save_unmet(job)
470 self.save_unmet(job)
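# A minimal sketch of the optional scheduling hints dispatch_submission reads
# from a task's metadata dict (all values illustrative):
#
# md = {
#     'targets': ['engine-uuid'],  # restrict the task to these engine idents
#     'retries': 2,                # resubmit up to twice on failure
#     'after': ['msg-id-a'],       # time dependency: run after these finish
#     'follow': ['msg-id-b'],      # location dependency: run where these ran
#     'timeout': 30.0,             # seconds before failing with TaskTimeout
# }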
471
471
472 def job_timeout(self, job):
472 def job_timeout(self, job):
473 """callback for a job's timeout.
473 """callback for a job's timeout.
474
474
475 The job may or may not have been run at this point.
475 The job may or may not have been run at this point.
476 """
476 """
477 now = time.time()
477 now = time.time()
478 if job.timeout >= (now + 1):
478 if job.timeout >= (now + 1):
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
480 job.msg_id, job.timeout, now
480 job.msg_id, job.timeout, now
481 )
481 )
482 if job.msg_id in self.queue_map:
482 if job.msg_id in self.queue_map:
483 # still waiting, but ran out of time
483 # still waiting, but ran out of time
484 self.log.info("task %r timed out", job.msg_id)
484 self.log.info("task %r timed out", job.msg_id)
485 self.fail_unreachable(job.msg_id, error.TaskTimeout)
485 self.fail_unreachable(job.msg_id, error.TaskTimeout)
486
486
487 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
487 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
488 """a task has become unreachable, send a reply with an ImpossibleDependency
488 """a task has become unreachable, send a reply with an ImpossibleDependency
489 error."""
489 error."""
490 if msg_id not in self.queue_map:
490 if msg_id not in self.queue_map:
491 self.log.error("task %r already failed!", msg_id)
491 self.log.error("task %r already failed!", msg_id)
492 return
492 return
493 job = self.queue_map.pop(msg_id)
493 job = self.queue_map.pop(msg_id)
494 # lazy-delete from the queue
494 # lazy-delete from the queue
495 job.removed = True
495 job.removed = True
496 for mid in job.dependents:
496 for mid in job.dependents:
497 if mid in self.graph:
497 if mid in self.graph:
498 self.graph[mid].remove(msg_id)
498 self.graph[mid].remove(msg_id)
499
499
500 try:
500 try:
501 raise why()
501 raise why()
502 except:
502 except:
503 content = error.wrap_exception()
503 content = error.wrap_exception()
504 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
504 self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])
505
505
506 self.all_done.add(msg_id)
506 self.all_done.add(msg_id)
507 self.all_failed.add(msg_id)
507 self.all_failed.add(msg_id)
508
508
509 msg = self.session.send(self.client_stream, 'apply_reply', content,
509 msg = self.session.send(self.client_stream, 'apply_reply', content,
510 parent=job.header, ident=job.idents)
510 parent=job.header, ident=job.idents)
511 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
511 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
512
512
513 self.update_graph(msg_id, success=False)
513 self.update_graph(msg_id, success=False)
514
514
515 def available_engines(self):
515 def available_engines(self):
516 """return a list of available engine indices based on HWM"""
516 """return a list of available engine indices based on HWM"""
517 if not self.hwm:
517 if not self.hwm:
518 return range(len(self.targets))
518 return range(len(self.targets))
519 available = []
519 available = []
520 for idx in range(len(self.targets)):
520 for idx in range(len(self.targets)):
521 if self.loads[idx] < self.hwm:
521 if self.loads[idx] < self.hwm:
522 available.append(idx)
522 available.append(idx)
523 return available
523 return available
524
524
525 def maybe_run(self, job):
525 def maybe_run(self, job):
526 """check location dependencies, and run if they are met."""
526 """check location dependencies, and run if they are met."""
527 msg_id = job.msg_id
527 msg_id = job.msg_id
528 self.log.debug("Attempting to assign task %s", msg_id)
528 self.log.debug("Attempting to assign task %s", msg_id)
529 available = self.available_engines()
529 available = self.available_engines()
530 if not available:
530 if not available:
531 # no engines, definitely can't run
531 # no engines, definitely can't run
532 return False
532 return False
533
533
534 if job.follow or job.targets or job.blacklist or self.hwm:
534 if job.follow or job.targets or job.blacklist or self.hwm:
535 # we need a can_run filter
535 # we need a can_run filter
536 def can_run(idx):
536 def can_run(idx):
537 # check hwm
537 # check hwm
538 if self.hwm and self.loads[idx] == self.hwm:
538 if self.hwm and self.loads[idx] == self.hwm:
539 return False
539 return False
540 target = self.targets[idx]
540 target = self.targets[idx]
541 # check blacklist
541 # check blacklist
542 if target in job.blacklist:
542 if target in job.blacklist:
543 return False
543 return False
544 # check targets
544 # check targets
545 if job.targets and target not in job.targets:
545 if job.targets and target not in job.targets:
546 return False
546 return False
547 # check follow
547 # check follow
548 return job.follow.check(self.completed[target], self.failed[target])
548 return job.follow.check(self.completed[target], self.failed[target])
549
549
550 indices = filter(can_run, available)
550 indices = filter(can_run, available)
551
551
552 if not indices:
552 if not indices:
553 # couldn't run
553 # couldn't run
554 if job.follow.all:
554 if job.follow.all:
555 # check follow for impossibility
555 # check follow for impossibility
556 dests = set()
556 dests = set()
557 relevant = set()
557 relevant = set()
558 if job.follow.success:
558 if job.follow.success:
559 relevant = self.all_completed
559 relevant = self.all_completed
560 if job.follow.failure:
560 if job.follow.failure:
561 relevant = relevant.union(self.all_failed)
561 relevant = relevant.union(self.all_failed)
562 for m in job.follow.intersection(relevant):
562 for m in job.follow.intersection(relevant):
563 dests.add(self.destinations[m])
563 dests.add(self.destinations[m])
564 if len(dests) > 1:
564 if len(dests) > 1:
565 self.queue_map[msg_id] = job
565 self.queue_map[msg_id] = job
566 self.fail_unreachable(msg_id)
566 self.fail_unreachable(msg_id)
567 return False
567 return False
568 if job.targets:
568 if job.targets:
569 # check blacklist+targets for impossibility
569 # check blacklist+targets for impossibility
570 job.targets.difference_update(job.blacklist)
570 job.targets.difference_update(job.blacklist)
571 if not job.targets or not job.targets.intersection(self.targets):
571 if not job.targets or not job.targets.intersection(self.targets):
572 self.queue_map[msg_id] = job
572 self.queue_map[msg_id] = job
573 self.fail_unreachable(msg_id)
573 self.fail_unreachable(msg_id)
574 return False
574 return False
575 return False
575 return False
576 else:
576 else:
577 indices = None
577 indices = None
578
578
579 self.submit_task(job, indices)
579 self.submit_task(job, indices)
580 return True
580 return True
581
581
582 def save_unmet(self, job):
582 def save_unmet(self, job):
583 """Save a message for later submission when its dependencies are met."""
583 """Save a message for later submission when its dependencies are met."""
584 msg_id = job.msg_id
584 msg_id = job.msg_id
585 self.log.debug("Adding task %s to the queue", msg_id)
585 self.log.debug("Adding task %s to the queue", msg_id)
586 self.queue_map[msg_id] = job
586 self.queue_map[msg_id] = job
587 self.queue.append(job)
587 self.queue.append(job)
588 # track the ids in follow or after, but not those already finished
588 # track the ids in follow or after, but not those already finished
589 for dep_id in job.after.union(job.follow).difference(self.all_done):
589 for dep_id in job.after.union(job.follow).difference(self.all_done):
590 if dep_id not in self.graph:
590 if dep_id not in self.graph:
591 self.graph[dep_id] = set()
591 self.graph[dep_id] = set()
592 self.graph[dep_id].add(msg_id)
592 self.graph[dep_id].add(msg_id)
593
593
594 def submit_task(self, job, indices=None):
594 def submit_task(self, job, indices=None):
595 """Submit a task to any of a subset of our targets."""
595 """Submit a task to any of a subset of our targets."""
596 if indices:
596 if indices:
597 loads = [self.loads[i] for i in indices]
597 loads = [self.loads[i] for i in indices]
598 else:
598 else:
599 loads = self.loads
599 loads = self.loads
600 idx = self.scheme(loads)
600 idx = self.scheme(loads)
601 if indices:
601 if indices:
602 idx = indices[idx]
602 idx = indices[idx]
603 target = self.targets[idx]
603 target = self.targets[idx]
604 # print (target, map(str, msg[:3]))
604 # print (target, map(str, msg[:3]))
605 # send job to the engine
605 # send job to the engine
606 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
606 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
607 self.engine_stream.send_multipart(job.raw_msg, copy=False)
607 self.engine_stream.send_multipart(job.raw_msg, copy=False)
608 # update load
608 # update load
609 self.add_job(idx)
609 self.add_job(idx)
610 self.pending[target][job.msg_id] = job
610 self.pending[target][job.msg_id] = job
611 # notify Hub
611 # notify Hub
612 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
612 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
613 self.session.send(self.mon_stream, 'task_destination', content=content,
613 self.session.send(self.mon_stream, 'task_destination', content=content,
614 ident=[b'tracktask',self.ident])
614 ident=[b'tracktask',self.ident])
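# A standalone sketch (assuming pyzmq; not part of this module) of the
# routing trick submit_task relies on: on a ROUTER socket, sending the
# target's identity frame first makes ZMQ deliver the remainder to that peer.
#
# import time
# import zmq
#
# ctx = zmq.Context()
# router = ctx.socket(zmq.ROUTER)
# router.bind('inproc://demo')
# engine = ctx.socket(zmq.DEALER)
# engine.setsockopt(zmq.IDENTITY, b'engine-0')
# engine.connect('inproc://demo')
# time.sleep(0.1)  # give the ROUTER time to register the peer's identity
#
# router.send_multipart([b'engine-0', b'task payload'])
# print(engine.recv_multipart())  # -> [b'task payload']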
615
615
616
616
    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------


    @util.log_errors
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies"""
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                self.finish_job(idx)
        except Exception:
            self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
            return

        md = msg['metadata']
        parent = msg['parent_header']
        if md.get('dependencies_met', True):
            success = (md['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed, but retries remain: resubmit
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart([b'outtask'] + raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, parent)

    def handle_result(self, idents, parent, raw_msg, success=True):
        """handle a real task result, either success or failure"""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for ROUTER-ROUTER mirror
        raw_msg[:2] = [client, engine]
        # print (map(str, raw_msg[:4]))
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        self.destinations[msg_id] = engine

        self.update_graph(msg_id, success)

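The identity swap above is the crux of the ROUTER-ROUTER relay: the engine-facing ROUTER socket prepends the engine's identity when it receives the reply, while the client's identity rides along from the original submission, so putting the client identity first lets the client-facing ROUTER route the message back. A toy illustration with made-up identity frames (not values from this changeset):

    raw_msg = [b'engine-0', b'client-7', b'<header>', b'<buffers>']
    engine, client = raw_msg[0], raw_msg[1]
    raw_msg[:2] = [client, engine]  # now routable by the client-facing ROUTER
    assert raw_msg[:2] == [b'client-7', b'engine-0']
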
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency"""
        engine = idents[0]
        msg_id = parent['msg_id']

        job = self.pending[engine].pop(msg_id)
        job.blacklist.add(engine)

        if job.blacklist == job.targets:
            self.queue_map[msg_id] = job
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(job):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(job)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm - 1:
                    self.update_graph(None)

    def update_graph(self, dep_id=None, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runnable.

        Called with dep_id=None to update the entire graph for hwm, but without finishing a task.
        """
        # print ("\n\n***********")
        # pprint (dep_id)
        # pprint (self.graph)
        # pprint (self.queue_map)
        # pprint (self.all_completed)
        # pprint (self.all_failed)
        # print ("\n\n***********\n\n")
        # update any jobs that depended on the dependency
        msg_ids = self.graph.pop(dep_id, [])

        # recheck *all* jobs if
        # a) we have HWM and an engine just became non-full
        # or b) dep_id was given as None

        if dep_id is None or (self.hwm and any(load == self.hwm - 1 for load in self.loads)):
            jobs = self.queue
            using_queue = True
        else:
            using_queue = False
            jobs = deque(sorted(self.queue_map[msg_id] for msg_id in msg_ids))

        to_restore = []
        while jobs:
            job = jobs.popleft()
            if job.removed:
                continue
            msg_id = job.msg_id

            put_it_back = True

            if job.after.unreachable(self.all_completed, self.all_failed)\
                    or job.follow.unreachable(self.all_completed, self.all_failed):
                self.fail_unreachable(msg_id)
                put_it_back = False

            elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(job):
                    put_it_back = False
                    self.queue_map.pop(msg_id)
                    for mid in job.dependents:
                        if mid in self.graph:
                            self.graph[mid].remove(msg_id)

                    # abort the loop if we just filled up all of our engines.
                    # avoids an O(N) operation in the case of a full queue,
                    # where a graph update is triggered as soon as an engine becomes
                    # non-full, and all tasks after the first are checked,
                    # even though they can't run.
                    if not self.available_engines():
                        break

            if using_queue and put_it_back:
                # popped a job from the queue but it neither ran nor failed,
                # so we need to put it back when we are done
                # make sure to_restore preserves the same ordering
                to_restore.append(job)

        # put back any tasks we popped but didn't run
        if using_queue:
            self.queue.extendleft(to_restore)

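One subtlety in the restore step: deque.extendleft pushes its elements one at a time, so the block passed to it lands in reverse order relative to the input list, which is why the comment above flags the ordering of to_restore. A quick standalone demonstration:

    from collections import deque

    q = deque([4, 5])
    q.extendleft([1, 2, 3])  # prepends one element at a time
    print(list(q))           # -> [3, 2, 1, 4, 5]: the prepended block is reversed
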
    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.
        Override in subclasses. The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))


    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.
        Override in subclasses."""
        self.loads[idx] -= 1

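add_job does double duty: it bumps the engine's load and rotates the engine to the back of the parallel targets/loads lists, which is what makes the default ordering LRU; among equally loaded engines, the least recently assigned one sorts first. A standalone sketch of the rotation (toy names, not part of this changeset):

    targets = ['a', 'b', 'c']
    loads = [0, 0, 0]

    def add_job(idx):
        loads[idx] += 1
        for lis in (targets, loads):
            lis.append(lis.pop(idx))

    add_job(0)      # engine 'a' takes a job and rotates to the back
    print(targets)  # -> ['b', 'c', 'a']
    print(loads)    # -> [0, 0, 1]
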

def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                     logname='root', log_url=None, loglevel=logging.DEBUG,
                     identity=b'task', in_thread=False):

    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    ins = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(ins, 0)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    outs = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    util.set_hwm(outs, 0)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    util.set_hwm(mons, 0)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    querys = ZMQStream(ctx.socket(zmq.DEALER), loop)
    querys.connect(reg_addr)

    # set up logging
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                              mon_stream=mons, notifier_stream=nots,
                              query_stream=querys,
                              loop=loop, log=log,
                              config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")

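As a usage sketch, a scheduler can be started inside the current process and event loop with in_thread=True; the addresses below are placeholders, not values from this changeset:

    launch_scheduler('tcp://127.0.0.1:10101',  # in_addr: client-facing ROUTER
                     'tcp://127.0.0.1:10102',  # out_addr: engine-facing ROUTER
                     'tcp://127.0.0.1:10103',  # mon_addr: monitor PUB
                     'tcp://127.0.0.1:10104',  # not_addr: notification SUB
                     'tcp://127.0.0.1:10105',  # reg_addr: registration DEALER
                     in_thread=True)           # reuse the parent's Context and IOLoop
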
@@ -1,351 +1,368 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43
43
44 # IPython imports
44 # IPython imports
45 from IPython.config.application import Application
45 from IPython.config.application import Application
46 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
46 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
47 from IPython.kernel.zmq.log import EnginePUBHandler
47 from IPython.kernel.zmq.log import EnginePUBHandler
48 from IPython.kernel.zmq.serialize import (
48 from IPython.kernel.zmq.serialize import (
49 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
49 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
50 )
50 )
51
51
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53 # Classes
53 # Classes
54 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
55
55
56 class Namespace(dict):
56 class Namespace(dict):
57 """Subclass of dict for attribute access to keys."""
57 """Subclass of dict for attribute access to keys."""
58
58
59 def __getattr__(self, key):
59 def __getattr__(self, key):
60 """getattr aliased to getitem"""
60 """getattr aliased to getitem"""
61 if key in self.iterkeys():
61 if key in self.iterkeys():
62 return self[key]
62 return self[key]
63 else:
63 else:
64 raise NameError(key)
64 raise NameError(key)
65
65
66 def __setattr__(self, key, value):
66 def __setattr__(self, key, value):
67 """setattr aliased to setitem, with strict"""
67 """setattr aliased to setitem, with strict"""
68 if hasattr(dict, key):
68 if hasattr(dict, key):
69 raise KeyError("Cannot override dict keys %r"%key)
69 raise KeyError("Cannot override dict keys %r"%key)
70 self[key] = value
70 self[key] = value
71
71
72
72
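A quick sketch of the intended use; attribute and item access are interchangeable:

    ns = Namespace(a=5)
    ns.b = 10              # setattr is aliased to setitem
    print(ns.a + ns['b'])  # -> 15
    # ns.keys = 1          # would raise KeyError: cannot shadow dict.keys
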
class ReverseDict(dict):
    """simple double-keyed subset of dict methods."""

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self._reverse = dict()
        for key, value in self.iteritems():
            self._reverse[value] = key

    def __getitem__(self, key):
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return self._reverse[key]

    def __setitem__(self, key, value):
        if key in self._reverse:
            raise KeyError("Can't have key %r on both sides!" % key)
        dict.__setitem__(self, key, value)
        self._reverse[value] = key

    def pop(self, key):
        value = dict.pop(self, key)
        self._reverse.pop(value)
        return value

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

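Usage is symmetric: after one assignment, both the key and the value can be used for lookup (the pairing below is illustrative):

    rd = ReverseDict()
    rd['engine-0'] = 'tcp://127.0.0.1:10201'
    print(rd['engine-0'])               # -> 'tcp://127.0.0.1:10201'
    print(rd['tcp://127.0.0.1:10201'])  # -> 'engine-0'
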
#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------

@decorator
def log_errors(f, self, *args, **kwargs):
    """decorator to log unhandled exceptions raised in a method.

    For use wrapping on_recv callbacks, so that exceptions
    do not cause the stream to be closed.
    """
    try:
        return f(self, *args, **kwargs)
    except Exception:
        self.log.error("Uncaught exception in %r" % f, exc_info=True)

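Because the wrapped method only logs and swallows the exception, a callback registered via on_recv cannot take down its stream. A minimal sketch, assuming only that the owning object has a self.log (the class and message below are illustrative):

    class Dispatcher(object):
        log = logging.getLogger('dispatcher')

        @log_errors
        def dispatch(self, raw_msg):
            raise ValueError("bad message %r" % raw_msg)

    Dispatcher().dispatch(None)  # error is logged; no exception propagates
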
def is_url(url):
    """boolean check for whether a string is a zmq url"""
    if '://' not in url:
        return False
    proto, addr = url.split('://', 1)
    if proto.lower() not in ['tcp', 'pgm', 'epgm', 'ipc', 'inproc']:
        return False
    return True

def validate_url(url):
    """validate a url for zeromq"""
    if not isinstance(url, basestring):
        raise TypeError("url must be a string, not %r" % type(url))
    url = url.lower()

    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r' % url
    proto, addr = proto_addr
    assert proto in ['tcp', 'pgm', 'epgm', 'ipc', 'inproc'], "Invalid protocol: %r" % proto

    # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
    # author: Remi Sabourin
    pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

    if proto == 'tcp':
        lis = addr.split(':')
        assert len(lis) == 2, 'Invalid url: %r' % url
        addr, s_port = lis
        try:
            port = int(s_port)
        except ValueError:
            raise AssertionError("Invalid port %r in url: %r" % (s_port, url))

        assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r' % url

    else:
        # only validate tcp urls currently
        pass

    return True


def validate_url_container(container):
    """validate a potentially nested collection of urls."""
    if isinstance(container, basestring):
        url = container
        return validate_url(url)
    elif isinstance(container, dict):
        container = container.itervalues()

    for element in container:
        validate_url_container(element)


def split_url(url):
    """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r' % url
    proto, addr = proto_addr
    lis = addr.split(':')
    assert len(lis) == 2, 'Invalid url: %r' % url
    addr, s_port = lis
    return proto, addr, s_port

def disambiguate_ip_address(ip, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation of location is localhost)."""
    if ip in ('0.0.0.0', '*'):
        if location is None or location in PUBLIC_IPS or not PUBLIC_IPS:
            # If location is unspecified or cannot be determined, assume local
            ip = LOCALHOST
        elif location:
            return location
    return ip

def disambiguate_url(url, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation is localhost).

    This is for zeromq urls, such as tcp://*:10101."""
    try:
        proto, ip, port = split_url(url)
    except AssertionError:
        # probably not a tcp url; could be ipc, etc.
        return url

    ip = disambiguate_ip_address(ip, location)

    return "%s://%s:%s" % (proto, ip, port)

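For example, a wildcard bind URL is rewritten into something a client can actually connect to:

    print(disambiguate_url('tcp://*:10101'))
    # -> 'tcp://127.0.0.1:10101': no location given, so assume localhost
    print(disambiguate_url('tcp://0.0.0.0:10101', location='10.0.1.5'))
    # -> 'tcp://10.0.1.5:10101', assuming 10.0.1.5 is not one of this machine's IPs
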
#--------------------------------------------------------------------------
# helpers for implementing old MEC API via view.apply
#--------------------------------------------------------------------------

def interactive(f):
    """decorator for making functions appear as interactively defined.
    This results in the function being linked to the user_ns as globals()
    instead of the module globals().
    """
    f.__module__ = '__main__'
    return f

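The effect is that global name lookups inside the function resolve against the engine's user namespace rather than this module; the helpers below rely on exactly that. A sketch (the view object is assumed from the surrounding API, not defined here):

    @interactive
    def report():
        return a  # 'a' is resolved in the engine's user_ns at call time

    # view.apply(report)  # assuming an IPython.parallel View is at hand
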
@interactive
def _push(**ns):
    """helper method for implementing `client.push` via `client.apply`"""
    user_ns = globals()
    tmp = '_IP_PUSH_TMP_'
    while tmp in user_ns:
        tmp = tmp + '_'
    try:
        for name, value in ns.iteritems():
            user_ns[tmp] = value
            exec "%s = %s" % (name, tmp) in user_ns
    finally:
        user_ns.pop(tmp, None)

@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`"""
    if isinstance(keys, (list, tuple, set)):
        return map(lambda key: eval(key, globals()), keys)
    else:
        return eval(keys, globals())

@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`"""
    exec code in globals()

#--------------------------------------------------------------------------
# extra process management utilities
#--------------------------------------------------------------------------

_random_ports = set()

def select_random_ports(n):
    """Select and return n random ports that are available."""
    ports = []
    for i in xrange(n):
        sock = socket.socket()
        sock.bind(('', 0))
        while sock.getsockname()[1] in _random_ports:
            sock.close()
            sock = socket.socket()
            sock.bind(('', 0))
        ports.append(sock)
    for i, sock in enumerate(ports):
        port = sock.getsockname()[1]
        sock.close()
        ports[i] = port
        _random_ports.add(port)
    return ports

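The sockets stay open until all n ports are chosen, so the same port cannot be handed out twice within one call; _random_ports additionally guards against repeats across calls in this process. For example:

    ports = select_random_ports(5)
    print(ports)  # e.g. [53712, 53713, 53920, 54001, 54002]: five free, distinct ports
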
def signal_children(children):
    """Relay interrupt/term signals to children, for more solid process cleanup."""
    def terminate_children(sig, frame):
        log = Application.instance().log
        log.critical("Got signal %i, terminating children..." % sig)
        for child in children:
            child.terminate()

        sys.exit(sig != SIGINT)
        # sys.exit(sig)
    for sig in (SIGINT, SIGABRT, SIGTERM):
        signal(sig, terminate_children)

def generate_exec_key(keyfile):
    import uuid
    newkey = str(uuid.uuid4())
    with open(keyfile, 'w') as f:
        # f.write('ipython-key ')
        f.write(newkey + '\n')
    # set user-only RW permissions (0600)
    # this will have no effect on Windows
    os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)

def integer_loglevel(loglevel):
    try:
        loglevel = int(loglevel)
    except ValueError:
        if isinstance(loglevel, str):
            loglevel = getattr(logging, loglevel)
    return loglevel

def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    logger = logging.getLogger(logname)
    if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
        # don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = handlers.PUBHandler(lsock)
    handler.setLevel(loglevel)
    handler.root_topic = root
    logger.addHandler(handler)
    logger.setLevel(loglevel)

def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    logger = logging.getLogger()
    if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
        # don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = EnginePUBHandler(engine, lsock)
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger

def local_logger(logname, loglevel=logging.DEBUG):
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
        # don't add a second StreamHandler
        return
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                                  datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger

def set_hwm(sock, hwm=0):
    """set zmq High Water Mark on a socket

    in a way that always works for various pyzmq / libzmq versions.
    """
    import zmq

    for key in ('HWM', 'SNDHWM', 'RCVHWM'):
        opt = getattr(zmq, key, None)
        if opt is None:
            continue
        try:
            sock.setsockopt(opt, hwm)
        except zmq.ZMQError:
            pass

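Usage matches the scheduler changes above: pass a socket (or a ZMQStream, which forwards setsockopt) and the desired mark, where 0 means unlimited:

    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.ROUTER)
    set_hwm(sock, 0)  # unlimited HWM: never drop, on old or new libzmq
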