##// END OF EJS Templates
parallel.apps.ipcontrollerapp: Use utils.localinterfaces...
W. Trevor King -
Show More
@@ -1,528 +1,528 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import json
26 import json
27 import os
27 import os
28 import socket
29 import stat
28 import stat
30 import sys
29 import sys
31
30
32 from multiprocessing import Process
31 from multiprocessing import Process
33 from signal import signal, SIGINT, SIGABRT, SIGTERM
32 from signal import signal, SIGINT, SIGABRT, SIGTERM
34
33
35 import zmq
34 import zmq
36 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
37 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
38
37
39 from IPython.core.profiledir import ProfileDir
38 from IPython.core.profiledir import ProfileDir
40
39
41 from IPython.parallel.apps.baseapp import (
40 from IPython.parallel.apps.baseapp import (
42 BaseParallelApplication,
41 BaseParallelApplication,
43 base_aliases,
42 base_aliases,
44 base_flags,
43 base_flags,
45 catch_config_error,
44 catch_config_error,
46 )
45 )
47 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49
49
50 from IPython.zmq.session import (
50 from IPython.zmq.session import (
51 Session, session_aliases, session_flags, default_secure
51 Session, session_aliases, session_flags, default_secure
52 )
52 )
53
53
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 from IPython.parallel.controller.hub import HubFactory
55 from IPython.parallel.controller.hub import HubFactory
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 from IPython.parallel.controller.sqlitedb import SQLiteDB
57 from IPython.parallel.controller.sqlitedb import SQLiteDB
58
58
59 from IPython.parallel.util import split_url, disambiguate_url
59 from IPython.parallel.util import split_url, disambiguate_url
60
60
61 # conditional import of MongoDB backend class
61 # conditional import of MongoDB backend class
62
62
63 try:
63 try:
64 from IPython.parallel.controller.mongodb import MongoDB
64 from IPython.parallel.controller.mongodb import MongoDB
65 except ImportError:
65 except ImportError:
66 maybe_mongo = []
66 maybe_mongo = []
67 else:
67 else:
68 maybe_mongo = [MongoDB]
68 maybe_mongo = [MongoDB]
69
69
70
70
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72 # Module level variables
72 # Module level variables
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74
74
75
75
#: The default config file name for this application
default_config_file_name = u'ipcontroller_config.py'


# Long-form help text shown by `ipcontroller --help`
# (assigned to IPControllerApp.description below).
_description = """Start the IPython controller for parallel computing.

The IPython controller provides a gateway between the IPython engines and
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "profile_name". See the `profile`
and `profile-dir` options for details.
"""

# Example invocations appended to the help output
# (assigned to IPControllerApp.examples below).
_examples = """
ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
ipcontroller --scheme=pure # use the pure zeromq scheduler
"""
94
94
95
95
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
97 # The main application
97 # The main application
98 #-----------------------------------------------------------------------------
98 #-----------------------------------------------------------------------------
# Command-line flags: each entry maps a flag name to a
# (config-overrides dict, help string) pair, per the traitlets
# application convention. Start from the shared base flags.
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    # db backend selection flags: each just overrides HubFactory.db_class
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
                    """use dummy DB backend, which doesn't store any information.

                    This is the default as of IPython 0.13.

                    To enable delayed or repeated retrieval of results from the Hub,
                    select one of the true db backends.
                    """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files'),
    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
                    'Attempt to restore engines from a JSON file. '
                    'For use when resuming a crashed controller'),
})

flags.update(session_flags)

