add early shutdown detection, and public-ip message to ipcluster/ipengine...
Author: MinRK
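The first hunk below adds an early-shutdown check to IPClusterEngines: an `early_shutdown` window (30 s by default) is armed when the engines start, the launcher's `on_stop` callback is pointed at `engines_stopped_early`, and a DelayedCallback disarms the check once the window passes. If the engines exit inside the window, the stop is treated as a failed startup and a hint about the controller's listening IP is logged. The following is a minimal sketch of that pattern, not the committed code; it assumes a launcher object exposing `start()` and `on_stop(callback)` in the style of IPython's EngineSet launchers, and uses pyzmq's `DelayedCallback` exactly as the diff does.

    from zmq.eventloop import ioloop

    class EarlyShutdownWatcher(object):
        """Illustration of the early-shutdown check (hypothetical, not the committed class)."""

        early_shutdown = 30  # seconds; the commit makes this a configurable Int trait

        def __init__(self, loop, launcher):
            # `launcher` stands in for an EngineSet launcher: assumed to expose
            # start() and on_stop(callback), called when the engines exit.
            self.loop = loop
            self.launcher = launcher
            self._stopping = False

        def start_engines(self, n=4):
            self.launcher.start(n)
            self.launcher.on_stop(self.engines_stopped_early)
            if self.early_shutdown:
                # if the engines survive this window, disarm the check
                dc = ioloop.DelayedCallback(self.engines_started_okay,
                                            self.early_shutdown * 1000, self.loop)
                dc.start()

        def engines_started_okay(self):
            print("Engines appear to have started successfully")
            self.early_shutdown = 0

        def engines_stopped_early(self, r):
            if self.early_shutdown and not self._stopping:
                print("Engines shut down early; they probably failed to connect.")
                print("Check the engine logs, and consider --ip='*' on the controller.")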
@@ -1,557 +1,591 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List, Any,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List, Any,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 start_help = """Start an IPython cluster for parallel computing
111 start_help = """Start an IPython cluster for parallel computing
112
112
113 Start an ipython cluster by its profile name or cluster
113 Start an ipython cluster by its profile name or cluster
114 directory. Cluster directories contain configuration, log and
114 directory. Cluster directories contain configuration, log and
115 security related files and are named using the convention
115 security related files and are named using the convention
116 'profile_<name>' and should be creating using the 'start'
116 'profile_<name>' and should be creating using the 'start'
117 subcommand of 'ipcluster'. If your cluster directory is in
117 subcommand of 'ipcluster'. If your cluster directory is in
118 the cwd or the ipython directory, you can simply refer to it
118 the cwd or the ipython directory, you can simply refer to it
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
120 otherwise use the 'profile-dir' option.
120 otherwise use the 'profile-dir' option.
121 """
121 """
122 stop_help = """Stop a running IPython cluster
122 stop_help = """Stop a running IPython cluster
123
123
124 Stop a running ipython cluster by its profile name or cluster
124 Stop a running ipython cluster by its profile name or cluster
125 directory. Cluster directories are named using the convention
125 directory. Cluster directories are named using the convention
126 'profile_<name>'. If your cluster directory is in
126 'profile_<name>'. If your cluster directory is in
127 the cwd or the ipython directory, you can simply refer to it
127 the cwd or the ipython directory, you can simply refer to it
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
129 use the '--profile-dir' option.
129 use the '--profile-dir' option.
130 """
130 """
131 engines_help = """Start engines connected to an existing IPython cluster
131 engines_help = """Start engines connected to an existing IPython cluster
132
132
133 Start one or more engines to connect to an existing Cluster
133 Start one or more engines to connect to an existing Cluster
134 by profile name or cluster directory.
134 by profile name or cluster directory.
135 Cluster directories contain configuration, log and
135 Cluster directories contain configuration, log and
136 security related files and are named using the convention
136 security related files and are named using the convention
137 'profile_<name>' and should be creating using the 'start'
137 'profile_<name>' and should be creating using the 'start'
138 subcommand of 'ipcluster'. If your cluster directory is in
138 subcommand of 'ipcluster'. If your cluster directory is in
139 the cwd or the ipython directory, you can simply refer to it
139 the cwd or the ipython directory, you can simply refer to it
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
141 otherwise use the 'profile-dir' option.
141 otherwise use the 'profile-dir' option.
142 """
142 """
143 stop_aliases = dict(
143 stop_aliases = dict(
144 signal='IPClusterStop.signal',
144 signal='IPClusterStop.signal',
145 )
145 )
146 stop_aliases.update(base_aliases)
146 stop_aliases.update(base_aliases)
147
147
148 class IPClusterStop(BaseParallelApplication):
148 class IPClusterStop(BaseParallelApplication):
149 name = u'ipcluster'
149 name = u'ipcluster'
150 description = stop_help
150 description = stop_help
151 examples = _stop_examples
151 examples = _stop_examples
152 config_file_name = Unicode(default_config_file_name)
152 config_file_name = Unicode(default_config_file_name)
153
153
154 signal = Int(signal.SIGINT, config=True,
154 signal = Int(signal.SIGINT, config=True,
155 help="signal to use for stopping processes.")
155 help="signal to use for stopping processes.")
156
156
157 aliases = Dict(stop_aliases)
157 aliases = Dict(stop_aliases)
158
158
159 def start(self):
159 def start(self):
160 """Start the app for the stop subcommand."""
160 """Start the app for the stop subcommand."""
161 try:
161 try:
162 pid = self.get_pid_from_file()
162 pid = self.get_pid_from_file()
163 except PIDFileError:
163 except PIDFileError:
164 self.log.critical(
164 self.log.critical(
165 'Could not read pid file, cluster is probably not running.'
165 'Could not read pid file, cluster is probably not running.'
166 )
166 )
167 # Here I exit with a unusual exit status that other processes
167 # Here I exit with a unusual exit status that other processes
168 # can watch for to learn how I existed.
168 # can watch for to learn how I existed.
169 self.remove_pid_file()
169 self.remove_pid_file()
170 self.exit(ALREADY_STOPPED)
170 self.exit(ALREADY_STOPPED)
171
171
172 if not self.check_pid(pid):
172 if not self.check_pid(pid):
173 self.log.critical(
173 self.log.critical(
174 'Cluster [pid=%r] is not running.' % pid
174 'Cluster [pid=%r] is not running.' % pid
175 )
175 )
176 self.remove_pid_file()
176 self.remove_pid_file()
177 # Here I exit with a unusual exit status that other processes
177 # Here I exit with a unusual exit status that other processes
178 # can watch for to learn how I existed.
178 # can watch for to learn how I existed.
179 self.exit(ALREADY_STOPPED)
179 self.exit(ALREADY_STOPPED)
180
180
181 elif os.name=='posix':
181 elif os.name=='posix':
182 sig = self.signal
182 sig = self.signal
183 self.log.info(
183 self.log.info(
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 )
185 )
186 try:
186 try:
187 os.kill(pid, sig)
187 os.kill(pid, sig)
188 except OSError:
188 except OSError:
189 self.log.error("Stopping cluster failed, assuming already dead.",
189 self.log.error("Stopping cluster failed, assuming already dead.",
190 exc_info=True)
190 exc_info=True)
191 self.remove_pid_file()
191 self.remove_pid_file()
192 elif os.name=='nt':
192 elif os.name=='nt':
193 try:
193 try:
194 # kill the whole tree
194 # kill the whole tree
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 except (CalledProcessError, OSError):
196 except (CalledProcessError, OSError):
197 self.log.error("Stopping cluster failed, assuming already dead.",
197 self.log.error("Stopping cluster failed, assuming already dead.",
198 exc_info=True)
198 exc_info=True)
199 self.remove_pid_file()
199 self.remove_pid_file()
200
200
201 engine_aliases = {}
201 engine_aliases = {}
202 engine_aliases.update(base_aliases)
202 engine_aliases.update(base_aliases)
203 engine_aliases.update(dict(
203 engine_aliases.update(dict(
204 n='IPClusterEngines.n',
204 n='IPClusterEngines.n',
205 engines = 'IPClusterEngines.engine_launcher_class',
205 engines = 'IPClusterEngines.engine_launcher_class',
206 daemonize = 'IPClusterEngines.daemonize',
206 daemonize = 'IPClusterEngines.daemonize',
207 ))
207 ))
208 engine_flags = {}
208 engine_flags = {}
209 engine_flags.update(base_flags)
209 engine_flags.update(base_flags)
210
210
211 engine_flags.update(dict(
211 engine_flags.update(dict(
212 daemonize=(
212 daemonize=(
213 {'IPClusterEngines' : {'daemonize' : True}},
213 {'IPClusterEngines' : {'daemonize' : True}},
214 """run the cluster into the background (not available on Windows)""",
214 """run the cluster into the background (not available on Windows)""",
215 )
215 )
216 ))
216 ))
217 class IPClusterEngines(BaseParallelApplication):
217 class IPClusterEngines(BaseParallelApplication):
218
218
219 name = u'ipcluster'
219 name = u'ipcluster'
220 description = engines_help
220 description = engines_help
221 examples = _engines_examples
221 examples = _engines_examples
222 usage = None
222 usage = None
223 config_file_name = Unicode(default_config_file_name)
223 config_file_name = Unicode(default_config_file_name)
224 default_log_level = logging.INFO
224 default_log_level = logging.INFO
225 classes = List()
225 classes = List()
226 def _classes_default(self):
226 def _classes_default(self):
227 from IPython.parallel.apps import launcher
227 from IPython.parallel.apps import launcher
228 launchers = launcher.all_launchers
228 launchers = launcher.all_launchers
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 return [ProfileDir]+eslaunchers
230 return [ProfileDir]+eslaunchers
231
231
232 n = Int(num_cpus(), config=True,
232 n = Int(num_cpus(), config=True,
233 help="""The number of engines to start. The default is to use one for each
233 help="""The number of engines to start. The default is to use one for each
234 CPU on your machine""")
234 CPU on your machine""")
235
235
236 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
236 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
237 def _engine_launcher_changed(self, name, old, new):
237 def _engine_launcher_changed(self, name, old, new):
238 if isinstance(new, basestring):
238 if isinstance(new, basestring):
239 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
239 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
240 " use engine_launcher_class" % self.__class__.__name__)
240 " use engine_launcher_class" % self.__class__.__name__)
241 self.engine_launcher_class = new
241 self.engine_launcher_class = new
242 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
242 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
243 config=True,
243 config=True,
244 help="""The class for launching a set of Engines. Change this value
244 help="""The class for launching a set of Engines. Change this value
245 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
245 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
246 Each launcher class has its own set of configuration options, for making sure
246 Each launcher class has its own set of configuration options, for making sure
247 it will work in your environment.
247 it will work in your environment.
248
248
249 You can also write your own launcher, and specify it's absolute import path,
249 You can also write your own launcher, and specify it's absolute import path,
250 as in 'mymodule.launcher.FTLEnginesLauncher`.
250 as in 'mymodule.launcher.FTLEnginesLauncher`.
251
251
252 Examples include:
252 Examples include:
253
253
254 LocalEngineSetLauncher : start engines locally as subprocesses [default]
254 LocalEngineSetLauncher : start engines locally as subprocesses [default]
255 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
255 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
256 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
256 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
257 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
257 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
258 LSFEngineSetLauncher : use LSF (bsub) to submit engines to a batch queue
258 LSFEngineSetLauncher : use LSF (bsub) to submit engines to a batch queue
259 SSHEngineSetLauncher : use SSH to start the controller
259 SSHEngineSetLauncher : use SSH to start the controller
260 Note that SSH does *not* move the connection files
260 Note that SSH does *not* move the connection files
261 around, so you will likely have to do this manually
261 around, so you will likely have to do this manually
262 unless the machines are on a shared file system.
262 unless the machines are on a shared file system.
263 WindowsHPCEngineSetLauncher : use Windows HPC
263 WindowsHPCEngineSetLauncher : use Windows HPC
264
264
265 If you are using one of IPython's builtin launchers, you can specify just the
265 If you are using one of IPython's builtin launchers, you can specify just the
266 prefix, e.g:
266 prefix, e.g:
267
267
268 c.IPClusterEngines.engine_launcher_class = 'SSH'
268 c.IPClusterEngines.engine_launcher_class = 'SSH'
269
269
270 or:
270 or:
271
271
272 ipcluster start --engines 'MPIExec'
272 ipcluster start --engines 'MPIExec'
273
273
274 """
274 """
275 )
275 )
276 daemonize = Bool(False, config=True,
276 daemonize = Bool(False, config=True,
277 help="""Daemonize the ipcluster program. This implies --log-to-file.
277 help="""Daemonize the ipcluster program. This implies --log-to-file.
278 Not available on Windows.
278 Not available on Windows.
279 """)
279 """)
280
280
281 def _daemonize_changed(self, name, old, new):
281 def _daemonize_changed(self, name, old, new):
282 if new:
282 if new:
283 self.log_to_file = True
283 self.log_to_file = True
284
284
285 early_shutdown = Int(30, config=True, help="The timeout (in seconds)")
286 _stopping = False
287
285 aliases = Dict(engine_aliases)
288 aliases = Dict(engine_aliases)
286 flags = Dict(engine_flags)
289 flags = Dict(engine_flags)
287 _stopping = False
288
290
289 def initialize(self, argv=None):
291 def initialize(self, argv=None):
290 super(IPClusterEngines, self).initialize(argv)
292 super(IPClusterEngines, self).initialize(argv)
291 self.init_signal()
293 self.init_signal()
292 self.init_launchers()
294 self.init_launchers()
293
295
294 def init_launchers(self):
296 def init_launchers(self):
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
297 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
297
298
298 def init_signal(self):
299 def init_signal(self):
299 # Setup signals
300 # Setup signals
300 signal.signal(signal.SIGINT, self.sigint_handler)
301 signal.signal(signal.SIGINT, self.sigint_handler)
301
302
302 def build_launcher(self, clsname, kind=None):
303 def build_launcher(self, clsname, kind=None):
303 """import and instantiate a Launcher based on importstring"""
304 """import and instantiate a Launcher based on importstring"""
304 if '.' not in clsname:
305 if '.' not in clsname:
305 # not a module, presume it's the raw name in apps.launcher
306 # not a module, presume it's the raw name in apps.launcher
306 if kind and kind not in clsname:
307 if kind and kind not in clsname:
307 # doesn't match necessary full class name, assume it's
308 # doesn't match necessary full class name, assume it's
308 # just 'PBS' or 'MPIExec' prefix:
309 # just 'PBS' or 'MPIExec' prefix:
309 clsname = clsname + kind + 'Launcher'
310 clsname = clsname + kind + 'Launcher'
310 clsname = 'IPython.parallel.apps.launcher.'+clsname
311 clsname = 'IPython.parallel.apps.launcher.'+clsname
311 try:
312 try:
312 klass = import_item(clsname)
313 klass = import_item(clsname)
313 except (ImportError, KeyError):
314 except (ImportError, KeyError):
314 self.log.fatal("Could not import launcher class: %r"%clsname)
315 self.log.fatal("Could not import launcher class: %r"%clsname)
315 self.exit(1)
316 self.exit(1)
316
317
317 launcher = klass(
318 launcher = klass(
318 work_dir=u'.', config=self.config, log=self.log,
319 work_dir=u'.', config=self.config, log=self.log,
319 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
320 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
320 )
321 )
321 return launcher
322 return launcher
322
323
324 def engines_started_okay(self):
325 self.log.info("Engines appear to have started successfully")
326 self.early_shutdown = 0
327
323 def start_engines(self):
328 def start_engines(self):
324 self.log.info("Starting %i engines"%self.n)
329 self.log.info("Starting %i engines"%self.n)
325 self.engine_launcher.start(self.n)
330 self.engine_launcher.start(self.n)
331 self.engine_launcher.on_stop(self.engines_stopped_early)
332 if self.early_shutdown:
333 ioloop.DelayedCallback(self.engines_started_okay, self.early_shutdown*1000, self.loop).start()
334
335 def engines_stopped_early(self, r):
336 if self.early_shutdown and not self._stopping:
337 self.log.error("""
338 Engines shutdown early, they probably failed to connect.
339
340 Check the engine log files for output.
341
342 If your controller and engines are not on the same machine, you probably
343 have to instruct the controller to listen on an interface other than localhost.
344
345 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
346
347 Be sure to read our security docs before instructing your controller to listen on
348 a public interface.
349 """)
350 self.stop_launchers()
351
352 return self.engines_stopped(r)
353
354 def engines_stopped(self, r):
355 return self.loop.stop()
326
356
327 def stop_engines(self):
357 def stop_engines(self):
328 self.log.info("Stopping Engines...")
329 if self.engine_launcher.running:
358 if self.engine_launcher.running:
359 self.log.info("Stopping Engines...")
330 d = self.engine_launcher.stop()
360 d = self.engine_launcher.stop()
331 return d
361 return d
332 else:
362 else:
333 return None
363 return None
334
364
335 def stop_launchers(self, r=None):
365 def stop_launchers(self, r=None):
336 if not self._stopping:
366 if not self._stopping:
337 self._stopping = True
367 self._stopping = True
338 self.log.error("IPython cluster: stopping")
368 self.log.error("IPython cluster: stopping")
339 self.stop_engines()
369 self.stop_engines()
340 # Wait a few seconds to let things shut down.
370 # Wait a few seconds to let things shut down.
341 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
371 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
342 dc.start()
372 dc.start()
343
373
344 def sigint_handler(self, signum, frame):
374 def sigint_handler(self, signum, frame):
345 self.log.debug("SIGINT received, stopping launchers...")
375 self.log.debug("SIGINT received, stopping launchers...")
346 self.stop_launchers()
376 self.stop_launchers()
347
377
348 def start_logging(self):
378 def start_logging(self):
349 # Remove old log files of the controller and engine
379 # Remove old log files of the controller and engine
350 if self.clean_logs:
380 if self.clean_logs:
351 log_dir = self.profile_dir.log_dir
381 log_dir = self.profile_dir.log_dir
352 for f in os.listdir(log_dir):
382 for f in os.listdir(log_dir):
353 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
383 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
354 os.remove(os.path.join(log_dir, f))
384 os.remove(os.path.join(log_dir, f))
355 # This will remove old log files for ipcluster itself
385 # This will remove old log files for ipcluster itself
356 # super(IPBaseParallelApplication, self).start_logging()
386 # super(IPBaseParallelApplication, self).start_logging()
357
387
358 def start(self):
388 def start(self):
359 """Start the app for the engines subcommand."""
389 """Start the app for the engines subcommand."""
360 self.log.info("IPython cluster: started")
390 self.log.info("IPython cluster: started")
361 # First see if the cluster is already running
391 # First see if the cluster is already running
362
392
363 # Now log and daemonize
393 # Now log and daemonize
364 self.log.info(
394 self.log.info(
365 'Starting engines with [daemon=%r]' % self.daemonize
395 'Starting engines with [daemon=%r]' % self.daemonize
366 )
396 )
367 # TODO: Get daemonize working on Windows or as a Windows Server.
397 # TODO: Get daemonize working on Windows or as a Windows Server.
368 if self.daemonize:
398 if self.daemonize:
369 if os.name=='posix':
399 if os.name=='posix':
370 daemonize()
400 daemonize()
371
401
372 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
402 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
373 dc.start()
403 dc.start()
374 # Now write the new pid file AFTER our new forked pid is active.
404 # Now write the new pid file AFTER our new forked pid is active.
375 # self.write_pid_file()
405 # self.write_pid_file()
376 try:
406 try:
377 self.loop.start()
407 self.loop.start()
378 except KeyboardInterrupt:
408 except KeyboardInterrupt:
379 pass
409 pass
380 except zmq.ZMQError as e:
410 except zmq.ZMQError as e:
381 if e.errno == errno.EINTR:
411 if e.errno == errno.EINTR:
382 pass
412 pass
383 else:
413 else:
384 raise
414 raise
385
415
386 start_aliases = {}
416 start_aliases = {}
387 start_aliases.update(engine_aliases)
417 start_aliases.update(engine_aliases)
388 start_aliases.update(dict(
418 start_aliases.update(dict(
389 delay='IPClusterStart.delay',
419 delay='IPClusterStart.delay',
390 controller = 'IPClusterStart.controller_launcher_class',
420 controller = 'IPClusterStart.controller_launcher_class',
391 ))
421 ))
392 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
422 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
393
423
394 class IPClusterStart(IPClusterEngines):
424 class IPClusterStart(IPClusterEngines):
395
425
396 name = u'ipcluster'
426 name = u'ipcluster'
397 description = start_help
427 description = start_help
398 examples = _start_examples
428 examples = _start_examples
399 default_log_level = logging.INFO
429 default_log_level = logging.INFO
400 auto_create = Bool(True, config=True,
430 auto_create = Bool(True, config=True,
401 help="whether to create the profile_dir if it doesn't exist")
431 help="whether to create the profile_dir if it doesn't exist")
402 classes = List()
432 classes = List()
403 def _classes_default(self,):
433 def _classes_default(self,):
404 from IPython.parallel.apps import launcher
434 from IPython.parallel.apps import launcher
405 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
435 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
406
436
407 clean_logs = Bool(True, config=True,
437 clean_logs = Bool(True, config=True,
408 help="whether to cleanup old logs before starting")
438 help="whether to cleanup old logs before starting")
409
439
410 delay = CFloat(1., config=True,
440 delay = CFloat(1., config=True,
411 help="delay (in s) between starting the controller and the engines")
441 help="delay (in s) between starting the controller and the engines")
412
442
413 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
443 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
414 def _controller_launcher_changed(self, name, old, new):
444 def _controller_launcher_changed(self, name, old, new):
415 if isinstance(new, basestring):
445 if isinstance(new, basestring):
416 # old 0.11-style config
446 # old 0.11-style config
417 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
447 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
418 " use controller_launcher_class" % self.__class__.__name__)
448 " use controller_launcher_class" % self.__class__.__name__)
419 self.controller_launcher_class = new
449 self.controller_launcher_class = new
420 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
450 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
421 config=True,
451 config=True,
422 help="""The class for launching a Controller. Change this value if you want
452 help="""The class for launching a Controller. Change this value if you want
423 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
453 your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.
424
454
425 Each launcher class has its own set of configuration options, for making sure
455 Each launcher class has its own set of configuration options, for making sure
426 it will work in your environment.
456 it will work in your environment.
427
457
428 Examples include:
458 Examples include:
429
459
430 LocalControllerLauncher : start engines locally as subprocesses
460 LocalControllerLauncher : start engines locally as subprocesses
431 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
461 MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
432 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
462 PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
433 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
463 SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
434 LSFControllerLauncher : use LSF (bsub) to submit engines to a batch queue
464 LSFControllerLauncher : use LSF (bsub) to submit engines to a batch queue
435 SSHControllerLauncher : use SSH to start the controller
465 SSHControllerLauncher : use SSH to start the controller
436 WindowsHPCControllerLauncher : use Windows HPC
466 WindowsHPCControllerLauncher : use Windows HPC
437
467
438 If you are using one of IPython's builtin launchers, you can specify just the
468 If you are using one of IPython's builtin launchers, you can specify just the
439 prefix, e.g:
469 prefix, e.g:
440
470
441 c.IPClusterStart.controller_launcher_class = 'SSH'
471 c.IPClusterStart.controller_launcher_class = 'SSH'
442
472
443 or:
473 or:
444
474
445 ipcluster start --controller 'MPIExec'
475 ipcluster start --controller 'MPIExec'
446
476
447 """
477 """
448 )
478 )
449 reset = Bool(False, config=True,
479 reset = Bool(False, config=True,
450 help="Whether to reset config files as part of '--create'."
480 help="Whether to reset config files as part of '--create'."
451 )
481 )
452
482
453 # flags = Dict(flags)
483 # flags = Dict(flags)
454 aliases = Dict(start_aliases)
484 aliases = Dict(start_aliases)
455
485
456 def init_launchers(self):
486 def init_launchers(self):
457 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
487 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
458 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
488 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
459 self.controller_launcher.on_stop(self.stop_launchers)
489 self.controller_launcher.on_stop(self.stop_launchers)
460
490
491 def engines_stopped(self, r):
492 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
493 pass
494
461 def start_controller(self):
495 def start_controller(self):
462 self.controller_launcher.start()
496 self.controller_launcher.start()
463
497
464 def stop_controller(self):
498 def stop_controller(self):
465 # self.log.info("In stop_controller")
499 # self.log.info("In stop_controller")
466 if self.controller_launcher and self.controller_launcher.running:
500 if self.controller_launcher and self.controller_launcher.running:
467 return self.controller_launcher.stop()
501 return self.controller_launcher.stop()
468
502
469 def stop_launchers(self, r=None):
503 def stop_launchers(self, r=None):
470 if not self._stopping:
504 if not self._stopping:
471 self.stop_controller()
505 self.stop_controller()
472 super(IPClusterStart, self).stop_launchers()
506 super(IPClusterStart, self).stop_launchers()
473
507
474 def start(self):
508 def start(self):
475 """Start the app for the start subcommand."""
509 """Start the app for the start subcommand."""
476 # First see if the cluster is already running
510 # First see if the cluster is already running
477 try:
511 try:
478 pid = self.get_pid_from_file()
512 pid = self.get_pid_from_file()
479 except PIDFileError:
513 except PIDFileError:
480 pass
514 pass
481 else:
515 else:
482 if self.check_pid(pid):
516 if self.check_pid(pid):
483 self.log.critical(
517 self.log.critical(
484 'Cluster is already running with [pid=%s]. '
518 'Cluster is already running with [pid=%s]. '
485 'use "ipcluster stop" to stop the cluster.' % pid
519 'use "ipcluster stop" to stop the cluster.' % pid
486 )
520 )
487 # Here I exit with a unusual exit status that other processes
521 # Here I exit with a unusual exit status that other processes
488 # can watch for to learn how I existed.
522 # can watch for to learn how I existed.
489 self.exit(ALREADY_STARTED)
523 self.exit(ALREADY_STARTED)
490 else:
524 else:
491 self.remove_pid_file()
525 self.remove_pid_file()
492
526
493
527
494 # Now log and daemonize
528 # Now log and daemonize
495 self.log.info(
529 self.log.info(
496 'Starting ipcluster with [daemon=%r]' % self.daemonize
530 'Starting ipcluster with [daemon=%r]' % self.daemonize
497 )
531 )
498 # TODO: Get daemonize working on Windows or as a Windows Server.
532 # TODO: Get daemonize working on Windows or as a Windows Server.
499 if self.daemonize:
533 if self.daemonize:
500 if os.name=='posix':
534 if os.name=='posix':
501 daemonize()
535 daemonize()
502
536
503 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
537 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
504 dc.start()
538 dc.start()
505 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
539 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
506 dc.start()
540 dc.start()
507 # Now write the new pid file AFTER our new forked pid is active.
541 # Now write the new pid file AFTER our new forked pid is active.
508 self.write_pid_file()
542 self.write_pid_file()
509 try:
543 try:
510 self.loop.start()
544 self.loop.start()
511 except KeyboardInterrupt:
545 except KeyboardInterrupt:
512 pass
546 pass
513 except zmq.ZMQError as e:
547 except zmq.ZMQError as e:
514 if e.errno == errno.EINTR:
548 if e.errno == errno.EINTR:
515 pass
549 pass
516 else:
550 else:
517 raise
551 raise
518 finally:
552 finally:
519 self.remove_pid_file()
553 self.remove_pid_file()
520
554
521 base='IPython.parallel.apps.ipclusterapp.IPCluster'
555 base='IPython.parallel.apps.ipclusterapp.IPCluster'
522
556
523 class IPClusterApp(Application):
557 class IPClusterApp(Application):
524 name = u'ipcluster'
558 name = u'ipcluster'
525 description = _description
559 description = _description
526 examples = _main_examples
560 examples = _main_examples
527
561
528 subcommands = {
562 subcommands = {
529 'start' : (base+'Start', start_help),
563 'start' : (base+'Start', start_help),
530 'stop' : (base+'Stop', stop_help),
564 'stop' : (base+'Stop', stop_help),
531 'engines' : (base+'Engines', engines_help),
565 'engines' : (base+'Engines', engines_help),
532 }
566 }
533
567
534 # no aliases or flags for parent App
568 # no aliases or flags for parent App
535 aliases = Dict()
569 aliases = Dict()
536 flags = Dict()
570 flags = Dict()
537
571
538 def start(self):
572 def start(self):
539 if self.subapp is None:
573 if self.subapp is None:
540 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
574 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
541 print
575 print
542 self.print_description()
576 self.print_description()
543 self.print_subcommands()
577 self.print_subcommands()
544 self.exit(1)
578 self.exit(1)
545 else:
579 else:
546 return self.subapp.start()
580 return self.subapp.start()
547
581
548 def launch_new_instance():
582 def launch_new_instance():
549 """Create and run the IPython cluster."""
583 """Create and run the IPython cluster."""
550 app = IPClusterApp.instance()
584 app = IPClusterApp.instance()
551 app.initialize()
585 app.initialize()
552 app.start()
586 app.start()
553
587
554
588
555 if __name__ == '__main__':
589 if __name__ == '__main__':
556 launch_new_instance()
590 launch_new_instance()
557
591
@@ -1,226 +1,234 @@
1 """A simple engine that talks to a controller over 0MQ.
1 """A simple engine that talks to a controller over 0MQ.
2 it handles registration, etc. and launches a kernel
2 it handles registration, etc. and launches a kernel
3 connected to the Controller's Schedulers.
3 connected to the Controller's Schedulers.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from getpass import getpass
20 from getpass import getpass
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.external.ssh import tunnel
25 from IPython.external.ssh import tunnel
26 # internal
26 # internal
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
29 )
29 )
30 # from IPython.utils.localinterfaces import LOCALHOST
30 # from IPython.utils.localinterfaces import LOCALHOST
31
31
32 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
33 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url, asbytes
34 from IPython.parallel.util import disambiguate_url, asbytes
35
35
36 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
37
37
38 from .streamkernel import Kernel
38 from .streamkernel import Kernel
39
39
40 class EngineFactory(RegistrationFactory):
40 class EngineFactory(RegistrationFactory):
41 """IPython engine"""
41 """IPython engine"""
42
42
43 # configurables:
43 # configurables:
44 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
45 help="""The OutStream for handling stdout/err.
45 help="""The OutStream for handling stdout/err.
46 Typically 'IPython.zmq.iostream.OutStream'""")
46 Typically 'IPython.zmq.iostream.OutStream'""")
47 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
48 help="""The class for handling displayhook.
48 help="""The class for handling displayhook.
49 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
50 location=Unicode(config=True,
50 location=Unicode(config=True,
51 help="""The location (an IP address) of the controller. This is
51 help="""The location (an IP address) of the controller. This is
52 used for disambiguating URLs, to determine whether
52 used for disambiguating URLs, to determine whether
53 loopback should be used to connect or the public address.""")
53 loopback should be used to connect or the public address.""")
54 timeout=CFloat(2,config=True,
54 timeout=CFloat(2,config=True,
55 help="""The time (in seconds) to wait for the Controller to respond
55 help="""The time (in seconds) to wait for the Controller to respond
56 to registration requests before giving up.""")
56 to registration requests before giving up.""")
57 sshserver=Unicode(config=True,
57 sshserver=Unicode(config=True,
58 help="""The SSH server to use for tunneling connections to the Controller.""")
58 help="""The SSH server to use for tunneling connections to the Controller.""")
59 sshkey=Unicode(config=True,
59 sshkey=Unicode(config=True,
60 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 help="""The SSH private key file to use when tunneling connections to the Controller.""")
61 paramiko=Bool(sys.platform == 'win32', config=True,
61 paramiko=Bool(sys.platform == 'win32', config=True,
62 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 help="""Whether to use paramiko instead of openssh for tunnels.""")
63
63
64 # not configurable:
64 # not configurable:
65 user_ns=Dict()
65 user_ns=Dict()
66 id=Int(allow_none=True)
66 id=Int(allow_none=True)
67 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 kernel=Instance(Kernel)
68 kernel=Instance(Kernel)
69
69
70 bident = CBytes()
70 bident = CBytes()
71 ident = Unicode()
71 ident = Unicode()
72 def _ident_changed(self, name, old, new):
72 def _ident_changed(self, name, old, new):
73 self.bident = asbytes(new)
73 self.bident = asbytes(new)
74 using_ssh=Bool(False)
74 using_ssh=Bool(False)
75
75
76
76
77 def __init__(self, **kwargs):
77 def __init__(self, **kwargs):
78 super(EngineFactory, self).__init__(**kwargs)
78 super(EngineFactory, self).__init__(**kwargs)
79 self.ident = self.session.session
79 self.ident = self.session.session
80
80
81 def init_connector(self):
81 def init_connector(self):
82 """construct connection function, which handles tunnels."""
82 """construct connection function, which handles tunnels."""
83 self.using_ssh = bool(self.sshkey or self.sshserver)
83 self.using_ssh = bool(self.sshkey or self.sshserver)
84
84
85 if self.sshkey and not self.sshserver:
85 if self.sshkey and not self.sshserver:
86 # We are using ssh directly to the controller, tunneling localhost to localhost
86 # We are using ssh directly to the controller, tunneling localhost to localhost
87 self.sshserver = self.url.split('://')[1].split(':')[0]
87 self.sshserver = self.url.split('://')[1].split(':')[0]
88
88
89 if self.using_ssh:
89 if self.using_ssh:
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 password=False
91 password=False
92 else:
92 else:
93 password = getpass("SSH Password for %s: "%self.sshserver)
93 password = getpass("SSH Password for %s: "%self.sshserver)
94 else:
94 else:
95 password = False
95 password = False
96
96
97 def connect(s, url):
97 def connect(s, url):
98 url = disambiguate_url(url, self.location)
98 url = disambiguate_url(url, self.location)
99 if self.using_ssh:
99 if self.using_ssh:
100 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
101 return tunnel.tunnel_connection(s, url, self.sshserver,
101 return tunnel.tunnel_connection(s, url, self.sshserver,
102 keyfile=self.sshkey, paramiko=self.paramiko,
102 keyfile=self.sshkey, paramiko=self.paramiko,
103 password=password,
103 password=password,
104 )
104 )
105 else:
105 else:
106 return s.connect(url)
106 return s.connect(url)
107
107
108 def maybe_tunnel(url):
108 def maybe_tunnel(url):
109 """like connect, but don't complete the connection (for use by heartbeat)"""
109 """like connect, but don't complete the connection (for use by heartbeat)"""
110 url = disambiguate_url(url, self.location)
110 url = disambiguate_url(url, self.location)
111 if self.using_ssh:
111 if self.using_ssh:
112 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 keyfile=self.sshkey, paramiko=self.paramiko,
114 keyfile=self.sshkey, paramiko=self.paramiko,
115 password=password,
115 password=password,
116 )
116 )
117 return url
117 return url
118 return connect, maybe_tunnel
118 return connect, maybe_tunnel
119
119
120 def register(self):
120 def register(self):
121 """send the registration_request"""
121 """send the registration_request"""
122
122
123 self.log.info("Registering with controller at %s"%self.url)
123 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
124 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.DEALER)
126 reg = ctx.socket(zmq.DEALER)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
128 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
130
131
131
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
134 # print (self.session.key)
134 # print (self.session.key)
135 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request",content=content)
136
136
137 def complete_registration(self, msg, connect, maybe_tunnel):
137 def complete_registration(self, msg, connect, maybe_tunnel):
138 # print msg
138 # print msg
139 self._abort_dc.stop()
139 self._abort_dc.stop()
140 ctx = self.context
140 ctx = self.context
141 loop = self.loop
141 loop = self.loop
142 identity = self.bident
142 identity = self.bident
143 idents,msg = self.session.feed_identities(msg)
143 idents,msg = self.session.feed_identities(msg)
144 msg = Message(self.session.unserialize(msg))
144 msg = Message(self.session.unserialize(msg))
145
145
146 if msg.content.status == 'ok':
146 if msg.content.status == 'ok':
147 self.id = int(msg.content.id)
147 self.id = int(msg.content.id)
148
148
149 # launch heartbeat
149 # launch heartbeat
150 hb_addrs = msg.content.heartbeat
150 hb_addrs = msg.content.heartbeat
151
151
152 # possibly forward hb ports with tunnels
152 # possibly forward hb ports with tunnels
153 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
154 heart = Heart(*map(str, hb_addrs), heart_id=identity)
154 heart = Heart(*map(str, hb_addrs), heart_id=identity)
155 heart.start()
155 heart.start()
156
156
157 # create Shell Streams (MUX, Task, etc.):
157 # create Shell Streams (MUX, Task, etc.):
158 queue_addr = msg.content.mux
158 queue_addr = msg.content.mux
159 shell_addrs = [ str(queue_addr) ]
159 shell_addrs = [ str(queue_addr) ]
160 task_addr = msg.content.task
160 task_addr = msg.content.task
161 if task_addr:
161 if task_addr:
162 shell_addrs.append(str(task_addr))
162 shell_addrs.append(str(task_addr))
163
163
164 # Uncomment this to go back to two-socket model
164 # Uncomment this to go back to two-socket model
165 # shell_streams = []
165 # shell_streams = []
166 # for addr in shell_addrs:
166 # for addr in shell_addrs:
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
168 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.setsockopt(zmq.IDENTITY, identity)
169 # stream.connect(disambiguate_url(addr, self.location))
169 # stream.connect(disambiguate_url(addr, self.location))
170 # shell_streams.append(stream)
170 # shell_streams.append(stream)
171
171
172 # Now use only one shell stream for mux and tasks
172 # Now use only one shell stream for mux and tasks
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
174 stream.setsockopt(zmq.IDENTITY, identity)
174 stream.setsockopt(zmq.IDENTITY, identity)
175 shell_streams = [stream]
175 shell_streams = [stream]
176 for addr in shell_addrs:
176 for addr in shell_addrs:
177 connect(stream, addr)
177 connect(stream, addr)
178 # end single stream-socket
178 # end single stream-socket
179
179
180 # control stream:
180 # control stream:
181 control_addr = str(msg.content.control)
181 control_addr = str(msg.content.control)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
184 connect(control_stream, control_addr)
184 connect(control_stream, control_addr)
185
185
186 # create iopub stream:
186 # create iopub stream:
187 iopub_addr = msg.content.iopub
187 iopub_addr = msg.content.iopub
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
190 connect(iopub_stream, iopub_addr)
190 connect(iopub_stream, iopub_addr)
191
191
192 # # Redirect input streams and set a display hook.
192 # # Redirect input streams and set a display hook.
193 if self.out_stream_factory:
193 if self.out_stream_factory:
194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
194 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
195 sys.stdout.topic = 'engine.%i.stdout'%self.id
195 sys.stdout.topic = 'engine.%i.stdout'%self.id
196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
196 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
197 sys.stderr.topic = 'engine.%i.stderr'%self.id
197 sys.stderr.topic = 'engine.%i.stderr'%self.id
198 if self.display_hook_factory:
198 if self.display_hook_factory:
199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
199 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
200 sys.displayhook.topic = 'engine.%i.pyout'%self.id
200 sys.displayhook.topic = 'engine.%i.pyout'%self.id
201
201
202 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
202 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
204 loop=loop, user_ns = self.user_ns, log=self.log)
204 loop=loop, user_ns = self.user_ns, log=self.log)
205 self.kernel.start()
205 self.kernel.start()
206
206
207
207
208 else:
208 else:
209 self.log.fatal("Registration Failed: %s"%msg)
209 self.log.fatal("Registration Failed: %s"%msg)
210 raise Exception("Registration Failed: %s"%msg)
210 raise Exception("Registration Failed: %s"%msg)
211
211
212 self.log.info("Completed registration with id %i"%self.id)
212 self.log.info("Completed registration with id %i"%self.id)
213
213
214
214
215 def abort(self):
215 def abort(self):
216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
216 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
217 if '127' in self.url:
218 self.log.fatal("""
219 If the controller and engines are not on the same machine,
220 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
221 c.HubFactory.ip='*' # for all interfaces, internal and external
222 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
223 or tunnel connections via ssh.
224 """)
217 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
225 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
218 time.sleep(1)
226 time.sleep(1)
219 sys.exit(255)
227 sys.exit(255)
220
228
221 def start(self):
229 def start(self):
222 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
230 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
223 dc.start()
231 dc.start()
224 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
232 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
225 self._abort_dc.start()
233 self._abort_dc.start()
226
234
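The engine-side hunk above extends `abort()` with a hint for the common failure mode: registration against a loopback URL times out because the controller only listens on localhost. Acting on that hint means pointing the controller at a reachable interface. A minimal ipcontroller_config.py sketch follows, using only the settings named in the new message; the commented launcher-args line is an assumption about where the ipcluster-side suggestion (`ControllerLauncher.controller_args`) would go.

    # ipcontroller_config.py -- sketch based on the hint added in abort()
    c = get_config()          # standard accessor available in IPython config files

    c.HubFactory.ip = '*'     # listen on all interfaces, internal and external
    # c.HubFactory.ip = '192.168.1.101'   # or any interface that the engines can see

    # Alternatively (per the ipcluster-side message), pass the flag through the
    # controller launcher's arguments; the exact launcher class is assumed here:
    # c.LocalControllerLauncher.controller_args = ["--ip='*'"]

SSH tunnelling to the controller, as the message also notes, remains an option when opening a public interface is not acceptable.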