##// END OF EJS Templates
scrub twisted/deferred references from launchers...
MinRK -
Show More
@@ -0,0 +1,26 b''
1 """daemonize function from twisted.scripts._twistd_unix."""
2
3 #-----------------------------------------------------------------------------
4 # Copyright (c) Twisted Matrix Laboratories.
5 # See Twisted's LICENSE for details.
6 # http://twistedmatrix.com/
7 #-----------------------------------------------------------------------------
8
9 import os, errno
10
11 def daemonize():
12 # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
13 if os.fork(): # launch child and...
14 os._exit(0) # kill off parent
15 os.setsid()
16 if os.fork(): # launch child and...
17 os._exit(0) # kill off parent again.
18 null = os.open('/dev/null', os.O_RDWR)
19 for i in range(3):
20 try:
21 os.dup2(null, i)
22 except OSError, e:
23 if e.errno != errno.EBADF:
24 raise
25 os.close(null)
26
@@ -1,527 +1,526 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
36 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
37 from IPython.utils.daemonize import daemonize
37 from IPython.utils.importstring import import_item
38 from IPython.utils.importstring import import_item
38 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
39 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
39
40
40 from IPython.parallel.apps.baseapp import (
41 from IPython.parallel.apps.baseapp import (
41 BaseParallelApplication,
42 BaseParallelApplication,
42 PIDFileError,
43 PIDFileError,
43 base_flags, base_aliases
44 base_flags, base_aliases
44 )
45 )
45
46
46
47
47 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
48 # Module level variables
49 # Module level variables
49 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
50
51
51
52
52 default_config_file_name = u'ipcluster_config.py'
53 default_config_file_name = u'ipcluster_config.py'
53
54
54
55
55 _description = """Start an IPython cluster for parallel computing.
56 _description = """Start an IPython cluster for parallel computing.
56
57
57 An IPython cluster consists of 1 controller and 1 or more engines.
58 An IPython cluster consists of 1 controller and 1 or more engines.
58 This command automates the startup of these processes using a wide
59 This command automates the startup of these processes using a wide
59 range of startup methods (SSH, local processes, PBS, mpiexec,
60 range of startup methods (SSH, local processes, PBS, mpiexec,
60 Windows HPC Server 2008). To start a cluster with 4 engines on your
61 Windows HPC Server 2008). To start a cluster with 4 engines on your
61 local host simply do 'ipcluster start n=4'. For more complex usage
62 local host simply do 'ipcluster start n=4'. For more complex usage
62 you will typically do 'ipcluster create profile=mycluster', then edit
63 you will typically do 'ipcluster create profile=mycluster', then edit
63 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
64 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
64 """
65 """
65
66
66
67
67 # Exit codes for ipcluster
68 # Exit codes for ipcluster
68
69
69 # This will be the exit code if the ipcluster appears to be running because
70 # This will be the exit code if the ipcluster appears to be running because
70 # a .pid file exists
71 # a .pid file exists
71 ALREADY_STARTED = 10
72 ALREADY_STARTED = 10
72
73
73
74
74 # This will be the exit code if ipcluster stop is run, but there is not .pid
75 # This will be the exit code if ipcluster stop is run, but there is not .pid
75 # file to be found.
76 # file to be found.
76 ALREADY_STOPPED = 11
77 ALREADY_STOPPED = 11
77
78
78 # This will be the exit code if ipcluster engines is run, but there is not .pid
79 # This will be the exit code if ipcluster engines is run, but there is not .pid
79 # file to be found.
80 # file to be found.
80 NO_CLUSTER = 12
81 NO_CLUSTER = 12
81
82
82
83
83 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
84 # Main application
85 # Main application
85 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
86 start_help = """Start an IPython cluster for parallel computing
87 start_help = """Start an IPython cluster for parallel computing
87
88
88 Start an ipython cluster by its profile name or cluster
89 Start an ipython cluster by its profile name or cluster
89 directory. Cluster directories contain configuration, log and
90 directory. Cluster directories contain configuration, log and
90 security related files and are named using the convention
91 security related files and are named using the convention
91 'cluster_<profile>' and should be creating using the 'start'
92 'cluster_<profile>' and should be creating using the 'start'
92 subcommand of 'ipcluster'. If your cluster directory is in
93 subcommand of 'ipcluster'. If your cluster directory is in
93 the cwd or the ipython directory, you can simply refer to it
94 the cwd or the ipython directory, you can simply refer to it
94 using its profile name, 'ipcluster start n=4 profile=<profile>`,
95 using its profile name, 'ipcluster start n=4 profile=<profile>`,
95 otherwise use the 'profile_dir' option.
96 otherwise use the 'profile_dir' option.
96 """
97 """
97 stop_help = """Stop a running IPython cluster
98 stop_help = """Stop a running IPython cluster
98
99
99 Stop a running ipython cluster by its profile name or cluster
100 Stop a running ipython cluster by its profile name or cluster
100 directory. Cluster directories are named using the convention
101 directory. Cluster directories are named using the convention
101 'cluster_<profile>'. If your cluster directory is in
102 'cluster_<profile>'. If your cluster directory is in
102 the cwd or the ipython directory, you can simply refer to it
103 the cwd or the ipython directory, you can simply refer to it
103 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
104 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
104 use the 'profile_dir' option.
105 use the 'profile_dir' option.
105 """
106 """
106 engines_help = """Start engines connected to an existing IPython cluster
107 engines_help = """Start engines connected to an existing IPython cluster
107
108
108 Start one or more engines to connect to an existing Cluster
109 Start one or more engines to connect to an existing Cluster
109 by profile name or cluster directory.
110 by profile name or cluster directory.
110 Cluster directories contain configuration, log and
111 Cluster directories contain configuration, log and
111 security related files and are named using the convention
112 security related files and are named using the convention
112 'cluster_<profile>' and should be creating using the 'start'
113 'cluster_<profile>' and should be creating using the 'start'
113 subcommand of 'ipcluster'. If your cluster directory is in
114 subcommand of 'ipcluster'. If your cluster directory is in
114 the cwd or the ipython directory, you can simply refer to it
115 the cwd or the ipython directory, you can simply refer to it
115 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
116 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
116 otherwise use the 'profile_dir' option.
117 otherwise use the 'profile_dir' option.
117 """
118 """
118 create_help = """Create an ipcluster profile by name
119 create_help = """Create an ipcluster profile by name
119
120
120 Create an ipython cluster directory by its profile name or
121 Create an ipython cluster directory by its profile name or
121 cluster directory path. Cluster directories contain
122 cluster directory path. Cluster directories contain
122 configuration, log and security related files and are named
123 configuration, log and security related files and are named
123 using the convention 'cluster_<profile>'. By default they are
124 using the convention 'cluster_<profile>'. By default they are
124 located in your ipython directory. Once created, you will
125 located in your ipython directory. Once created, you will
125 probably need to edit the configuration files in the cluster
126 probably need to edit the configuration files in the cluster
126 directory to configure your cluster. Most users will create a
127 directory to configure your cluster. Most users will create a
127 cluster directory by profile name,
128 cluster directory by profile name,
128 `ipcluster create profile=mycluster`, which will put the directory
129 `ipcluster create profile=mycluster`, which will put the directory
129 in `<ipython_dir>/cluster_mycluster`.
130 in `<ipython_dir>/cluster_mycluster`.
130 """
131 """
131 list_help = """List available cluster profiles
132 list_help = """List available cluster profiles
132
133
133 List all available clusters, by cluster directory, that can
134 List all available clusters, by cluster directory, that can
134 be found in the current working directly or in the ipython
135 be found in the current working directly or in the ipython
135 directory. Cluster directories are named using the convention
136 directory. Cluster directories are named using the convention
136 'cluster_<profile>'.
137 'cluster_<profile>'.
137 """
138 """
138
139
139
140
140 class IPClusterList(BaseIPythonApplication):
141 class IPClusterList(BaseIPythonApplication):
141 name = u'ipcluster-list'
142 name = u'ipcluster-list'
142 description = list_help
143 description = list_help
143
144
144 # empty aliases
145 # empty aliases
145 aliases=Dict()
146 aliases=Dict()
146 flags = Dict(base_flags)
147 flags = Dict(base_flags)
147
148
148 def _log_level_default(self):
149 def _log_level_default(self):
149 return 20
150 return 20
150
151
151 def list_profile_dirs(self):
152 def list_profile_dirs(self):
152 # Find the search paths
153 # Find the search paths
153 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
154 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
154 if profile_dir_paths:
155 if profile_dir_paths:
155 profile_dir_paths = profile_dir_paths.split(':')
156 profile_dir_paths = profile_dir_paths.split(':')
156 else:
157 else:
157 profile_dir_paths = []
158 profile_dir_paths = []
158
159
159 ipython_dir = self.ipython_dir
160 ipython_dir = self.ipython_dir
160
161
161 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
162 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
162 paths = list(set(paths))
163 paths = list(set(paths))
163
164
164 self.log.info('Searching for cluster profiles in paths: %r' % paths)
165 self.log.info('Searching for cluster profiles in paths: %r' % paths)
165 for path in paths:
166 for path in paths:
166 files = os.listdir(path)
167 files = os.listdir(path)
167 for f in files:
168 for f in files:
168 full_path = os.path.join(path, f)
169 full_path = os.path.join(path, f)
169 if os.path.isdir(full_path) and f.startswith('profile_') and \
170 if os.path.isdir(full_path) and f.startswith('profile_') and \
170 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
171 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
171 profile = f.split('_')[-1]
172 profile = f.split('_')[-1]
172 start_cmd = 'ipcluster start profile=%s n=4' % profile
173 start_cmd = 'ipcluster start profile=%s n=4' % profile
173 print start_cmd + " ==> " + full_path
174 print start_cmd + " ==> " + full_path
174
175
175 def start(self):
176 def start(self):
176 self.list_profile_dirs()
177 self.list_profile_dirs()
177
178
178
179
179 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
180 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
180
181
181 create_flags = {}
182 create_flags = {}
182 create_flags.update(base_flags)
183 create_flags.update(base_flags)
183 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
184 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
184 "reset config files to defaults", "leave existing config files"))
185 "reset config files to defaults", "leave existing config files"))
185
186
186 class IPClusterCreate(BaseParallelApplication):
187 class IPClusterCreate(BaseParallelApplication):
187 name = u'ipcluster-create'
188 name = u'ipcluster-create'
188 description = create_help
189 description = create_help
189 auto_create = Bool(True)
190 auto_create = Bool(True)
190 config_file_name = Unicode(default_config_file_name)
191 config_file_name = Unicode(default_config_file_name)
191
192
192 flags = Dict(create_flags)
193 flags = Dict(create_flags)
193
194
194 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
195 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
195
196
196 classes = [ProfileDir]
197 classes = [ProfileDir]
197
198
198
199
199 stop_aliases = dict(
200 stop_aliases = dict(
200 signal='IPClusterStop.signal',
201 signal='IPClusterStop.signal',
201 profile='BaseIPythonApplication.profile',
202 profile='BaseIPythonApplication.profile',
202 profile_dir='ProfileDir.location',
203 profile_dir='ProfileDir.location',
203 )
204 )
204
205
205 class IPClusterStop(BaseParallelApplication):
206 class IPClusterStop(BaseParallelApplication):
206 name = u'ipcluster'
207 name = u'ipcluster'
207 description = stop_help
208 description = stop_help
208 config_file_name = Unicode(default_config_file_name)
209 config_file_name = Unicode(default_config_file_name)
209
210
210 signal = Int(signal.SIGINT, config=True,
211 signal = Int(signal.SIGINT, config=True,
211 help="signal to use for stopping processes.")
212 help="signal to use for stopping processes.")
212
213
213 aliases = Dict(stop_aliases)
214 aliases = Dict(stop_aliases)
214
215
215 def start(self):
216 def start(self):
216 """Start the app for the stop subcommand."""
217 """Start the app for the stop subcommand."""
217 try:
218 try:
218 pid = self.get_pid_from_file()
219 pid = self.get_pid_from_file()
219 except PIDFileError:
220 except PIDFileError:
220 self.log.critical(
221 self.log.critical(
221 'Could not read pid file, cluster is probably not running.'
222 'Could not read pid file, cluster is probably not running.'
222 )
223 )
223 # Here I exit with a unusual exit status that other processes
224 # Here I exit with a unusual exit status that other processes
224 # can watch for to learn how I existed.
225 # can watch for to learn how I existed.
225 self.remove_pid_file()
226 self.remove_pid_file()
226 self.exit(ALREADY_STOPPED)
227 self.exit(ALREADY_STOPPED)
227
228
228 if not self.check_pid(pid):
229 if not self.check_pid(pid):
229 self.log.critical(
230 self.log.critical(
230 'Cluster [pid=%r] is not running.' % pid
231 'Cluster [pid=%r] is not running.' % pid
231 )
232 )
232 self.remove_pid_file()
233 self.remove_pid_file()
233 # Here I exit with a unusual exit status that other processes
234 # Here I exit with a unusual exit status that other processes
234 # can watch for to learn how I existed.
235 # can watch for to learn how I existed.
235 self.exit(ALREADY_STOPPED)
236 self.exit(ALREADY_STOPPED)
236
237
237 elif os.name=='posix':
238 elif os.name=='posix':
238 sig = self.signal
239 sig = self.signal
239 self.log.info(
240 self.log.info(
240 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
241 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
241 )
242 )
242 try:
243 try:
243 os.kill(pid, sig)
244 os.kill(pid, sig)
244 except OSError:
245 except OSError:
245 self.log.error("Stopping cluster failed, assuming already dead.",
246 self.log.error("Stopping cluster failed, assuming already dead.",
246 exc_info=True)
247 exc_info=True)
247 self.remove_pid_file()
248 self.remove_pid_file()
248 elif os.name=='nt':
249 elif os.name=='nt':
249 try:
250 try:
250 # kill the whole tree
251 # kill the whole tree
251 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
252 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
252 except (CalledProcessError, OSError):
253 except (CalledProcessError, OSError):
253 self.log.error("Stopping cluster failed, assuming already dead.",
254 self.log.error("Stopping cluster failed, assuming already dead.",
254 exc_info=True)
255 exc_info=True)
255 self.remove_pid_file()
256 self.remove_pid_file()
256
257
257 engine_aliases = {}
258 engine_aliases = {}
258 engine_aliases.update(base_aliases)
259 engine_aliases.update(base_aliases)
259 engine_aliases.update(dict(
260 engine_aliases.update(dict(
260 n='IPClusterEngines.n',
261 n='IPClusterEngines.n',
261 elauncher = 'IPClusterEngines.engine_launcher_class',
262 elauncher = 'IPClusterEngines.engine_launcher_class',
262 ))
263 ))
263 class IPClusterEngines(BaseParallelApplication):
264 class IPClusterEngines(BaseParallelApplication):
264
265
265 name = u'ipcluster'
266 name = u'ipcluster'
266 description = engines_help
267 description = engines_help
267 usage = None
268 usage = None
268 config_file_name = Unicode(default_config_file_name)
269 config_file_name = Unicode(default_config_file_name)
269 default_log_level = logging.INFO
270 default_log_level = logging.INFO
270 classes = List()
271 classes = List()
271 def _classes_default(self):
272 def _classes_default(self):
272 from IPython.parallel.apps import launcher
273 from IPython.parallel.apps import launcher
273 launchers = launcher.all_launchers
274 launchers = launcher.all_launchers
274 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
275 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
275 return [ProfileDir]+eslaunchers
276 return [ProfileDir]+eslaunchers
276
277
277 n = Int(2, config=True,
278 n = Int(2, config=True,
278 help="The number of engines to start.")
279 help="The number of engines to start.")
279
280
280 engine_launcher_class = Unicode('LocalEngineSetLauncher',
281 engine_launcher_class = Unicode('LocalEngineSetLauncher',
281 config=True,
282 config=True,
282 help="The class for launching a set of Engines."
283 help="The class for launching a set of Engines."
283 )
284 )
284 daemonize = Bool(False, config=True,
285 daemonize = Bool(False, config=True,
285 help='Daemonize the ipcluster program. This implies --log-to-file')
286 help='Daemonize the ipcluster program. This implies --log-to-file')
286
287
287 def _daemonize_changed(self, name, old, new):
288 def _daemonize_changed(self, name, old, new):
288 if new:
289 if new:
289 self.log_to_file = True
290 self.log_to_file = True
290
291
291 aliases = Dict(engine_aliases)
292 aliases = Dict(engine_aliases)
292 # flags = Dict(flags)
293 # flags = Dict(flags)
293 _stopping = False
294 _stopping = False
294
295
295 def initialize(self, argv=None):
296 def initialize(self, argv=None):
296 super(IPClusterEngines, self).initialize(argv)
297 super(IPClusterEngines, self).initialize(argv)
297 self.init_signal()
298 self.init_signal()
298 self.init_launchers()
299 self.init_launchers()
299
300
300 def init_launchers(self):
301 def init_launchers(self):
301 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
302 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
302 self.engine_launcher.on_stop(lambda r: self.loop.stop())
303 self.engine_launcher.on_stop(lambda r: self.loop.stop())
303
304
304 def init_signal(self):
305 def init_signal(self):
305 # Setup signals
306 # Setup signals
306 signal.signal(signal.SIGINT, self.sigint_handler)
307 signal.signal(signal.SIGINT, self.sigint_handler)
307
308
308 def build_launcher(self, clsname):
309 def build_launcher(self, clsname):
309 """import and instantiate a Launcher based on importstring"""
310 """import and instantiate a Launcher based on importstring"""
310 if '.' not in clsname:
311 if '.' not in clsname:
311 # not a module, presume it's the raw name in apps.launcher
312 # not a module, presume it's the raw name in apps.launcher
312 clsname = 'IPython.parallel.apps.launcher.'+clsname
313 clsname = 'IPython.parallel.apps.launcher.'+clsname
313 # print repr(clsname)
314 # print repr(clsname)
314 klass = import_item(clsname)
315 klass = import_item(clsname)
315
316
316 launcher = klass(
317 launcher = klass(
317 work_dir=self.profile_dir.location, config=self.config, log=self.log
318 work_dir=self.profile_dir.location, config=self.config, log=self.log
318 )
319 )
319 return launcher
320 return launcher
320
321
321 def start_engines(self):
322 def start_engines(self):
322 self.log.info("Starting %i engines"%self.n)
323 self.log.info("Starting %i engines"%self.n)
323 self.engine_launcher.start(
324 self.engine_launcher.start(
324 self.n,
325 self.n,
325 self.profile_dir.location
326 self.profile_dir.location
326 )
327 )
327
328
328 def stop_engines(self):
329 def stop_engines(self):
329 self.log.info("Stopping Engines...")
330 self.log.info("Stopping Engines...")
330 if self.engine_launcher.running:
331 if self.engine_launcher.running:
331 d = self.engine_launcher.stop()
332 d = self.engine_launcher.stop()
332 return d
333 return d
333 else:
334 else:
334 return None
335 return None
335
336
336 def stop_launchers(self, r=None):
337 def stop_launchers(self, r=None):
337 if not self._stopping:
338 if not self._stopping:
338 self._stopping = True
339 self._stopping = True
339 self.log.error("IPython cluster: stopping")
340 self.log.error("IPython cluster: stopping")
340 self.stop_engines()
341 self.stop_engines()
341 # Wait a few seconds to let things shut down.
342 # Wait a few seconds to let things shut down.
342 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
343 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
343 dc.start()
344 dc.start()
344
345
345 def sigint_handler(self, signum, frame):
346 def sigint_handler(self, signum, frame):
346 self.log.debug("SIGINT received, stopping launchers...")
347 self.log.debug("SIGINT received, stopping launchers...")
347 self.stop_launchers()
348 self.stop_launchers()
348
349
349 def start_logging(self):
350 def start_logging(self):
350 # Remove old log files of the controller and engine
351 # Remove old log files of the controller and engine
351 if self.clean_logs:
352 if self.clean_logs:
352 log_dir = self.profile_dir.log_dir
353 log_dir = self.profile_dir.log_dir
353 for f in os.listdir(log_dir):
354 for f in os.listdir(log_dir):
354 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
355 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
355 os.remove(os.path.join(log_dir, f))
356 os.remove(os.path.join(log_dir, f))
356 # This will remove old log files for ipcluster itself
357 # This will remove old log files for ipcluster itself
357 # super(IPBaseParallelApplication, self).start_logging()
358 # super(IPBaseParallelApplication, self).start_logging()
358
359
359 def start(self):
360 def start(self):
360 """Start the app for the engines subcommand."""
361 """Start the app for the engines subcommand."""
361 self.log.info("IPython cluster: started")
362 self.log.info("IPython cluster: started")
362 # First see if the cluster is already running
363 # First see if the cluster is already running
363
364
364 # Now log and daemonize
365 # Now log and daemonize
365 self.log.info(
366 self.log.info(
366 'Starting engines with [daemon=%r]' % self.daemonize
367 'Starting engines with [daemon=%r]' % self.daemonize
367 )
368 )
368 # TODO: Get daemonize working on Windows or as a Windows Server.
369 # TODO: Get daemonize working on Windows or as a Windows Server.
369 if self.daemonize:
370 if self.daemonize:
370 if os.name=='posix':
371 if os.name=='posix':
371 from twisted.scripts._twistd_unix import daemonize
372 daemonize()
372 daemonize()
373
373
374 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
374 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
375 dc.start()
375 dc.start()
376 # Now write the new pid file AFTER our new forked pid is active.
376 # Now write the new pid file AFTER our new forked pid is active.
377 # self.write_pid_file()
377 # self.write_pid_file()
378 try:
378 try:
379 self.loop.start()
379 self.loop.start()
380 except KeyboardInterrupt:
380 except KeyboardInterrupt:
381 pass
381 pass
382 except zmq.ZMQError as e:
382 except zmq.ZMQError as e:
383 if e.errno == errno.EINTR:
383 if e.errno == errno.EINTR:
384 pass
384 pass
385 else:
385 else:
386 raise
386 raise
387
387
388 start_aliases = {}
388 start_aliases = {}
389 start_aliases.update(engine_aliases)
389 start_aliases.update(engine_aliases)
390 start_aliases.update(dict(
390 start_aliases.update(dict(
391 delay='IPClusterStart.delay',
391 delay='IPClusterStart.delay',
392 clean_logs='IPClusterStart.clean_logs',
392 clean_logs='IPClusterStart.clean_logs',
393 ))
393 ))
394
394
395 class IPClusterStart(IPClusterEngines):
395 class IPClusterStart(IPClusterEngines):
396
396
397 name = u'ipcluster'
397 name = u'ipcluster'
398 description = start_help
398 description = start_help
399 default_log_level = logging.INFO
399 default_log_level = logging.INFO
400 auto_create = Bool(True, config=True,
400 auto_create = Bool(True, config=True,
401 help="whether to create the profile_dir if it doesn't exist")
401 help="whether to create the profile_dir if it doesn't exist")
402 classes = List()
402 classes = List()
403 def _classes_default(self,):
403 def _classes_default(self,):
404 from IPython.parallel.apps import launcher
404 from IPython.parallel.apps import launcher
405 return [ProfileDir]+launcher.all_launchers
405 return [ProfileDir]+launcher.all_launchers
406
406
407 clean_logs = Bool(True, config=True,
407 clean_logs = Bool(True, config=True,
408 help="whether to cleanup old logs before starting")
408 help="whether to cleanup old logs before starting")
409
409
410 delay = CFloat(1., config=True,
410 delay = CFloat(1., config=True,
411 help="delay (in s) between starting the controller and the engines")
411 help="delay (in s) between starting the controller and the engines")
412
412
413 controller_launcher_class = Unicode('LocalControllerLauncher',
413 controller_launcher_class = Unicode('LocalControllerLauncher',
414 config=True,
414 config=True,
415 help="The class for launching a Controller."
415 help="The class for launching a Controller."
416 )
416 )
417 reset = Bool(False, config=True,
417 reset = Bool(False, config=True,
418 help="Whether to reset config files as part of '--create'."
418 help="Whether to reset config files as part of '--create'."
419 )
419 )
420
420
421 # flags = Dict(flags)
421 # flags = Dict(flags)
422 aliases = Dict(start_aliases)
422 aliases = Dict(start_aliases)
423
423
424 def init_launchers(self):
424 def init_launchers(self):
425 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
425 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
426 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
426 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
427 self.controller_launcher.on_stop(self.stop_launchers)
427 self.controller_launcher.on_stop(self.stop_launchers)
428
428
429 def start_controller(self):
429 def start_controller(self):
430 self.controller_launcher.start(
430 self.controller_launcher.start(
431 self.profile_dir.location
431 self.profile_dir.location
432 )
432 )
433
433
434 def stop_controller(self):
434 def stop_controller(self):
435 # self.log.info("In stop_controller")
435 # self.log.info("In stop_controller")
436 if self.controller_launcher and self.controller_launcher.running:
436 if self.controller_launcher and self.controller_launcher.running:
437 return self.controller_launcher.stop()
437 return self.controller_launcher.stop()
438
438
439 def stop_launchers(self, r=None):
439 def stop_launchers(self, r=None):
440 if not self._stopping:
440 if not self._stopping:
441 self.stop_controller()
441 self.stop_controller()
442 super(IPClusterStart, self).stop_launchers()
442 super(IPClusterStart, self).stop_launchers()
443
443
444 def start(self):
444 def start(self):
445 """Start the app for the start subcommand."""
445 """Start the app for the start subcommand."""
446 # First see if the cluster is already running
446 # First see if the cluster is already running
447 try:
447 try:
448 pid = self.get_pid_from_file()
448 pid = self.get_pid_from_file()
449 except PIDFileError:
449 except PIDFileError:
450 pass
450 pass
451 else:
451 else:
452 if self.check_pid(pid):
452 if self.check_pid(pid):
453 self.log.critical(
453 self.log.critical(
454 'Cluster is already running with [pid=%s]. '
454 'Cluster is already running with [pid=%s]. '
455 'use "ipcluster stop" to stop the cluster.' % pid
455 'use "ipcluster stop" to stop the cluster.' % pid
456 )
456 )
457 # Here I exit with a unusual exit status that other processes
457 # Here I exit with a unusual exit status that other processes
458 # can watch for to learn how I existed.
458 # can watch for to learn how I existed.
459 self.exit(ALREADY_STARTED)
459 self.exit(ALREADY_STARTED)
460 else:
460 else:
461 self.remove_pid_file()
461 self.remove_pid_file()
462
462
463
463
464 # Now log and daemonize
464 # Now log and daemonize
465 self.log.info(
465 self.log.info(
466 'Starting ipcluster with [daemon=%r]' % self.daemonize
466 'Starting ipcluster with [daemon=%r]' % self.daemonize
467 )
467 )
468 # TODO: Get daemonize working on Windows or as a Windows Server.
468 # TODO: Get daemonize working on Windows or as a Windows Server.
469 if self.daemonize:
469 if self.daemonize:
470 if os.name=='posix':
470 if os.name=='posix':
471 from twisted.scripts._twistd_unix import daemonize
472 daemonize()
471 daemonize()
473
472
474 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
473 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
475 dc.start()
474 dc.start()
476 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
475 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
477 dc.start()
476 dc.start()
478 # Now write the new pid file AFTER our new forked pid is active.
477 # Now write the new pid file AFTER our new forked pid is active.
479 self.write_pid_file()
478 self.write_pid_file()
480 try:
479 try:
481 self.loop.start()
480 self.loop.start()
482 except KeyboardInterrupt:
481 except KeyboardInterrupt:
483 pass
482 pass
484 except zmq.ZMQError as e:
483 except zmq.ZMQError as e:
485 if e.errno == errno.EINTR:
484 if e.errno == errno.EINTR:
486 pass
485 pass
487 else:
486 else:
488 raise
487 raise
489 finally:
488 finally:
490 self.remove_pid_file()
489 self.remove_pid_file()
491
490
492 base='IPython.parallel.apps.ipclusterapp.IPCluster'
491 base='IPython.parallel.apps.ipclusterapp.IPCluster'
493
492
494 class IPBaseParallelApplication(Application):
493 class IPBaseParallelApplication(Application):
495 name = u'ipcluster'
494 name = u'ipcluster'
496 description = _description
495 description = _description
497
496
498 subcommands = {'create' : (base+'Create', create_help),
497 subcommands = {'create' : (base+'Create', create_help),
499 'list' : (base+'List', list_help),
498 'list' : (base+'List', list_help),
500 'start' : (base+'Start', start_help),
499 'start' : (base+'Start', start_help),
501 'stop' : (base+'Stop', stop_help),
500 'stop' : (base+'Stop', stop_help),
502 'engines' : (base+'Engines', engines_help),
501 'engines' : (base+'Engines', engines_help),
503 }
502 }
504
503
505 # no aliases or flags for parent App
504 # no aliases or flags for parent App
506 aliases = Dict()
505 aliases = Dict()
507 flags = Dict()
506 flags = Dict()
508
507
509 def start(self):
508 def start(self):
510 if self.subapp is None:
509 if self.subapp is None:
511 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
510 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
512 print
511 print
513 self.print_subcommands()
512 self.print_subcommands()
514 self.exit(1)
513 self.exit(1)
515 else:
514 else:
516 return self.subapp.start()
515 return self.subapp.start()
517
516
518 def launch_new_instance():
517 def launch_new_instance():
519 """Create and run the IPython cluster."""
518 """Create and run the IPython cluster."""
520 app = IPBaseParallelApplication.instance()
519 app = IPBaseParallelApplication.instance()
521 app.initialize()
520 app.initialize()
522 app.start()
521 app.start()
523
522
524
523
525 if __name__ == '__main__':
524 if __name__ == '__main__':
526 launch_new_instance()
525 launch_new_instance()
527
526
@@ -1,1074 +1,1063 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 import copy
23 import copy
24 import logging
24 import logging
25 import os
25 import os
26 import re
26 import re
27 import stat
27 import stat
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
45 try:
45 try:
46 from subprocess import check_output
46 from subprocess import check_output
47 except ImportError:
47 except ImportError:
48 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
52 out,err = p.communicate()
53 return out
53 return out
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
61 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
63
64 from .win32support import forward_read_events
64 from .win32support import forward_read_events
65
65
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67
67
68 WINDOWS = os.name == 'nt'
68 WINDOWS = os.name == 'nt'
69
69
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 # Paths to the kernel apps
71 # Paths to the kernel apps
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73
73
74
74
75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipclusterapp'
76 'IPython.parallel.apps.ipclusterapp'
77 ))
77 ))
78
78
79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipengineapp'
80 'IPython.parallel.apps.ipengineapp'
81 ))
81 ))
82
82
83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 'IPython.parallel.apps.ipcontrollerapp'
84 'IPython.parallel.apps.ipcontrollerapp'
85 ))
85 ))
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # Base launchers and errors
88 # Base launchers and errors
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
91
91
92 class LauncherError(Exception):
92 class LauncherError(Exception):
93 pass
93 pass
94
94
95
95
96 class ProcessStateError(LauncherError):
96 class ProcessStateError(LauncherError):
97 pass
97 pass
98
98
99
99
100 class UnknownStatus(LauncherError):
100 class UnknownStatus(LauncherError):
101 pass
101 pass
102
102
103
103
104 class BaseLauncher(LoggingConfigurable):
104 class BaseLauncher(LoggingConfigurable):
105 """An asbtraction for starting, stopping and signaling a process."""
105 """An asbtraction for starting, stopping and signaling a process."""
106
106
107 # In all of the launchers, the work_dir is where child processes will be
107 # In all of the launchers, the work_dir is where child processes will be
108 # run. This will usually be the profile_dir, but may not be. any work_dir
108 # run. This will usually be the profile_dir, but may not be. any work_dir
109 # passed into the __init__ method will override the config value.
109 # passed into the __init__ method will override the config value.
110 # This should not be used to set the work_dir for the actual engine
110 # This should not be used to set the work_dir for the actual engine
111 # and controller. Instead, use their own config files or the
111 # and controller. Instead, use their own config files or the
112 # controller_args, engine_args attributes of the launchers to add
112 # controller_args, engine_args attributes of the launchers to add
113 # the work_dir option.
113 # the work_dir option.
114 work_dir = Unicode(u'.')
114 work_dir = Unicode(u'.')
115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116
116
117 start_data = Any()
117 start_data = Any()
118 stop_data = Any()
118 stop_data = Any()
119
119
120 def _loop_default(self):
120 def _loop_default(self):
121 return ioloop.IOLoop.instance()
121 return ioloop.IOLoop.instance()
122
122
123 def __init__(self, work_dir=u'.', config=None, **kwargs):
123 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 self.state = 'before' # can be before, running, after
125 self.state = 'before' # can be before, running, after
126 self.stop_callbacks = []
126 self.stop_callbacks = []
127 self.start_data = None
127 self.start_data = None
128 self.stop_data = None
128 self.stop_data = None
129
129
130 @property
130 @property
131 def args(self):
131 def args(self):
132 """A list of cmd and args that will be used to start the process.
132 """A list of cmd and args that will be used to start the process.
133
133
134 This is what is passed to :func:`spawnProcess` and the first element
134 This is what is passed to :func:`spawnProcess` and the first element
135 will be the process name.
135 will be the process name.
136 """
136 """
137 return self.find_args()
137 return self.find_args()
138
138
139 def find_args(self):
139 def find_args(self):
140 """The ``.args`` property calls this to find the args list.
140 """The ``.args`` property calls this to find the args list.
141
141
142 Subcommand should implement this to construct the cmd and args.
142 Subcommand should implement this to construct the cmd and args.
143 """
143 """
144 raise NotImplementedError('find_args must be implemented in a subclass')
144 raise NotImplementedError('find_args must be implemented in a subclass')
145
145
146 @property
146 @property
147 def arg_str(self):
147 def arg_str(self):
148 """The string form of the program arguments."""
148 """The string form of the program arguments."""
149 return ' '.join(self.args)
149 return ' '.join(self.args)
150
150
151 @property
151 @property
152 def running(self):
152 def running(self):
153 """Am I running."""
153 """Am I running."""
154 if self.state == 'running':
154 if self.state == 'running':
155 return True
155 return True
156 else:
156 else:
157 return False
157 return False
158
158
159 def start(self):
159 def start(self):
160 """Start the process.
160 """Start the process."""
161
162 This must return a deferred that fires with information about the
163 process starting (like a pid, job id, etc.).
164 """
165 raise NotImplementedError('start must be implemented in a subclass')
161 raise NotImplementedError('start must be implemented in a subclass')
166
162
167 def stop(self):
163 def stop(self):
168 """Stop the process and notify observers of stopping.
164 """Stop the process and notify observers of stopping.
169
165
170 This must return a deferred that fires with information about the
166 This method will return None immediately.
171 processing stopping, like errors that occur while the process is
167 To observe the actual process stopping, see :meth:`on_stop`.
172 attempting to be shut down. This deferred won't fire when the process
173 actually stops. To observe the actual process stopping, see
174 :func:`observe_stop`.
175 """
168 """
176 raise NotImplementedError('stop must be implemented in a subclass')
169 raise NotImplementedError('stop must be implemented in a subclass')
177
170
178 def on_stop(self, f):
171 def on_stop(self, f):
179 """Get a deferred that will fire when the process stops.
172 """Register a callback to be called with this Launcher's stop_data
180
173 when the process actually finishes.
181 The deferred will fire with data that contains information about
182 the exit status of the process.
183 """
174 """
184 if self.state=='after':
175 if self.state=='after':
185 return f(self.stop_data)
176 return f(self.stop_data)
186 else:
177 else:
187 self.stop_callbacks.append(f)
178 self.stop_callbacks.append(f)
188
179
189 def notify_start(self, data):
180 def notify_start(self, data):
190 """Call this to trigger startup actions.
181 """Call this to trigger startup actions.
191
182
192 This logs the process startup and sets the state to 'running'. It is
183 This logs the process startup and sets the state to 'running'. It is
193 a pass-through so it can be used as a callback.
184 a pass-through so it can be used as a callback.
194 """
185 """
195
186
196 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.log.info('Process %r started: %r' % (self.args[0], data))
197 self.start_data = data
188 self.start_data = data
198 self.state = 'running'
189 self.state = 'running'
199 return data
190 return data
200
191
201 def notify_stop(self, data):
192 def notify_stop(self, data):
202 """Call this to trigger process stop actions.
193 """Call this to trigger process stop actions.
203
194
204 This logs the process stopping and sets the state to 'after'. Call
195 This logs the process stopping and sets the state to 'after'. Call
205 this to trigger all the deferreds from :func:`observe_stop`."""
196 this to trigger callbacks registered via :meth:`on_stop`."""
206
197
207 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
208 self.stop_data = data
199 self.stop_data = data
209 self.state = 'after'
200 self.state = 'after'
210 for i in range(len(self.stop_callbacks)):
201 for i in range(len(self.stop_callbacks)):
211 d = self.stop_callbacks.pop()
202 d = self.stop_callbacks.pop()
212 d(data)
203 d(data)
213 return data
204 return data
214
205
215 def signal(self, sig):
206 def signal(self, sig):
216 """Signal the process.
207 """Signal the process.
217
208
218 Return a semi-meaningless deferred after signaling the process.
219
220 Parameters
209 Parameters
221 ----------
210 ----------
222 sig : str or int
211 sig : str or int
223 'KILL', 'INT', etc., or any signal number
212 'KILL', 'INT', etc., or any signal number
224 """
213 """
225 raise NotImplementedError('signal must be implemented in a subclass')
214 raise NotImplementedError('signal must be implemented in a subclass')
226
215
227
216
228 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
229 # Local process launchers
218 # Local process launchers
230 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
231
220
232
221
233 class LocalProcessLauncher(BaseLauncher):
222 class LocalProcessLauncher(BaseLauncher):
234 """Start and stop an external process in an asynchronous manner.
223 """Start and stop an external process in an asynchronous manner.
235
224
236 This will launch the external process with a working directory of
225 This will launch the external process with a working directory of
237 ``self.work_dir``.
226 ``self.work_dir``.
238 """
227 """
239
228
240 # This is used to to construct self.args, which is passed to
229 # This is used to to construct self.args, which is passed to
241 # spawnProcess.
230 # spawnProcess.
242 cmd_and_args = List([])
231 cmd_and_args = List([])
243 poll_frequency = Int(100) # in ms
232 poll_frequency = Int(100) # in ms
244
233
245 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
246 super(LocalProcessLauncher, self).__init__(
235 super(LocalProcessLauncher, self).__init__(
247 work_dir=work_dir, config=config, **kwargs
236 work_dir=work_dir, config=config, **kwargs
248 )
237 )
249 self.process = None
238 self.process = None
250 self.start_deferred = None
251 self.poller = None
239 self.poller = None
252
240
253 def find_args(self):
241 def find_args(self):
254 return self.cmd_and_args
242 return self.cmd_and_args
255
243
256 def start(self):
244 def start(self):
257 if self.state == 'before':
245 if self.state == 'before':
258 self.process = Popen(self.args,
246 self.process = Popen(self.args,
259 stdout=PIPE,stderr=PIPE,stdin=PIPE,
247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
260 env=os.environ,
248 env=os.environ,
261 cwd=self.work_dir
249 cwd=self.work_dir
262 )
250 )
263 if WINDOWS:
251 if WINDOWS:
264 self.stdout = forward_read_events(self.process.stdout)
252 self.stdout = forward_read_events(self.process.stdout)
265 self.stderr = forward_read_events(self.process.stderr)
253 self.stderr = forward_read_events(self.process.stderr)
266 else:
254 else:
267 self.stdout = self.process.stdout.fileno()
255 self.stdout = self.process.stdout.fileno()
268 self.stderr = self.process.stderr.fileno()
256 self.stderr = self.process.stderr.fileno()
269 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
270 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
271 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
272 self.poller.start()
260 self.poller.start()
273 self.notify_start(self.process.pid)
261 self.notify_start(self.process.pid)
274 else:
262 else:
275 s = 'The process was already started and has state: %r' % self.state
263 s = 'The process was already started and has state: %r' % self.state
276 raise ProcessStateError(s)
264 raise ProcessStateError(s)
277
265
278 def stop(self):
266 def stop(self):
279 return self.interrupt_then_kill()
267 return self.interrupt_then_kill()
280
268
281 def signal(self, sig):
269 def signal(self, sig):
282 if self.state == 'running':
270 if self.state == 'running':
283 if WINDOWS and sig != SIGINT:
271 if WINDOWS and sig != SIGINT:
284 # use Windows tree-kill for better child cleanup
272 # use Windows tree-kill for better child cleanup
285 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
286 else:
274 else:
287 self.process.send_signal(sig)
275 self.process.send_signal(sig)
288
276
289 def interrupt_then_kill(self, delay=2.0):
277 def interrupt_then_kill(self, delay=2.0):
290 """Send INT, wait a delay and then send KILL."""
278 """Send INT, wait a delay and then send KILL."""
291 try:
279 try:
292 self.signal(SIGINT)
280 self.signal(SIGINT)
293 except Exception:
281 except Exception:
294 self.log.debug("interrupt failed")
282 self.log.debug("interrupt failed")
295 pass
283 pass
296 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
297 self.killer.start()
285 self.killer.start()
298
286
299 # callbacks, etc:
287 # callbacks, etc:
300
288
301 def handle_stdout(self, fd, events):
289 def handle_stdout(self, fd, events):
302 if WINDOWS:
290 if WINDOWS:
303 line = self.stdout.recv()
291 line = self.stdout.recv()
304 else:
292 else:
305 line = self.process.stdout.readline()
293 line = self.process.stdout.readline()
306 # a stopped process will be readable but return empty strings
294 # a stopped process will be readable but return empty strings
307 if line:
295 if line:
308 self.log.info(line[:-1])
296 self.log.info(line[:-1])
309 else:
297 else:
310 self.poll()
298 self.poll()
311
299
312 def handle_stderr(self, fd, events):
300 def handle_stderr(self, fd, events):
313 if WINDOWS:
301 if WINDOWS:
314 line = self.stderr.recv()
302 line = self.stderr.recv()
315 else:
303 else:
316 line = self.process.stderr.readline()
304 line = self.process.stderr.readline()
317 # a stopped process will be readable but return empty strings
305 # a stopped process will be readable but return empty strings
318 if line:
306 if line:
319 self.log.error(line[:-1])
307 self.log.error(line[:-1])
320 else:
308 else:
321 self.poll()
309 self.poll()
322
310
323 def poll(self):
311 def poll(self):
324 status = self.process.poll()
312 status = self.process.poll()
325 if status is not None:
313 if status is not None:
326 self.poller.stop()
314 self.poller.stop()
327 self.loop.remove_handler(self.stdout)
315 self.loop.remove_handler(self.stdout)
328 self.loop.remove_handler(self.stderr)
316 self.loop.remove_handler(self.stderr)
329 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
330 return status
318 return status
331
319
332 class LocalControllerLauncher(LocalProcessLauncher):
320 class LocalControllerLauncher(LocalProcessLauncher):
333 """Launch a controller as a regular external process."""
321 """Launch a controller as a regular external process."""
334
322
335 controller_cmd = List(ipcontroller_cmd_argv, config=True,
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
336 help="""Popen command to launch ipcontroller.""")
324 help="""Popen command to launch ipcontroller.""")
337 # Command line arguments to ipcontroller.
325 # Command line arguments to ipcontroller.
338 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
326 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
339 help="""command-line args to pass to ipcontroller""")
327 help="""command-line args to pass to ipcontroller""")
340
328
341 def find_args(self):
329 def find_args(self):
342 return self.controller_cmd + self.controller_args
330 return self.controller_cmd + self.controller_args
343
331
344 def start(self, profile_dir):
332 def start(self, profile_dir):
345 """Start the controller by profile_dir."""
333 """Start the controller by profile_dir."""
346 self.controller_args.extend(['profile_dir=%s'%profile_dir])
334 self.controller_args.extend(['profile_dir=%s'%profile_dir])
347 self.profile_dir = unicode(profile_dir)
335 self.profile_dir = unicode(profile_dir)
348 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
349 return super(LocalControllerLauncher, self).start()
337 return super(LocalControllerLauncher, self).start()
350
338
351
339
352 class LocalEngineLauncher(LocalProcessLauncher):
340 class LocalEngineLauncher(LocalProcessLauncher):
353 """Launch a single engine as a regular externall process."""
341 """Launch a single engine as a regular externall process."""
354
342
355 engine_cmd = List(ipengine_cmd_argv, config=True,
343 engine_cmd = List(ipengine_cmd_argv, config=True,
356 help="""command to launch the Engine.""")
344 help="""command to launch the Engine.""")
357 # Command line arguments for ipengine.
345 # Command line arguments for ipengine.
358 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
346 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
359 help="command-line arguments to pass to ipengine"
347 help="command-line arguments to pass to ipengine"
360 )
348 )
361
349
362 def find_args(self):
350 def find_args(self):
363 return self.engine_cmd + self.engine_args
351 return self.engine_cmd + self.engine_args
364
352
365 def start(self, profile_dir):
353 def start(self, profile_dir):
366 """Start the engine by profile_dir."""
354 """Start the engine by profile_dir."""
367 self.engine_args.extend(['profile_dir=%s'%profile_dir])
355 self.engine_args.extend(['profile_dir=%s'%profile_dir])
368 self.profile_dir = unicode(profile_dir)
356 self.profile_dir = unicode(profile_dir)
369 return super(LocalEngineLauncher, self).start()
357 return super(LocalEngineLauncher, self).start()
370
358
371
359
372 class LocalEngineSetLauncher(BaseLauncher):
360 class LocalEngineSetLauncher(BaseLauncher):
373 """Launch a set of engines as regular external processes."""
361 """Launch a set of engines as regular external processes."""
374
362
375 # Command line arguments for ipengine.
363 # Command line arguments for ipengine.
376 engine_args = List(
364 engine_args = List(
377 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
365 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
378 help="command-line arguments to pass to ipengine"
366 help="command-line arguments to pass to ipengine"
379 )
367 )
380 # launcher class
368 # launcher class
381 launcher_class = LocalEngineLauncher
369 launcher_class = LocalEngineLauncher
382
370
383 launchers = Dict()
371 launchers = Dict()
384 stop_data = Dict()
372 stop_data = Dict()
385
373
386 def __init__(self, work_dir=u'.', config=None, **kwargs):
374 def __init__(self, work_dir=u'.', config=None, **kwargs):
387 super(LocalEngineSetLauncher, self).__init__(
375 super(LocalEngineSetLauncher, self).__init__(
388 work_dir=work_dir, config=config, **kwargs
376 work_dir=work_dir, config=config, **kwargs
389 )
377 )
390 self.stop_data = {}
378 self.stop_data = {}
391
379
392 def start(self, n, profile_dir):
380 def start(self, n, profile_dir):
393 """Start n engines by profile or profile_dir."""
381 """Start n engines by profile or profile_dir."""
394 self.profile_dir = unicode(profile_dir)
382 self.profile_dir = unicode(profile_dir)
395 dlist = []
383 dlist = []
396 for i in range(n):
384 for i in range(n):
397 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
385 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
398 # Copy the engine args over to each engine launcher.
386 # Copy the engine args over to each engine launcher.
399 el.engine_args = copy.deepcopy(self.engine_args)
387 el.engine_args = copy.deepcopy(self.engine_args)
400 el.on_stop(self._notice_engine_stopped)
388 el.on_stop(self._notice_engine_stopped)
401 d = el.start(profile_dir)
389 d = el.start(profile_dir)
402 if i==0:
390 if i==0:
403 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
391 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
404 self.launchers[i] = el
392 self.launchers[i] = el
405 dlist.append(d)
393 dlist.append(d)
406 self.notify_start(dlist)
394 self.notify_start(dlist)
407 # The consumeErrors here could be dangerous
395 # The consumeErrors here could be dangerous
408 # dfinal = gatherBoth(dlist, consumeErrors=True)
396 # dfinal = gatherBoth(dlist, consumeErrors=True)
409 # dfinal.addCallback(self.notify_start)
397 # dfinal.addCallback(self.notify_start)
410 return dlist
398 return dlist
411
399
412 def find_args(self):
400 def find_args(self):
413 return ['engine set']
401 return ['engine set']
414
402
415 def signal(self, sig):
403 def signal(self, sig):
416 dlist = []
404 dlist = []
417 for el in self.launchers.itervalues():
405 for el in self.launchers.itervalues():
418 d = el.signal(sig)
406 d = el.signal(sig)
419 dlist.append(d)
407 dlist.append(d)
420 # dfinal = gatherBoth(dlist, consumeErrors=True)
408 # dfinal = gatherBoth(dlist, consumeErrors=True)
421 return dlist
409 return dlist
422
410
423 def interrupt_then_kill(self, delay=1.0):
411 def interrupt_then_kill(self, delay=1.0):
424 dlist = []
412 dlist = []
425 for el in self.launchers.itervalues():
413 for el in self.launchers.itervalues():
426 d = el.interrupt_then_kill(delay)
414 d = el.interrupt_then_kill(delay)
427 dlist.append(d)
415 dlist.append(d)
428 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
429 return dlist
417 return dlist
430
418
431 def stop(self):
419 def stop(self):
432 return self.interrupt_then_kill()
420 return self.interrupt_then_kill()
433
421
434 def _notice_engine_stopped(self, data):
422 def _notice_engine_stopped(self, data):
435 pid = data['pid']
423 pid = data['pid']
436 for idx,el in self.launchers.iteritems():
424 for idx,el in self.launchers.iteritems():
437 if el.process.pid == pid:
425 if el.process.pid == pid:
438 break
426 break
439 self.launchers.pop(idx)
427 self.launchers.pop(idx)
440 self.stop_data[idx] = data
428 self.stop_data[idx] = data
441 if not self.launchers:
429 if not self.launchers:
442 self.notify_stop(self.stop_data)
430 self.notify_stop(self.stop_data)
443
431
444
432
445 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
446 # MPIExec launchers
434 # MPIExec launchers
447 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
448
436
449
437
450 class MPIExecLauncher(LocalProcessLauncher):
438 class MPIExecLauncher(LocalProcessLauncher):
451 """Launch an external process using mpiexec."""
439 """Launch an external process using mpiexec."""
452
440
453 mpi_cmd = List(['mpiexec'], config=True,
441 mpi_cmd = List(['mpiexec'], config=True,
454 help="The mpiexec command to use in starting the process."
442 help="The mpiexec command to use in starting the process."
455 )
443 )
456 mpi_args = List([], config=True,
444 mpi_args = List([], config=True,
457 help="The command line arguments to pass to mpiexec."
445 help="The command line arguments to pass to mpiexec."
458 )
446 )
459 program = List(['date'], config=True,
447 program = List(['date'], config=True,
460 help="The program to start via mpiexec.")
448 help="The program to start via mpiexec.")
461 program_args = List([], config=True,
449 program_args = List([], config=True,
462 help="The command line argument to the program."
450 help="The command line argument to the program."
463 )
451 )
464 n = Int(1)
452 n = Int(1)
465
453
466 def find_args(self):
454 def find_args(self):
467 """Build self.args using all the fields."""
455 """Build self.args using all the fields."""
468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
456 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
469 self.program + self.program_args
457 self.program + self.program_args
470
458
471 def start(self, n):
459 def start(self, n):
472 """Start n instances of the program using mpiexec."""
460 """Start n instances of the program using mpiexec."""
473 self.n = n
461 self.n = n
474 return super(MPIExecLauncher, self).start()
462 return super(MPIExecLauncher, self).start()
475
463
476
464
477 class MPIExecControllerLauncher(MPIExecLauncher):
465 class MPIExecControllerLauncher(MPIExecLauncher):
478 """Launch a controller using mpiexec."""
466 """Launch a controller using mpiexec."""
479
467
480 controller_cmd = List(ipcontroller_cmd_argv, config=True,
468 controller_cmd = List(ipcontroller_cmd_argv, config=True,
481 help="Popen command to launch the Contropper"
469 help="Popen command to launch the Contropper"
482 )
470 )
483 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
471 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
484 help="Command line arguments to pass to ipcontroller."
472 help="Command line arguments to pass to ipcontroller."
485 )
473 )
486 n = Int(1)
474 n = Int(1)
487
475
488 def start(self, profile_dir):
476 def start(self, profile_dir):
489 """Start the controller by profile_dir."""
477 """Start the controller by profile_dir."""
490 self.controller_args.extend(['profile_dir=%s'%profile_dir])
478 self.controller_args.extend(['profile_dir=%s'%profile_dir])
491 self.profile_dir = unicode(profile_dir)
479 self.profile_dir = unicode(profile_dir)
492 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
480 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
493 return super(MPIExecControllerLauncher, self).start(1)
481 return super(MPIExecControllerLauncher, self).start(1)
494
482
495 def find_args(self):
483 def find_args(self):
496 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
484 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
497 self.controller_cmd + self.controller_args
485 self.controller_cmd + self.controller_args
498
486
499
487
500 class MPIExecEngineSetLauncher(MPIExecLauncher):
488 class MPIExecEngineSetLauncher(MPIExecLauncher):
501
489
502 program = List(ipengine_cmd_argv, config=True,
490 program = List(ipengine_cmd_argv, config=True,
503 help="Popen command for ipengine"
491 help="Popen command for ipengine"
504 )
492 )
505 program_args = List(
493 program_args = List(
506 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
494 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
507 help="Command line arguments for ipengine."
495 help="Command line arguments for ipengine."
508 )
496 )
509 n = Int(1)
497 n = Int(1)
510
498
511 def start(self, n, profile_dir):
499 def start(self, n, profile_dir):
512 """Start n engines by profile or profile_dir."""
500 """Start n engines by profile or profile_dir."""
513 self.program_args.extend(['profile_dir=%s'%profile_dir])
501 self.program_args.extend(['profile_dir=%s'%profile_dir])
514 self.profile_dir = unicode(profile_dir)
502 self.profile_dir = unicode(profile_dir)
515 self.n = n
503 self.n = n
516 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
504 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
517 return super(MPIExecEngineSetLauncher, self).start(n)
505 return super(MPIExecEngineSetLauncher, self).start(n)
518
506
519 #-----------------------------------------------------------------------------
507 #-----------------------------------------------------------------------------
520 # SSH launchers
508 # SSH launchers
521 #-----------------------------------------------------------------------------
509 #-----------------------------------------------------------------------------
522
510
523 # TODO: Get SSH Launcher working again.
511 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
524
512
525 class SSHLauncher(LocalProcessLauncher):
513 class SSHLauncher(LocalProcessLauncher):
526 """A minimal launcher for ssh.
514 """A minimal launcher for ssh.
527
515
528 To be useful this will probably have to be extended to use the ``sshx``
516 To be useful this will probably have to be extended to use the ``sshx``
529 idea for environment variables. There could be other things this needs
517 idea for environment variables. There could be other things this needs
530 as well.
518 as well.
531 """
519 """
532
520
533 ssh_cmd = List(['ssh'], config=True,
521 ssh_cmd = List(['ssh'], config=True,
534 help="command for starting ssh")
522 help="command for starting ssh")
535 ssh_args = List(['-tt'], config=True,
523 ssh_args = List(['-tt'], config=True,
536 help="args to pass to ssh")
524 help="args to pass to ssh")
537 program = List(['date'], config=True,
525 program = List(['date'], config=True,
538 help="Program to launch via ssh")
526 help="Program to launch via ssh")
539 program_args = List([], config=True,
527 program_args = List([], config=True,
540 help="args to pass to remote program")
528 help="args to pass to remote program")
541 hostname = Unicode('', config=True,
529 hostname = Unicode('', config=True,
542 help="hostname on which to launch the program")
530 help="hostname on which to launch the program")
543 user = Unicode('', config=True,
531 user = Unicode('', config=True,
544 help="username for ssh")
532 help="username for ssh")
545 location = Unicode('', config=True,
533 location = Unicode('', config=True,
546 help="user@hostname location for ssh in one setting")
534 help="user@hostname location for ssh in one setting")
547
535
548 def _hostname_changed(self, name, old, new):
536 def _hostname_changed(self, name, old, new):
549 if self.user:
537 if self.user:
550 self.location = u'%s@%s' % (self.user, new)
538 self.location = u'%s@%s' % (self.user, new)
551 else:
539 else:
552 self.location = new
540 self.location = new
553
541
554 def _user_changed(self, name, old, new):
542 def _user_changed(self, name, old, new):
555 self.location = u'%s@%s' % (new, self.hostname)
543 self.location = u'%s@%s' % (new, self.hostname)
556
544
557 def find_args(self):
545 def find_args(self):
558 return self.ssh_cmd + self.ssh_args + [self.location] + \
546 return self.ssh_cmd + self.ssh_args + [self.location] + \
559 self.program + self.program_args
547 self.program + self.program_args
560
548
561 def start(self, profile_dir, hostname=None, user=None):
549 def start(self, profile_dir, hostname=None, user=None):
562 self.profile_dir = unicode(profile_dir)
550 self.profile_dir = unicode(profile_dir)
563 if hostname is not None:
551 if hostname is not None:
564 self.hostname = hostname
552 self.hostname = hostname
565 if user is not None:
553 if user is not None:
566 self.user = user
554 self.user = user
567
555
568 return super(SSHLauncher, self).start()
556 return super(SSHLauncher, self).start()
569
557
570 def signal(self, sig):
558 def signal(self, sig):
571 if self.state == 'running':
559 if self.state == 'running':
572 # send escaped ssh connection-closer
560 # send escaped ssh connection-closer
573 self.process.stdin.write('~.')
561 self.process.stdin.write('~.')
574 self.process.stdin.flush()
562 self.process.stdin.flush()
575
563
576
564
577
565
578 class SSHControllerLauncher(SSHLauncher):
566 class SSHControllerLauncher(SSHLauncher):
579
567
580 program = List(ipcontroller_cmd_argv, config=True,
568 program = List(ipcontroller_cmd_argv, config=True,
581 help="remote ipcontroller command.")
569 help="remote ipcontroller command.")
582 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
570 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
583 help="Command line arguments to ipcontroller.")
571 help="Command line arguments to ipcontroller.")
584
572
585
573
586 class SSHEngineLauncher(SSHLauncher):
574 class SSHEngineLauncher(SSHLauncher):
587 program = List(ipengine_cmd_argv, config=True,
575 program = List(ipengine_cmd_argv, config=True,
588 help="remote ipengine command.")
576 help="remote ipengine command.")
589 # Command line arguments for ipengine.
577 # Command line arguments for ipengine.
590 program_args = List(
578 program_args = List(
591 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
579 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
592 help="Command line arguments to ipengine."
580 help="Command line arguments to ipengine."
593 )
581 )
594
582
595 class SSHEngineSetLauncher(LocalEngineSetLauncher):
583 class SSHEngineSetLauncher(LocalEngineSetLauncher):
596 launcher_class = SSHEngineLauncher
584 launcher_class = SSHEngineLauncher
597 engines = Dict(config=True,
585 engines = Dict(config=True,
598 help="""dict of engines to launch. This is a dict by hostname of ints,
586 help="""dict of engines to launch. This is a dict by hostname of ints,
599 corresponding to the number of engines to start on that host.""")
587 corresponding to the number of engines to start on that host.""")
600
588
601 def start(self, n, profile_dir):
589 def start(self, n, profile_dir):
602 """Start engines by profile or profile_dir.
590 """Start engines by profile or profile_dir.
603 `n` is ignored, and the `engines` config property is used instead.
591 `n` is ignored, and the `engines` config property is used instead.
604 """
592 """
605
593
606 self.profile_dir = unicode(profile_dir)
594 self.profile_dir = unicode(profile_dir)
607 dlist = []
595 dlist = []
608 for host, n in self.engines.iteritems():
596 for host, n in self.engines.iteritems():
609 if isinstance(n, (tuple, list)):
597 if isinstance(n, (tuple, list)):
610 n, args = n
598 n, args = n
611 else:
599 else:
612 args = copy.deepcopy(self.engine_args)
600 args = copy.deepcopy(self.engine_args)
613
601
614 if '@' in host:
602 if '@' in host:
615 user,host = host.split('@',1)
603 user,host = host.split('@',1)
616 else:
604 else:
617 user=None
605 user=None
618 for i in range(n):
606 for i in range(n):
619 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
607 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
620
608
621 # Copy the engine args over to each engine launcher.
609 # Copy the engine args over to each engine launcher.
622 i
610 i
623 el.program_args = args
611 el.program_args = args
624 el.on_stop(self._notice_engine_stopped)
612 el.on_stop(self._notice_engine_stopped)
625 d = el.start(profile_dir, user=user, hostname=host)
613 d = el.start(profile_dir, user=user, hostname=host)
626 if i==0:
614 if i==0:
627 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
615 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
628 self.launchers[host+str(i)] = el
616 self.launchers[host+str(i)] = el
629 dlist.append(d)
617 dlist.append(d)
630 self.notify_start(dlist)
618 self.notify_start(dlist)
631 return dlist
619 return dlist
632
620
633
621
634
622
635 #-----------------------------------------------------------------------------
623 #-----------------------------------------------------------------------------
636 # Windows HPC Server 2008 scheduler launchers
624 # Windows HPC Server 2008 scheduler launchers
637 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
638
626
639
627
640 # This is only used on Windows.
628 # This is only used on Windows.
641 def find_job_cmd():
629 def find_job_cmd():
642 if WINDOWS:
630 if WINDOWS:
643 try:
631 try:
644 return find_cmd('job')
632 return find_cmd('job')
645 except (FindCmdError, ImportError):
633 except (FindCmdError, ImportError):
646 # ImportError will be raised if win32api is not installed
634 # ImportError will be raised if win32api is not installed
647 return 'job'
635 return 'job'
648 else:
636 else:
649 return 'job'
637 return 'job'
650
638
651
639
652 class WindowsHPCLauncher(BaseLauncher):
640 class WindowsHPCLauncher(BaseLauncher):
653
641
654 job_id_regexp = Unicode(r'\d+', config=True,
642 job_id_regexp = Unicode(r'\d+', config=True,
655 help="""A regular expression used to get the job id from the output of the
643 help="""A regular expression used to get the job id from the output of the
656 submit_command. """
644 submit_command. """
657 )
645 )
658 job_file_name = Unicode(u'ipython_job.xml', config=True,
646 job_file_name = Unicode(u'ipython_job.xml', config=True,
659 help="The filename of the instantiated job script.")
647 help="The filename of the instantiated job script.")
660 # The full path to the instantiated job script. This gets made dynamically
648 # The full path to the instantiated job script. This gets made dynamically
661 # by combining the work_dir with the job_file_name.
649 # by combining the work_dir with the job_file_name.
662 job_file = Unicode(u'')
650 job_file = Unicode(u'')
663 scheduler = Unicode('', config=True,
651 scheduler = Unicode('', config=True,
664 help="The hostname of the scheduler to submit the job to.")
652 help="The hostname of the scheduler to submit the job to.")
665 job_cmd = Unicode(find_job_cmd(), config=True,
653 job_cmd = Unicode(find_job_cmd(), config=True,
666 help="The command for submitting jobs.")
654 help="The command for submitting jobs.")
667
655
668 def __init__(self, work_dir=u'.', config=None, **kwargs):
656 def __init__(self, work_dir=u'.', config=None, **kwargs):
669 super(WindowsHPCLauncher, self).__init__(
657 super(WindowsHPCLauncher, self).__init__(
670 work_dir=work_dir, config=config, **kwargs
658 work_dir=work_dir, config=config, **kwargs
671 )
659 )
672
660
673 @property
661 @property
674 def job_file(self):
662 def job_file(self):
675 return os.path.join(self.work_dir, self.job_file_name)
663 return os.path.join(self.work_dir, self.job_file_name)
676
664
677 def write_job_file(self, n):
665 def write_job_file(self, n):
678 raise NotImplementedError("Implement write_job_file in a subclass.")
666 raise NotImplementedError("Implement write_job_file in a subclass.")
679
667
680 def find_args(self):
668 def find_args(self):
681 return [u'job.exe']
669 return [u'job.exe']
682
670
683 def parse_job_id(self, output):
671 def parse_job_id(self, output):
684 """Take the output of the submit command and return the job id."""
672 """Take the output of the submit command and return the job id."""
685 m = re.search(self.job_id_regexp, output)
673 m = re.search(self.job_id_regexp, output)
686 if m is not None:
674 if m is not None:
687 job_id = m.group()
675 job_id = m.group()
688 else:
676 else:
689 raise LauncherError("Job id couldn't be determined: %s" % output)
677 raise LauncherError("Job id couldn't be determined: %s" % output)
690 self.job_id = job_id
678 self.job_id = job_id
691 self.log.info('Job started with job id: %r' % job_id)
679 self.log.info('Job started with job id: %r' % job_id)
692 return job_id
680 return job_id
693
681
694 def start(self, n):
682 def start(self, n):
695 """Start n copies of the process using the Win HPC job scheduler."""
683 """Start n copies of the process using the Win HPC job scheduler."""
696 self.write_job_file(n)
684 self.write_job_file(n)
697 args = [
685 args = [
698 'submit',
686 'submit',
699 '/jobfile:%s' % self.job_file,
687 '/jobfile:%s' % self.job_file,
700 '/scheduler:%s' % self.scheduler
688 '/scheduler:%s' % self.scheduler
701 ]
689 ]
702 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
690 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
703 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
691
704 output = check_output([self.job_cmd]+args,
692 output = check_output([self.job_cmd]+args,
705 env=os.environ,
693 env=os.environ,
706 cwd=self.work_dir,
694 cwd=self.work_dir,
707 stderr=STDOUT
695 stderr=STDOUT
708 )
696 )
709 job_id = self.parse_job_id(output)
697 job_id = self.parse_job_id(output)
710 self.notify_start(job_id)
698 self.notify_start(job_id)
711 return job_id
699 return job_id
712
700
713 def stop(self):
701 def stop(self):
714 args = [
702 args = [
715 'cancel',
703 'cancel',
716 self.job_id,
704 self.job_id,
717 '/scheduler:%s' % self.scheduler
705 '/scheduler:%s' % self.scheduler
718 ]
706 ]
719 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
707 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
720 try:
708 try:
721 output = check_output([self.job_cmd]+args,
709 output = check_output([self.job_cmd]+args,
722 env=os.environ,
710 env=os.environ,
723 cwd=self.work_dir,
711 cwd=self.work_dir,
724 stderr=STDOUT
712 stderr=STDOUT
725 )
713 )
726 except:
714 except:
727 output = 'The job already appears to be stoppped: %r' % self.job_id
715 output = 'The job already appears to be stoppped: %r' % self.job_id
728 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
716 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
729 return output
717 return output
730
718
731
719
732 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
720 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
733
721
734 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
722 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
735 help="WinHPC xml job file.")
723 help="WinHPC xml job file.")
736 extra_args = List([], config=False,
724 extra_args = List([], config=False,
737 help="extra args to pass to ipcontroller")
725 help="extra args to pass to ipcontroller")
738
726
739 def write_job_file(self, n):
727 def write_job_file(self, n):
740 job = IPControllerJob(config=self.config)
728 job = IPControllerJob(config=self.config)
741
729
742 t = IPControllerTask(config=self.config)
730 t = IPControllerTask(config=self.config)
743 # The tasks work directory is *not* the actual work directory of
731 # The tasks work directory is *not* the actual work directory of
744 # the controller. It is used as the base path for the stdout/stderr
732 # the controller. It is used as the base path for the stdout/stderr
745 # files that the scheduler redirects to.
733 # files that the scheduler redirects to.
746 t.work_directory = self.profile_dir
734 t.work_directory = self.profile_dir
747 # Add the profile_dir and from self.start().
735 # Add the profile_dir and from self.start().
748 t.controller_args.extend(self.extra_args)
736 t.controller_args.extend(self.extra_args)
749 job.add_task(t)
737 job.add_task(t)
750
738
751 self.log.info("Writing job description file: %s" % self.job_file)
739 self.log.info("Writing job description file: %s" % self.job_file)
752 job.write(self.job_file)
740 job.write(self.job_file)
753
741
754 @property
742 @property
755 def job_file(self):
743 def job_file(self):
756 return os.path.join(self.profile_dir, self.job_file_name)
744 return os.path.join(self.profile_dir, self.job_file_name)
757
745
758 def start(self, profile_dir):
746 def start(self, profile_dir):
759 """Start the controller by profile_dir."""
747 """Start the controller by profile_dir."""
760 self.extra_args = ['profile_dir=%s'%profile_dir]
748 self.extra_args = ['profile_dir=%s'%profile_dir]
761 self.profile_dir = unicode(profile_dir)
749 self.profile_dir = unicode(profile_dir)
762 return super(WindowsHPCControllerLauncher, self).start(1)
750 return super(WindowsHPCControllerLauncher, self).start(1)
763
751
764
752
765 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
753 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
766
754
767 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
755 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
768 help="jobfile for ipengines job")
756 help="jobfile for ipengines job")
769 extra_args = List([], config=False,
757 extra_args = List([], config=False,
770 help="extra args to pas to ipengine")
758 help="extra args to pas to ipengine")
771
759
772 def write_job_file(self, n):
760 def write_job_file(self, n):
773 job = IPEngineSetJob(config=self.config)
761 job = IPEngineSetJob(config=self.config)
774
762
775 for i in range(n):
763 for i in range(n):
776 t = IPEngineTask(config=self.config)
764 t = IPEngineTask(config=self.config)
777 # The tasks work directory is *not* the actual work directory of
765 # The tasks work directory is *not* the actual work directory of
778 # the engine. It is used as the base path for the stdout/stderr
766 # the engine. It is used as the base path for the stdout/stderr
779 # files that the scheduler redirects to.
767 # files that the scheduler redirects to.
780 t.work_directory = self.profile_dir
768 t.work_directory = self.profile_dir
781 # Add the profile_dir and from self.start().
769 # Add the profile_dir and from self.start().
782 t.engine_args.extend(self.extra_args)
770 t.engine_args.extend(self.extra_args)
783 job.add_task(t)
771 job.add_task(t)
784
772
785 self.log.info("Writing job description file: %s" % self.job_file)
773 self.log.info("Writing job description file: %s" % self.job_file)
786 job.write(self.job_file)
774 job.write(self.job_file)
787
775
788 @property
776 @property
789 def job_file(self):
777 def job_file(self):
790 return os.path.join(self.profile_dir, self.job_file_name)
778 return os.path.join(self.profile_dir, self.job_file_name)
791
779
792 def start(self, n, profile_dir):
780 def start(self, n, profile_dir):
793 """Start the controller by profile_dir."""
781 """Start the controller by profile_dir."""
794 self.extra_args = ['profile_dir=%s'%profile_dir]
782 self.extra_args = ['profile_dir=%s'%profile_dir]
795 self.profile_dir = unicode(profile_dir)
783 self.profile_dir = unicode(profile_dir)
796 return super(WindowsHPCEngineSetLauncher, self).start(n)
784 return super(WindowsHPCEngineSetLauncher, self).start(n)
797
785
798
786
799 #-----------------------------------------------------------------------------
787 #-----------------------------------------------------------------------------
800 # Batch (PBS) system launchers
788 # Batch (PBS) system launchers
801 #-----------------------------------------------------------------------------
789 #-----------------------------------------------------------------------------
802
790
803 class BatchSystemLauncher(BaseLauncher):
791 class BatchSystemLauncher(BaseLauncher):
804 """Launch an external process using a batch system.
792 """Launch an external process using a batch system.
805
793
806 This class is designed to work with UNIX batch systems like PBS, LSF,
794 This class is designed to work with UNIX batch systems like PBS, LSF,
807 GridEngine, etc. The overall model is that there are different commands
795 GridEngine, etc. The overall model is that there are different commands
808 like qsub, qdel, etc. that handle the starting and stopping of the process.
796 like qsub, qdel, etc. that handle the starting and stopping of the process.
809
797
810 This class also has the notion of a batch script. The ``batch_template``
798 This class also has the notion of a batch script. The ``batch_template``
811 attribute can be set to a string that is a template for the batch script.
799 attribute can be set to a string that is a template for the batch script.
812 This template is instantiated using string formatting. Thus the template can
800 This template is instantiated using string formatting. Thus the template can
813 use {n} fot the number of instances. Subclasses can add additional variables
801 use {n} fot the number of instances. Subclasses can add additional variables
814 to the template dict.
802 to the template dict.
815 """
803 """
816
804
817 # Subclasses must fill these in. See PBSEngineSet
805 # Subclasses must fill these in. See PBSEngineSet
818 submit_command = List([''], config=True,
806 submit_command = List([''], config=True,
819 help="The name of the command line program used to submit jobs.")
807 help="The name of the command line program used to submit jobs.")
820 delete_command = List([''], config=True,
808 delete_command = List([''], config=True,
821 help="The name of the command line program used to delete jobs.")
809 help="The name of the command line program used to delete jobs.")
822 job_id_regexp = Unicode('', config=True,
810 job_id_regexp = Unicode('', config=True,
823 help="""A regular expression used to get the job id from the output of the
811 help="""A regular expression used to get the job id from the output of the
824 submit_command.""")
812 submit_command.""")
825 batch_template = Unicode('', config=True,
813 batch_template = Unicode('', config=True,
826 help="The string that is the batch script template itself.")
814 help="The string that is the batch script template itself.")
827 batch_template_file = Unicode(u'', config=True,
815 batch_template_file = Unicode(u'', config=True,
828 help="The file that contains the batch template.")
816 help="The file that contains the batch template.")
829 batch_file_name = Unicode(u'batch_script', config=True,
817 batch_file_name = Unicode(u'batch_script', config=True,
830 help="The filename of the instantiated batch script.")
818 help="The filename of the instantiated batch script.")
831 queue = Unicode(u'', config=True,
819 queue = Unicode(u'', config=True,
832 help="The PBS Queue.")
820 help="The PBS Queue.")
833
821
834 # not configurable, override in subclasses
822 # not configurable, override in subclasses
835 # PBS Job Array regex
823 # PBS Job Array regex
836 job_array_regexp = Unicode('')
824 job_array_regexp = Unicode('')
837 job_array_template = Unicode('')
825 job_array_template = Unicode('')
838 # PBS Queue regex
826 # PBS Queue regex
839 queue_regexp = Unicode('')
827 queue_regexp = Unicode('')
840 queue_template = Unicode('')
828 queue_template = Unicode('')
841 # The default batch template, override in subclasses
829 # The default batch template, override in subclasses
842 default_template = Unicode('')
830 default_template = Unicode('')
843 # The full path to the instantiated batch script.
831 # The full path to the instantiated batch script.
844 batch_file = Unicode(u'')
832 batch_file = Unicode(u'')
845 # the format dict used with batch_template:
833 # the format dict used with batch_template:
846 context = Dict()
834 context = Dict()
847 # the Formatter instance for rendering the templates:
835 # the Formatter instance for rendering the templates:
848 formatter = Instance(EvalFormatter, (), {})
836 formatter = Instance(EvalFormatter, (), {})
849
837
850
838
851 def find_args(self):
839 def find_args(self):
852 return self.submit_command + [self.batch_file]
840 return self.submit_command + [self.batch_file]
853
841
854 def __init__(self, work_dir=u'.', config=None, **kwargs):
842 def __init__(self, work_dir=u'.', config=None, **kwargs):
855 super(BatchSystemLauncher, self).__init__(
843 super(BatchSystemLauncher, self).__init__(
856 work_dir=work_dir, config=config, **kwargs
844 work_dir=work_dir, config=config, **kwargs
857 )
845 )
858 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
846 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
859
847
860 def parse_job_id(self, output):
848 def parse_job_id(self, output):
861 """Take the output of the submit command and return the job id."""
849 """Take the output of the submit command and return the job id."""
862 m = re.search(self.job_id_regexp, output)
850 m = re.search(self.job_id_regexp, output)
863 if m is not None:
851 if m is not None:
864 job_id = m.group()
852 job_id = m.group()
865 else:
853 else:
866 raise LauncherError("Job id couldn't be determined: %s" % output)
854 raise LauncherError("Job id couldn't be determined: %s" % output)
867 self.job_id = job_id
855 self.job_id = job_id
868 self.log.info('Job submitted with job id: %r' % job_id)
856 self.log.info('Job submitted with job id: %r' % job_id)
869 return job_id
857 return job_id
870
858
871 def write_batch_script(self, n):
859 def write_batch_script(self, n):
872 """Instantiate and write the batch script to the work_dir."""
860 """Instantiate and write the batch script to the work_dir."""
873 self.context['n'] = n
861 self.context['n'] = n
874 self.context['queue'] = self.queue
862 self.context['queue'] = self.queue
875 # first priority is batch_template if set
863 # first priority is batch_template if set
876 if self.batch_template_file and not self.batch_template:
864 if self.batch_template_file and not self.batch_template:
877 # second priority is batch_template_file
865 # second priority is batch_template_file
878 with open(self.batch_template_file) as f:
866 with open(self.batch_template_file) as f:
879 self.batch_template = f.read()
867 self.batch_template = f.read()
880 if not self.batch_template:
868 if not self.batch_template:
881 # third (last) priority is default_template
869 # third (last) priority is default_template
882 self.batch_template = self.default_template
870 self.batch_template = self.default_template
883
871
884 regex = re.compile(self.job_array_regexp)
872 regex = re.compile(self.job_array_regexp)
885 # print regex.search(self.batch_template)
873 # print regex.search(self.batch_template)
886 if not regex.search(self.batch_template):
874 if not regex.search(self.batch_template):
887 self.log.info("adding job array settings to batch script")
875 self.log.info("adding job array settings to batch script")
888 firstline, rest = self.batch_template.split('\n',1)
876 firstline, rest = self.batch_template.split('\n',1)
889 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
877 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
890
878
891 regex = re.compile(self.queue_regexp)
879 regex = re.compile(self.queue_regexp)
892 # print regex.search(self.batch_template)
880 # print regex.search(self.batch_template)
893 if self.queue and not regex.search(self.batch_template):
881 if self.queue and not regex.search(self.batch_template):
894 self.log.info("adding PBS queue settings to batch script")
882 self.log.info("adding PBS queue settings to batch script")
895 firstline, rest = self.batch_template.split('\n',1)
883 firstline, rest = self.batch_template.split('\n',1)
896 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
884 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
897
885
898 script_as_string = self.formatter.format(self.batch_template, **self.context)
886 script_as_string = self.formatter.format(self.batch_template, **self.context)
899 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
887 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
900
888
901 with open(self.batch_file, 'w') as f:
889 with open(self.batch_file, 'w') as f:
902 f.write(script_as_string)
890 f.write(script_as_string)
903 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
891 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
904
892
905 def start(self, n, profile_dir):
893 def start(self, n, profile_dir):
906 """Start n copies of the process using a batch system."""
894 """Start n copies of the process using a batch system."""
907 # Here we save profile_dir in the context so they
895 # Here we save profile_dir in the context so they
908 # can be used in the batch script template as {profile_dir}
896 # can be used in the batch script template as {profile_dir}
909 self.context['profile_dir'] = profile_dir
897 self.context['profile_dir'] = profile_dir
910 self.profile_dir = unicode(profile_dir)
898 self.profile_dir = unicode(profile_dir)
911 self.write_batch_script(n)
899 self.write_batch_script(n)
912 output = check_output(self.args, env=os.environ)
900 output = check_output(self.args, env=os.environ)
913
901
914 job_id = self.parse_job_id(output)
902 job_id = self.parse_job_id(output)
915 self.notify_start(job_id)
903 self.notify_start(job_id)
916 return job_id
904 return job_id
917
905
918 def stop(self):
906 def stop(self):
919 output = check_output(self.delete_command+[self.job_id], env=os.environ)
907 output = check_output(self.delete_command+[self.job_id], env=os.environ)
920 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
908 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
921 return output
909 return output
922
910
923
911
924 class PBSLauncher(BatchSystemLauncher):
912 class PBSLauncher(BatchSystemLauncher):
925 """A BatchSystemLauncher subclass for PBS."""
913 """A BatchSystemLauncher subclass for PBS."""
926
914
927 submit_command = List(['qsub'], config=True,
915 submit_command = List(['qsub'], config=True,
928 help="The PBS submit command ['qsub']")
916 help="The PBS submit command ['qsub']")
929 delete_command = List(['qdel'], config=True,
917 delete_command = List(['qdel'], config=True,
930 help="The PBS delete command ['qsub']")
918 help="The PBS delete command ['qsub']")
931 job_id_regexp = Unicode(r'\d+', config=True,
919 job_id_regexp = Unicode(r'\d+', config=True,
932 help="Regular expresion for identifying the job ID [r'\d+']")
920 help="Regular expresion for identifying the job ID [r'\d+']")
933
921
934 batch_file = Unicode(u'')
922 batch_file = Unicode(u'')
935 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
923 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
936 job_array_template = Unicode('#PBS -t 1-{n}')
924 job_array_template = Unicode('#PBS -t 1-{n}')
937 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
925 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
938 queue_template = Unicode('#PBS -q {queue}')
926 queue_template = Unicode('#PBS -q {queue}')
939
927
940
928
941 class PBSControllerLauncher(PBSLauncher):
929 class PBSControllerLauncher(PBSLauncher):
942 """Launch a controller using PBS."""
930 """Launch a controller using PBS."""
943
931
944 batch_file_name = Unicode(u'pbs_controller', config=True,
932 batch_file_name = Unicode(u'pbs_controller', config=True,
945 help="batch file name for the controller job.")
933 help="batch file name for the controller job.")
946 default_template= Unicode("""#!/bin/sh
934 default_template= Unicode("""#!/bin/sh
947 #PBS -V
935 #PBS -V
948 #PBS -N ipcontroller
936 #PBS -N ipcontroller
949 %s --log-to-file profile_dir={profile_dir}
937 %s --log-to-file profile_dir={profile_dir}
950 """%(' '.join(ipcontroller_cmd_argv)))
938 """%(' '.join(ipcontroller_cmd_argv)))
951
939
952 def start(self, profile_dir):
940 def start(self, profile_dir):
953 """Start the controller by profile or profile_dir."""
941 """Start the controller by profile or profile_dir."""
954 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
942 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
955 return super(PBSControllerLauncher, self).start(1, profile_dir)
943 return super(PBSControllerLauncher, self).start(1, profile_dir)
956
944
957
945
958 class PBSEngineSetLauncher(PBSLauncher):
946 class PBSEngineSetLauncher(PBSLauncher):
959 """Launch Engines using PBS"""
947 """Launch Engines using PBS"""
960 batch_file_name = Unicode(u'pbs_engines', config=True,
948 batch_file_name = Unicode(u'pbs_engines', config=True,
961 help="batch file name for the engine(s) job.")
949 help="batch file name for the engine(s) job.")
962 default_template= Unicode(u"""#!/bin/sh
950 default_template= Unicode(u"""#!/bin/sh
963 #PBS -V
951 #PBS -V
964 #PBS -N ipengine
952 #PBS -N ipengine
965 %s profile_dir={profile_dir}
953 %s profile_dir={profile_dir}
966 """%(' '.join(ipengine_cmd_argv)))
954 """%(' '.join(ipengine_cmd_argv)))
967
955
968 def start(self, n, profile_dir):
956 def start(self, n, profile_dir):
969 """Start n engines by profile or profile_dir."""
957 """Start n engines by profile or profile_dir."""
970 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
958 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
971 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
959 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
972
960
973 #SGE is very similar to PBS
961 #SGE is very similar to PBS
974
962
975 class SGELauncher(PBSLauncher):
963 class SGELauncher(PBSLauncher):
976 """Sun GridEngine is a PBS clone with slightly different syntax"""
964 """Sun GridEngine is a PBS clone with slightly different syntax"""
977 job_array_regexp = Unicode('#\$\W+\-t')
965 job_array_regexp = Unicode('#\$\W+\-t')
978 job_array_template = Unicode('#$ -t 1-{n}')
966 job_array_template = Unicode('#$ -t 1-{n}')
979 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
967 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
980 queue_template = Unicode('#$ -q $queue')
968 queue_template = Unicode('#$ -q $queue')
981
969
982 class SGEControllerLauncher(SGELauncher):
970 class SGEControllerLauncher(SGELauncher):
983 """Launch a controller using SGE."""
971 """Launch a controller using SGE."""
984
972
985 batch_file_name = Unicode(u'sge_controller', config=True,
973 batch_file_name = Unicode(u'sge_controller', config=True,
986 help="batch file name for the ipontroller job.")
974 help="batch file name for the ipontroller job.")
987 default_template= Unicode(u"""#$ -V
975 default_template= Unicode(u"""#$ -V
988 #$ -S /bin/sh
976 #$ -S /bin/sh
989 #$ -N ipcontroller
977 #$ -N ipcontroller
990 %s --log-to-file profile_dir={profile_dir}
978 %s --log-to-file profile_dir={profile_dir}
991 """%(' '.join(ipcontroller_cmd_argv)))
979 """%(' '.join(ipcontroller_cmd_argv)))
992
980
993 def start(self, profile_dir):
981 def start(self, profile_dir):
994 """Start the controller by profile or profile_dir."""
982 """Start the controller by profile or profile_dir."""
995 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
983 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
996 return super(SGEControllerLauncher, self).start(1, profile_dir)
984 return super(SGEControllerLauncher, self).start(1, profile_dir)
997
985
998 class SGEEngineSetLauncher(SGELauncher):
986 class SGEEngineSetLauncher(SGELauncher):
999 """Launch Engines with SGE"""
987 """Launch Engines with SGE"""
1000 batch_file_name = Unicode(u'sge_engines', config=True,
988 batch_file_name = Unicode(u'sge_engines', config=True,
1001 help="batch file name for the engine(s) job.")
989 help="batch file name for the engine(s) job.")
1002 default_template = Unicode("""#$ -V
990 default_template = Unicode("""#$ -V
1003 #$ -S /bin/sh
991 #$ -S /bin/sh
1004 #$ -N ipengine
992 #$ -N ipengine
1005 %s profile_dir={profile_dir}
993 %s profile_dir={profile_dir}
1006 """%(' '.join(ipengine_cmd_argv)))
994 """%(' '.join(ipengine_cmd_argv)))
1007
995
1008 def start(self, n, profile_dir):
996 def start(self, n, profile_dir):
1009 """Start n engines by profile or profile_dir."""
997 """Start n engines by profile or profile_dir."""
1010 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
998 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1011 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
999 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1012
1000
1013
1001
1014 #-----------------------------------------------------------------------------
1002 #-----------------------------------------------------------------------------
1015 # A launcher for ipcluster itself!
1003 # A launcher for ipcluster itself!
1016 #-----------------------------------------------------------------------------
1004 #-----------------------------------------------------------------------------
1017
1005
1018
1006
1019 class IPClusterLauncher(LocalProcessLauncher):
1007 class IPClusterLauncher(LocalProcessLauncher):
1020 """Launch the ipcluster program in an external process."""
1008 """Launch the ipcluster program in an external process."""
1021
1009
1022 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1010 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1023 help="Popen command for ipcluster")
1011 help="Popen command for ipcluster")
1024 ipcluster_args = List(
1012 ipcluster_args = List(
1025 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1013 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1026 help="Command line arguments to pass to ipcluster.")
1014 help="Command line arguments to pass to ipcluster.")
1027 ipcluster_subcommand = Unicode('start')
1015 ipcluster_subcommand = Unicode('start')
1028 ipcluster_n = Int(2)
1016 ipcluster_n = Int(2)
1029
1017
1030 def find_args(self):
1018 def find_args(self):
1031 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1019 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1032 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1020 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1033
1021
1034 def start(self):
1022 def start(self):
1035 self.log.info("Starting ipcluster: %r" % self.args)
1023 self.log.info("Starting ipcluster: %r" % self.args)
1036 return super(IPClusterLauncher, self).start()
1024 return super(IPClusterLauncher, self).start()
1037
1025
1038 #-----------------------------------------------------------------------------
1026 #-----------------------------------------------------------------------------
1039 # Collections of launchers
1027 # Collections of launchers
1040 #-----------------------------------------------------------------------------
1028 #-----------------------------------------------------------------------------
1041
1029
1042 local_launchers = [
1030 local_launchers = [
1043 LocalControllerLauncher,
1031 LocalControllerLauncher,
1044 LocalEngineLauncher,
1032 LocalEngineLauncher,
1045 LocalEngineSetLauncher,
1033 LocalEngineSetLauncher,
1046 ]
1034 ]
1047 mpi_launchers = [
1035 mpi_launchers = [
1048 MPIExecLauncher,
1036 MPIExecLauncher,
1049 MPIExecControllerLauncher,
1037 MPIExecControllerLauncher,
1050 MPIExecEngineSetLauncher,
1038 MPIExecEngineSetLauncher,
1051 ]
1039 ]
1052 ssh_launchers = [
1040 ssh_launchers = [
1053 SSHLauncher,
1041 SSHLauncher,
1054 SSHControllerLauncher,
1042 SSHControllerLauncher,
1055 SSHEngineLauncher,
1043 SSHEngineLauncher,
1056 SSHEngineSetLauncher,
1044 SSHEngineSetLauncher,
1057 ]
1045 ]
1058 winhpc_launchers = [
1046 winhpc_launchers = [
1059 WindowsHPCLauncher,
1047 WindowsHPCLauncher,
1060 WindowsHPCControllerLauncher,
1048 WindowsHPCControllerLauncher,
1061 WindowsHPCEngineSetLauncher,
1049 WindowsHPCEngineSetLauncher,
1062 ]
1050 ]
1063 pbs_launchers = [
1051 pbs_launchers = [
1064 PBSLauncher,
1052 PBSLauncher,
1065 PBSControllerLauncher,
1053 PBSControllerLauncher,
1066 PBSEngineSetLauncher,
1054 PBSEngineSetLauncher,
1067 ]
1055 ]
1068 sge_launchers = [
1056 sge_launchers = [
1069 SGELauncher,
1057 SGELauncher,
1070 SGEControllerLauncher,
1058 SGEControllerLauncher,
1071 SGEEngineSetLauncher,
1059 SGEEngineSetLauncher,
1072 ]
1060 ]
1073 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1061 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1074 + pbs_launchers + sge_launchers
1062 + pbs_launchers + sge_launchers
1063
General Comments 0
You need to be logged in to leave comments. Login now