# Command-line aliases: map short option names to fully-qualified
# configurable trait names.
aliases = dict(
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
aliases.update(session_aliases)
144
144
class IPControllerApp(BaseParallelApplication):
    """The IPython controller application.

    Starts the Hub (via :class:`HubFactory`) plus the scheduler/queue
    subprocesses, and reads/writes the JSON connection files that engines
    and clients use to connect.
    """

    name = u'ipcontroller'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    # classes exposed for --help-all / config file generation
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help="""Whether to reuse existing json connection files.
        If False, connection files will be removed on a clean exit.
        """
    )
    restore_engines = Bool(False, config=True,
        help="""Reload engine state from JSON file
        """
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")

    def _cluster_id_changed(self, name, old, new):
        """Trait observer: keep connection-file names in sync with cluster id."""
        super(IPControllerApp, self)._cluster_id_changed(name, old, new)
        # base class updates self.name from the cluster id; derive file names from it
        self.engine_json_file = "%s-engine.json" % self.name
        self.client_json_file = "%s-client.json" % self.name


    # internal
    # child scheduler/queue processes (or threads), populated by init_schedulers
    children = List()
    # dotted path of the MonitoredQueue device class; swapped by _use_threads_changed
    mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')

    def _use_threads_changed(self, name, old, new):
        """Trait observer: pick Thread- or Process-based MonitoredQueue."""
        self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')

    write_connection_files = Bool(True,
        help="""Whether to write connection files to disk.
        True in all cases other than runs with `reuse_files=True` *after the first*
        """
    )

    aliases = Dict(aliases)
    flags = Dict(flags)
214
214
215
215
216 def save_connection_dict(self, fname, cdict):
216 def save_connection_dict(self, fname, cdict):
217 """save a connection dict to json file."""
217 """save a connection dict to json file."""
218 c = self.config
218 c = self.config
219 url = cdict['registration']
219 url = cdict['registration']
220 location = cdict['location']
220 location = cdict['location']
221
221
222 if not location:
222 if not location:
223 try:
223 if PUBLIC_IPS:
224 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
224 location = PUBLIC_IPS[-1]
225 except (socket.gaierror, IndexError):
225 else:
226 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
226 self.log.warn("Could not identify this machine's IP, assuming %s."
227 " You may need to specify '--location=<external_ip_address>' to help"
227 " You may need to specify '--location=<external_ip_address>' to help"
228 " IPython decide when to connect via loopback.")
228 " IPython decide when to connect via loopback." % LOCALHOST)
229 location = '127.0.0.1'
229 location = LOCALHOST
230 cdict['location'] = location
230 cdict['location'] = location
231 fname = os.path.join(self.profile_dir.security_dir, fname)
231 fname = os.path.join(self.profile_dir.security_dir, fname)
232 self.log.info("writing connection info to %s", fname)
232 self.log.info("writing connection info to %s", fname)
233 with open(fname, 'w') as f:
233 with open(fname, 'w') as f:
234 f.write(json.dumps(cdict, indent=2))
234 f.write(json.dumps(cdict, indent=2))
235 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
235 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
236
236
237 def load_config_from_json(self):
237 def load_config_from_json(self):
238 """load config from existing json connector files."""
238 """load config from existing json connector files."""
239 c = self.config
239 c = self.config
240 self.log.debug("loading config from JSON")
240 self.log.debug("loading config from JSON")
241
241
242 # load engine config
242 # load engine config
243
243
244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
245 self.log.info("loading connection info from %s", fname)
245 self.log.info("loading connection info from %s", fname)
246 with open(fname) as f:
246 with open(fname) as f:
247 ecfg = json.loads(f.read())
247 ecfg = json.loads(f.read())
248
248
249 # json gives unicode, Session.key wants bytes
249 # json gives unicode, Session.key wants bytes
250 c.Session.key = ecfg['exec_key'].encode('ascii')
250 c.Session.key = ecfg['exec_key'].encode('ascii')
251
251
252 xport,ip = ecfg['interface'].split('://')
252 xport,ip = ecfg['interface'].split('://')
253
253
254 c.HubFactory.engine_ip = ip
254 c.HubFactory.engine_ip = ip
255 c.HubFactory.engine_transport = xport
255 c.HubFactory.engine_transport = xport
256
256
257 self.location = ecfg['location']
257 self.location = ecfg['location']
258 if not self.engine_ssh_server:
258 if not self.engine_ssh_server:
259 self.engine_ssh_server = ecfg['ssh']
259 self.engine_ssh_server = ecfg['ssh']
260
260
261 # load client config
261 # load client config
262
262
263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
264 self.log.info("loading connection info from %s", fname)
264 self.log.info("loading connection info from %s", fname)
265 with open(fname) as f:
265 with open(fname) as f:
266 ccfg = json.loads(f.read())
266 ccfg = json.loads(f.read())
267
267
268 for key in ('exec_key', 'registration', 'pack', 'unpack'):
268 for key in ('exec_key', 'registration', 'pack', 'unpack'):
269 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
269 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
270
270
271 xport,addr = ccfg['interface'].split('://')
271 xport,addr = ccfg['interface'].split('://')
272
272
273 c.HubFactory.client_transport = xport
273 c.HubFactory.client_transport = xport
274 c.HubFactory.client_ip = ip
274 c.HubFactory.client_ip = ip
275 if not self.ssh_server:
275 if not self.ssh_server:
276 self.ssh_server = ccfg['ssh']
276 self.ssh_server = ccfg['ssh']
277
277
278 # load port config:
278 # load port config:
279 c.HubFactory.regport = ecfg['registration']
279 c.HubFactory.regport = ecfg['registration']
280 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
280 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
281 c.HubFactory.control = (ccfg['control'], ecfg['control'])
281 c.HubFactory.control = (ccfg['control'], ecfg['control'])
282 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
282 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
283 c.HubFactory.task = (ccfg['task'], ecfg['task'])
283 c.HubFactory.task = (ccfg['task'], ecfg['task'])
284 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
284 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
285 c.HubFactory.notifier_port = ccfg['notification']
285 c.HubFactory.notifier_port = ccfg['notification']
286
286
287 def cleanup_connection_files(self):
287 def cleanup_connection_files(self):
288 if self.reuse_files:
288 if self.reuse_files:
289 self.log.debug("leaving JSON connection files for reuse")
289 self.log.debug("leaving JSON connection files for reuse")
290 return
290 return
291 self.log.debug("cleaning up JSON connection files")
291 self.log.debug("cleaning up JSON connection files")
292 for f in (self.client_json_file, self.engine_json_file):
292 for f in (self.client_json_file, self.engine_json_file):
293 f = os.path.join(self.profile_dir.security_dir, f)
293 f = os.path.join(self.profile_dir.security_dir, f)
294 try:
294 try:
295 os.remove(f)
295 os.remove(f)
296 except Exception as e:
296 except Exception as e:
297 self.log.error("Failed to cleanup connection file: %s", e)
297 self.log.error("Failed to cleanup connection file: %s", e)
298 else:
298 else:
299 self.log.debug(u"removed %s", f)
299 self.log.debug(u"removed %s", f)
300
300
301 def load_secondary_config(self):
301 def load_secondary_config(self):
302 """secondary config, loading from JSON and setting defaults"""
302 """secondary config, loading from JSON and setting defaults"""
303 if self.reuse_files:
303 if self.reuse_files:
304 try:
304 try:
305 self.load_config_from_json()
305 self.load_config_from_json()
306 except (AssertionError,IOError) as e:
306 except (AssertionError,IOError) as e:
307 self.log.error("Could not load config from JSON: %s" % e)
307 self.log.error("Could not load config from JSON: %s" % e)
308 else:
308 else:
309 # successfully loaded config from JSON, and reuse=True
309 # successfully loaded config from JSON, and reuse=True
310 # no need to wite back the same file
310 # no need to wite back the same file
311 self.write_connection_files = False
311 self.write_connection_files = False
312
312
313 # switch Session.key default to secure
313 # switch Session.key default to secure
314 default_secure(self.config)
314 default_secure(self.config)
315 self.log.debug("Config changed")
315 self.log.debug("Config changed")
316 self.log.debug(repr(self.config))
316 self.log.debug(repr(self.config))
317
317
    def init_hub(self):
        """Construct the Hub via HubFactory and write connection files.

        Also points the Hub at its engine-state file and optionally
        restores engine state from it (``restore_engines``).
        """
        c = self.config

        self.do_import_statements()

        try:
            self.factory = HubFactory(config=c, log=self.log)
            # self.start_logging()
            self.factory.init_hub()
        except TraitError:
            # config errors propagate so catch_config_error can report them
            raise
        except Exception:
            # any other construction failure is fatal
            self.log.error("Couldn't construct the Controller", exc_info=True)
            self.exit(1)

        if self.write_connection_files:
            # save to new json config files
            f = self.factory
            # info shared by both the client and engine connection files
            base = {
                'exec_key' : f.session.key.decode('ascii'),
                'location' : self.location,
                'pack' : f.session.packer,
                'unpack' : f.session.unpacker,
            }

            cdict = {'ssh' : self.ssh_server}
            cdict.update(f.client_info)
            cdict.update(base)
            self.save_connection_dict(self.client_json_file, cdict)

            edict = {'ssh' : self.engine_ssh_server}
            edict.update(f.engine_info)
            edict.update(base)
            self.save_connection_dict(self.engine_json_file, edict)

        # engine state lives in the log dir, named per cluster id
        fname = "engines%s.json" % self.cluster_id
        self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
        if self.restore_engines:
            self.factory.hub._load_engine_state()
