pass Session key around in config in ipcontroller...
MinRK
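This commit fixes a Session-key mismatch inside the controller process: `HubFactory` builds a `Session` whose signing key may be auto-generated when none is configured, while the `TaskScheduler` is launched separately with `config=dict(self.config)` and builds its own `Session`. Writing the Hub's key back into the config (the three lines added in `init_hub` below) makes every Session constructed from that config sign and verify with the same key. A minimal sketch of the mechanism, not part of the diff, using the config machinery of the same IPython vintage as this file:

    from IPython.config.loader import Config
    from IPython.kernel.zmq.session import Session

    c = Config()
    c.Session.key = b'shared-secret'   # what the added line does with the Hub's key

    # every Session built from this config uses the same HMAC key,
    # so messages signed by one verify for the other
    s1 = Session(config=c)
    s2 = Session(config=c)
    assert s1.key == s2.key == b'shared-secret'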
@@ -1,545 +1,548 @@
 #!/usr/bin/env python
 # encoding: utf-8
 """
 The IPython controller application.
 
 Authors:
 
 * Brian Granger
 * MinRK
 
 """
 
 #-----------------------------------------------------------------------------
 # Copyright (C) 2008 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------
 
 from __future__ import with_statement
 
 import json
 import os
 import stat
 import sys
 
 from multiprocessing import Process
 from signal import signal, SIGINT, SIGABRT, SIGTERM
 
 import zmq
 from zmq.devices import ProcessMonitoredQueue
 from zmq.log.handlers import PUBHandler
 
 from IPython.core.profiledir import ProfileDir
 
 from IPython.parallel.apps.baseapp import (
     BaseParallelApplication,
     base_aliases,
     base_flags,
     catch_config_error,
 )
 from IPython.utils.importstring import import_item
 from IPython.utils.localinterfaces import localhost, public_ips
 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
 
 from IPython.kernel.zmq.session import (
     Session, session_aliases, session_flags,
 )
 
 from IPython.parallel.controller.heartmonitor import HeartMonitor
 from IPython.parallel.controller.hub import HubFactory
 from IPython.parallel.controller.scheduler import TaskScheduler, launch_scheduler
 from IPython.parallel.controller.dictdb import DictDB
 
 from IPython.parallel.util import split_url, disambiguate_url, set_hwm
 
 # conditional import of SQLiteDB / MongoDB backend class
 real_dbs = []
 
 try:
     from IPython.parallel.controller.sqlitedb import SQLiteDB
 except ImportError:
     pass
 else:
     real_dbs.append(SQLiteDB)
 
 try:
     from IPython.parallel.controller.mongodb import MongoDB
 except ImportError:
     pass
 else:
     real_dbs.append(MongoDB)
 
 
 
 #-----------------------------------------------------------------------------
 # Module level variables
 #-----------------------------------------------------------------------------
 
 
 _description = """Start the IPython controller for parallel computing.
 
 The IPython controller provides a gateway between the IPython engines and
 clients. The controller needs to be started before the engines and can be
 configured using command line options or using a cluster directory. Cluster
 directories contain config, log and security files and are usually located in
 your ipython directory and named as "profile_name". See the `profile`
 and `profile-dir` options for details.
 """
 
 _examples = """
 ipcontroller --ip=192.168.0.1 --port=1000  # listen on ip, port for engines
 ipcontroller --scheme=pure  # use the pure zeromq scheduler
 """
 
 
 #-----------------------------------------------------------------------------
 # The main application
 #-----------------------------------------------------------------------------
 flags = {}
 flags.update(base_flags)
 flags.update({
     'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                     'Use threads instead of processes for the schedulers'),
     'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                     'use the SQLiteDB backend'),
     'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                     'use the MongoDB backend'),
     'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                     'use the in-memory DictDB backend'),
     'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
                     """use dummy DB backend, which doesn't store any information.
 
                     This is the default as of IPython 0.13.
 
                     To enable delayed or repeated retrieval of results from the Hub,
                     select one of the true db backends.
                     """),
     'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                     'reuse existing json connection files'),
     'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
                     'Attempt to restore engines from a JSON file. '
                     'For use when resuming a crashed controller'),
 })
 
 flags.update(session_flags)
 
 aliases = dict(
     ssh = 'IPControllerApp.ssh_server',
     enginessh = 'IPControllerApp.engine_ssh_server',
     location = 'IPControllerApp.location',
 
     url = 'HubFactory.url',
     ip = 'HubFactory.ip',
     transport = 'HubFactory.transport',
     port = 'HubFactory.regport',
 
     ping = 'HeartMonitor.period',
 
     scheme = 'TaskScheduler.scheme_name',
     hwm = 'TaskScheduler.hwm',
 )
 aliases.update(base_aliases)
 aliases.update(session_aliases)
 
 class IPControllerApp(BaseParallelApplication):
 
     name = u'ipcontroller'
     description = _description
     examples = _examples
     classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, DictDB] + real_dbs
 
     # change default to True
     auto_create = Bool(True, config=True,
         help="""Whether to create profile dir if it doesn't exist.""")
 
     reuse_files = Bool(False, config=True,
         help="""Whether to reuse existing json connection files.
         If False, connection files will be removed on a clean exit.
         """
     )
     restore_engines = Bool(False, config=True,
         help="""Reload engine state from JSON file
         """
     )
     ssh_server = Unicode(u'', config=True,
         help="""ssh url for clients to use when connecting to the Controller
         processes. It should be of the form: [user@]server[:port]. The
         Controller's listening addresses must be accessible from the ssh server""",
     )
     engine_ssh_server = Unicode(u'', config=True,
         help="""ssh url for engines to use when connecting to the Controller
         processes. It should be of the form: [user@]server[:port]. The
         Controller's listening addresses must be accessible from the ssh server""",
     )
     location = Unicode(u'', config=True,
         help="""The external IP or domain name of the Controller, used for disambiguating
         engine and client connections.""",
     )
     import_statements = List([], config=True,
         help="import statements to be run at startup. Necessary in some environments"
     )
 
     use_threads = Bool(False, config=True,
         help='Use threads instead of processes for the schedulers',
     )
 
     engine_json_file = Unicode('ipcontroller-engine.json', config=True,
         help="JSON filename where engine connection info will be stored.")
     client_json_file = Unicode('ipcontroller-client.json', config=True,
         help="JSON filename where client connection info will be stored.")
 
     def _cluster_id_changed(self, name, old, new):
         super(IPControllerApp, self)._cluster_id_changed(name, old, new)
         self.engine_json_file = "%s-engine.json" % self.name
         self.client_json_file = "%s-client.json" % self.name
 
 
     # internal
     children = List()
     mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
 
     def _use_threads_changed(self, name, old, new):
         self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
 
     write_connection_files = Bool(True,
         help="""Whether to write connection files to disk.
         True in all cases other than runs with `reuse_files=True` *after the first*
         """
     )
 
     aliases = Dict(aliases)
     flags = Dict(flags)
 
 
     def save_connection_dict(self, fname, cdict):
         """save a connection dict to json file."""
         c = self.config
         url = cdict['registration']
         location = cdict['location']
 
         if not location:
             if public_ips():
                 location = public_ips()[-1]
             else:
                 self.log.warn("Could not identify this machine's IP, assuming %s."
                 " You may need to specify '--location=<external_ip_address>' to help"
                 " IPython decide when to connect via loopback." % localhost() )
                 location = localhost()
             cdict['location'] = location
         fname = os.path.join(self.profile_dir.security_dir, fname)
         self.log.info("writing connection info to %s", fname)
         with open(fname, 'w') as f:
             f.write(json.dumps(cdict, indent=2))
         os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
 
     def load_config_from_json(self):
         """load config from existing json connector files."""
         c = self.config
         self.log.debug("loading config from JSON")
 
         # load engine config
 
         fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
         self.log.info("loading connection info from %s", fname)
         with open(fname) as f:
             ecfg = json.loads(f.read())
 
         # json gives unicode, Session.key wants bytes
         c.Session.key = ecfg['key'].encode('ascii')
 
         xport,ip = ecfg['interface'].split('://')
 
         c.HubFactory.engine_ip = ip
         c.HubFactory.engine_transport = xport
 
         self.location = ecfg['location']
         if not self.engine_ssh_server:
             self.engine_ssh_server = ecfg['ssh']
 
         # load client config
 
         fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
         self.log.info("loading connection info from %s", fname)
         with open(fname) as f:
             ccfg = json.loads(f.read())
 
         for key in ('key', 'registration', 'pack', 'unpack', 'signature_scheme'):
             assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
 
         xport,addr = ccfg['interface'].split('://')
 
         c.HubFactory.client_transport = xport
         c.HubFactory.client_ip = addr
         if not self.ssh_server:
             self.ssh_server = ccfg['ssh']
 
         # load port config:
         c.HubFactory.regport = ecfg['registration']
         c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
         c.HubFactory.control = (ccfg['control'], ecfg['control'])
         c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
         c.HubFactory.task = (ccfg['task'], ecfg['task'])
         c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
         c.HubFactory.notifier_port = ccfg['notification']
 
     def cleanup_connection_files(self):
         if self.reuse_files:
             self.log.debug("leaving JSON connection files for reuse")
             return
         self.log.debug("cleaning up JSON connection files")
         for f in (self.client_json_file, self.engine_json_file):
             f = os.path.join(self.profile_dir.security_dir, f)
             try:
                 os.remove(f)
             except Exception as e:
                 self.log.error("Failed to cleanup connection file: %s", e)
             else:
                 self.log.debug(u"removed %s", f)
 
     def load_secondary_config(self):
         """secondary config, loading from JSON and setting defaults"""
         if self.reuse_files:
             try:
                 self.load_config_from_json()
             except (AssertionError,IOError) as e:
                 self.log.error("Could not load config from JSON: %s" % e)
             else:
                 # successfully loaded config from JSON, and reuse=True
                 # no need to write back the same file
                 self.write_connection_files = False
 
         self.log.debug("Config changed")
         self.log.debug(repr(self.config))
 
     def init_hub(self):
         c = self.config
 
         self.do_import_statements()
 
         try:
             self.factory = HubFactory(config=c, log=self.log)
             # self.start_logging()
             self.factory.init_hub()
         except TraitError:
             raise
         except Exception:
             self.log.error("Couldn't construct the Controller", exc_info=True)
             self.exit(1)
 
         if self.write_connection_files:
             # save to new json config files
             f = self.factory
             base = {
                 'key' : f.session.key.decode('ascii'),
                 'location' : self.location,
                 'pack' : f.session.packer,
                 'unpack' : f.session.unpacker,
                 'signature_scheme' : f.session.signature_scheme,
             }
 
             cdict = {'ssh' : self.ssh_server}
             cdict.update(f.client_info)
             cdict.update(base)
             self.save_connection_dict(self.client_json_file, cdict)
 
             edict = {'ssh' : self.engine_ssh_server}
             edict.update(f.engine_info)
             edict.update(base)
             self.save_connection_dict(self.engine_json_file, edict)
 
         fname = "engines%s.json" % self.cluster_id
         self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
         if self.restore_engines:
             self.factory.hub._load_engine_state()
+        # load key into config so other sessions in this process (TaskScheduler)
+        # have the same value
+        self.config.Session.key = self.factory.session.key
 
     def init_schedulers(self):
         children = self.children
         mq = import_item(str(self.mq_class))
 
         f = self.factory
         ident = f.session.bsession
         # disambiguate url, in case of *
         monitor_url = disambiguate_url(f.monitor_url)
         # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
         q.bind_in(f.client_url('iopub'))
         q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
         q.bind_out(f.engine_url('iopub'))
         q.setsockopt_out(zmq.SUBSCRIBE, b'')
         q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
         # Multiplexer Queue (in a Process)
         q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
 
         q.bind_in(f.client_url('mux'))
         q.setsockopt_in(zmq.IDENTITY, b'mux_in')
         q.bind_out(f.engine_url('mux'))
         q.setsockopt_out(zmq.IDENTITY, b'mux_out')
         q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
 
         # Control Queue (in a Process)
         q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
         q.bind_in(f.client_url('control'))
         q.setsockopt_in(zmq.IDENTITY, b'control_in')
         q.bind_out(f.engine_url('control'))
         q.setsockopt_out(zmq.IDENTITY, b'control_out')
         q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
         if 'TaskScheduler.scheme_name' in self.config:
             scheme = self.config.TaskScheduler.scheme_name
         else:
             scheme = TaskScheduler.scheme_name.get_default_value()
         # Task Queue (in a Process)
         if scheme == 'pure':
             self.log.warn("task::using pure DEALER Task scheduler")
             q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
             # q.setsockopt_out(zmq.HWM, hub.hwm)
             q.bind_in(f.client_url('task'))
             q.setsockopt_in(zmq.IDENTITY, b'task_in')
             q.bind_out(f.engine_url('task'))
             q.setsockopt_out(zmq.IDENTITY, b'task_out')
             q.connect_mon(monitor_url)
             q.daemon=True
             children.append(q)
         elif scheme == 'none':
             self.log.warn("task::using no Task scheduler")
 
         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
             sargs = (f.client_url('task'), f.engine_url('task'),
                      monitor_url, disambiguate_url(f.client_url('notification')),
                      disambiguate_url(f.client_url('registration')),
                     )
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                           log_url = self.log_url, config=dict(self.config))
             if 'Process' in self.mq_class:
                 # run the Python scheduler in a Process
                 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                 q.daemon=True
                 children.append(q)
             else:
                 # single-threaded Controller
                 kwargs['in_thread'] = True
                 launch_scheduler(*sargs, **kwargs)
 
         # set unlimited HWM for all relay devices
         if hasattr(zmq, 'SNDHWM'):
             q = children[0]
             q.setsockopt_in(zmq.RCVHWM, 0)
             q.setsockopt_out(zmq.SNDHWM, 0)
 
             for q in children[1:]:
                 if not hasattr(q, 'setsockopt_in'):
                     continue
                 q.setsockopt_in(zmq.SNDHWM, 0)
                 q.setsockopt_in(zmq.RCVHWM, 0)
                 q.setsockopt_out(zmq.SNDHWM, 0)
                 q.setsockopt_out(zmq.RCVHWM, 0)
                 q.setsockopt_mon(zmq.SNDHWM, 0)
 
 
     def terminate_children(self):
         child_procs = []
         for child in self.children:
             if isinstance(child, ProcessMonitoredQueue):
                 child_procs.append(child.launcher)
             elif isinstance(child, Process):
                 child_procs.append(child)
         if child_procs:
             self.log.critical("terminating children...")
             for child in child_procs:
                 try:
                     child.terminate()
                 except OSError:
                     # already dead
                     pass
 
     def handle_signal(self, sig, frame):
         self.log.critical("Received signal %i, shutting down", sig)
         self.terminate_children()
         self.loop.stop()
 
     def init_signal(self):
         for sig in (SIGINT, SIGABRT, SIGTERM):
             signal(sig, self.handle_signal)
 
     def do_import_statements(self):
         statements = self.import_statements
         for s in statements:
             try:
                 self.log.info("Executing statement: '%s'" % s)
                 exec(s, globals(), locals())
             except Exception:
                 self.log.error("Error running statement: %s" % s)
 
     def forward_logging(self):
         if self.log_url:
             self.log.info("Forwarding logging to %s"%self.log_url)
             context = zmq.Context.instance()
             lsock = context.socket(zmq.PUB)
             lsock.connect(self.log_url)
             handler = PUBHandler(lsock)
             handler.root_topic = 'controller'
             handler.setLevel(self.log_level)
             self.log.addHandler(handler)
 
     @catch_config_error
     def initialize(self, argv=None):
         super(IPControllerApp, self).initialize(argv)
         self.forward_logging()
         self.load_secondary_config()
         self.init_hub()
         self.init_schedulers()
 
     def start(self):
         # Start the subprocesses:
         self.factory.start()
         # children must be started before signals are setup,
         # otherwise signal-handling will fire multiple times
         for child in self.children:
             child.start()
         self.init_signal()
 
         self.write_pid_file(overwrite=True)
 
         try:
             self.factory.loop.start()
         except KeyboardInterrupt:
             self.log.critical("Interrupted, Exiting...\n")
         finally:
             self.cleanup_connection_files()
 
 
 def launch_new_instance(*args, **kwargs):
     """Create and run the IPython controller"""
     if sys.platform == 'win32':
         # make sure we don't get called from a multiprocessing subprocess
         # this can result in infinite Controllers being started on Windows
         # which doesn't have a proper fork, so multiprocessing is wonky
 
         # this only comes up when IPython has been installed using vanilla
         # setuptools, and *not* distribute.
         import multiprocessing
         p = multiprocessing.current_process()
         # the main process has name 'MainProcess'
         # subprocesses will have names like 'Process-1'
         if p.name != 'MainProcess':
             # we are a subprocess, don't start another Controller!
             return
     return IPControllerApp.launch_instance(*args, **kwargs)
 
 
 if __name__ == '__main__':
     launch_new_instance()
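For reference, the connection files written by `save_connection_dict` above are plain JSON and can be read back the same way `load_config_from_json` does. A minimal sketch of a consumer; the profile path is an assumption for a default install, and the `interface`/`registration` fields follow what this file itself reads:

    import json
    import os.path

    # assumed default profile location; adjust to your profile dir
    fname = os.path.expanduser(
        '~/.ipython/profile_default/security/ipcontroller-client.json')

    with open(fname) as f:
        cfg = json.load(f)

    # json gives unicode, Session.key wants bytes (as in load_config_from_json)
    key = cfg['key'].encode('ascii')
    # interface is like 'tcp://127.0.0.1'; registration is the port
    registration_url = '%s:%i' % (cfg['interface'], cfg['registration'])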