357
357
    def init_schedulers(self):
        """Create the queue devices and the task scheduler.

        Builds one MonitoredQueue per channel (iopub, mux, control) and,
        depending on the configured scheme, either a pure-zmq DEALER task
        queue, no task scheduler at all, or the Python TaskScheduler in a
        subprocess (or inline when using threads). All spawned children
        are collected in ``self.children`` for later start/terminate.
        """
        children = self.children
        # resolve the (Thread|Process)MonitoredQueue class chosen by use_threads
        mq = import_item(str(self.mq_class))

        f = self.factory
        ident = f.session.bsession
        # disambiguate url, in case of *
        monitor_url = disambiguate_url(f.monitor_url)
        # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
        # IOPub relay (in a Process)
        q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
        q.bind_in(f.client_url('iopub'))
        q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
        q.bind_out(f.engine_url('iopub'))
        # subscribe to everything on the engine side
        q.setsockopt_out(zmq.SUBSCRIBE, b'')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)

        # Multiplexer Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
        q.bind_in(f.client_url('mux'))
        q.setsockopt_in(zmq.IDENTITY, b'mux_in')
        q.bind_out(f.engine_url('mux'))
        q.setsockopt_out(zmq.IDENTITY, b'mux_out')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)

        # Control Queue (in a Process)
        q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
        q.bind_in(f.client_url('control'))
        q.setsockopt_in(zmq.IDENTITY, b'control_in')
        q.bind_out(f.engine_url('control'))
        q.setsockopt_out(zmq.IDENTITY, b'control_out')
        q.connect_mon(monitor_url)
        q.daemon=True
        children.append(q)
        # scheme may not be set in config; fall back to the trait default
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            scheme = TaskScheduler.scheme_name.get_default_value()
        # Task Queue (in a Process)
        if scheme == 'pure':
            self.log.warn("task::using pure DEALER Task scheduler")
            q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
            # q.setsockopt_out(zmq.HWM, hub.hwm)
            q.bind_in(f.client_url('task'))
            q.setsockopt_in(zmq.IDENTITY, b'task_in')
            q.bind_out(f.engine_url('task'))
            q.setsockopt_out(zmq.IDENTITY, b'task_out')
            q.connect_mon(monitor_url)
            q.daemon=True
            children.append(q)
        elif scheme == 'none':
            self.log.warn("task::using no Task scheduler")

        else:
            # Python scheduler: launched via launch_scheduler with the
            # task/notification/registration urls and logging config
            self.log.info("task::using Python %s Task scheduler"%scheme)
            sargs = (f.client_url('task'), f.engine_url('task'),
                    monitor_url, disambiguate_url(f.client_url('notification')),
                    disambiguate_url(f.client_url('registration')),
                    )
            kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
            if 'Process' in self.mq_class:
                # run the Python scheduler in a Process
                q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
                q.daemon=True
                children.append(q)
            else:
                # single-threaded Controller
                kwargs['in_thread'] = True
                launch_scheduler(*sargs, **kwargs)
432
432
433 def terminate_children(self):
433 def terminate_children(self):
434 child_procs = []
434 child_procs = []
435 for child in self.children:
435 for child in self.children:
436 if isinstance(child, ProcessMonitoredQueue):
436 if isinstance(child, ProcessMonitoredQueue):
437 child_procs.append(child.launcher)
437 child_procs.append(child.launcher)
438 elif isinstance(child, Process):
438 elif isinstance(child, Process):
439 child_procs.append(child)
439 child_procs.append(child)
440 if child_procs:
440 if child_procs:
441 self.log.critical("terminating children...")
441 self.log.critical("terminating children...")
442 for child in child_procs:
442 for child in child_procs:
443 try:
443 try:
444 child.terminate()
444 child.terminate()
445 except OSError:
445 except OSError:
446 # already dead
446 # already dead
447 pass
447 pass
448
448
    def handle_signal(self, sig, frame):
        """Signal handler: tear down children, then stop the event loop."""
        self.log.critical("Received signal %i, shutting down", sig)
        self.terminate_children()
        self.loop.stop()
453
453
454 def init_signal(self):
454 def init_signal(self):
455 for sig in (SIGINT, SIGABRT, SIGTERM):
455 for sig in (SIGINT, SIGABRT, SIGTERM):
456 signal(sig, self.handle_signal)
456 signal(sig, self.handle_signal)
457
457
458 def do_import_statements(self):
458 def do_import_statements(self):
459 statements = self.import_statements
459 statements = self.import_statements
460 for s in statements:
460 for s in statements:
461 try:
461 try:
462 self.log.msg("Executing statement: '%s'" % s)
462 self.log.msg("Executing statement: '%s'" % s)
463 exec s in globals(), locals()
463 exec s in globals(), locals()
464 except:
464 except:
465 self.log.msg("Error running statement: %s" % s)
465 self.log.msg("Error running statement: %s" % s)
466
466
467 def forward_logging(self):
467 def forward_logging(self):
468 if self.log_url:
468 if self.log_url:
469 self.log.info("Forwarding logging to %s"%self.log_url)
469 self.log.info("Forwarding logging to %s"%self.log_url)
470 context = zmq.Context.instance()
470 context = zmq.Context.instance()
471 lsock = context.socket(zmq.PUB)
471 lsock = context.socket(zmq.PUB)
472 lsock.connect(self.log_url)
472 lsock.connect(self.log_url)
473 handler = PUBHandler(lsock)
473 handler = PUBHandler(lsock)
474 handler.root_topic = 'controller'
474 handler.root_topic = 'controller'
475 handler.setLevel(self.log_level)
475 handler.setLevel(self.log_level)
476 self.log.addHandler(handler)
476 self.log.addHandler(handler)
477
477
    @catch_config_error
    def initialize(self, argv=None):
        """Initialize the controller application.

        Runs the base-class initialization (config/argv parsing), then
        sets up log forwarding, loads the secondary (connection-file)
        config, and constructs the hub and schedulers.
        """
        super(IPControllerApp, self).initialize(argv)
        self.forward_logging()
        self.load_secondary_config()
        self.init_hub()
        self.init_schedulers()
485
485
    def start(self):
        """Start the hub factory, child processes, and the event loop.

        Connection files are removed on the way out, whether the loop
        exits normally or via KeyboardInterrupt.
        """
        # Start the subprocesses:
        self.factory.start()
        # children must be started before signals are setup,
        # otherwise signal-handling will fire multiple times
        for child in self.children:
            child.start()
        self.init_signal()

        self.write_pid_file(overwrite=True)

        try:
            # blocks until the loop is stopped (e.g. by handle_signal)
            self.factory.loop.start()
        except KeyboardInterrupt:
            self.log.critical("Interrupted, Exiting...\n")
        finally:
            self.cleanup_connection_files()
503
503
504
504
505
505
def launch_new_instance():
    """Create and run the IPython controller"""
    if sys.platform == 'win32':
        # Guard against re-entry from a multiprocessing child process:
        # Windows has no real fork, so multiprocessing re-imports this
        # module in subprocesses, which could start infinite Controllers.
        # (Only comes up when IPython was installed with vanilla
        # setuptools, *not* distribute.)
        import multiprocessing
        # the main process is named 'MainProcess'; children are 'Process-N'
        if multiprocessing.current_process().name != 'MainProcess':
            # we are a subprocess, don't start another Controller!
            return
    app = IPControllerApp.instance()
    app.initialize()
    app.start()
525
525
526
526
# allow running this module directly as a script
if __name__ == '__main__':
    launch_new_instance()
General Comments 0
You need to be logged in to leave comments. Login now