##// END OF EJS Templates
don't pass profile_dir as kwarg in ipclusterapp...
MinRK -
Show More
@@ -1,521 +1,521 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import errno
18 import errno
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import signal
22 import signal
23
23
24 from subprocess import check_call, CalledProcessError, PIPE
24 from subprocess import check_call, CalledProcessError, PIPE
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27
27
28 from IPython.config.application import Application, boolean_flag
28 from IPython.config.application import Application, boolean_flag
29 from IPython.config.loader import Config
29 from IPython.config.loader import Config
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
30 from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
32 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
33
33
34 from IPython.parallel.apps.baseapp import (
34 from IPython.parallel.apps.baseapp import (
35 BaseParallelApplication,
35 BaseParallelApplication,
36 PIDFileError,
36 PIDFileError,
37 base_flags, base_aliases
37 base_flags, base_aliases
38 )
38 )
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Module level variables
42 # Module level variables
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 default_config_file_name = u'ipcluster_config.py'
46 default_config_file_name = u'ipcluster_config.py'
47
47
48
48
49 _description = """Start an IPython cluster for parallel computing.
49 _description = """Start an IPython cluster for parallel computing.
50
50
51 An IPython cluster consists of 1 controller and 1 or more engines.
51 An IPython cluster consists of 1 controller and 1 or more engines.
52 This command automates the startup of these processes using a wide
52 This command automates the startup of these processes using a wide
53 range of startup methods (SSH, local processes, PBS, mpiexec,
53 range of startup methods (SSH, local processes, PBS, mpiexec,
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
54 Windows HPC Server 2008). To start a cluster with 4 engines on your
55 local host simply do 'ipcluster start n=4'. For more complex usage
55 local host simply do 'ipcluster start n=4'. For more complex usage
56 you will typically do 'ipcluster create profile=mycluster', then edit
56 you will typically do 'ipcluster create profile=mycluster', then edit
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
57 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
58 """
58 """
59
59
60
60
61 # Exit codes for ipcluster
61 # Exit codes for ipcluster
62
62
63 # This will be the exit code if the ipcluster appears to be running because
63 # This will be the exit code if the ipcluster appears to be running because
64 # a .pid file exists
64 # a .pid file exists
65 ALREADY_STARTED = 10
65 ALREADY_STARTED = 10
66
66
67
67
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
68 # This will be the exit code if ipcluster stop is run, but there is not .pid
69 # file to be found.
69 # file to be found.
70 ALREADY_STOPPED = 11
70 ALREADY_STOPPED = 11
71
71
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
72 # This will be the exit code if ipcluster engines is run, but there is not .pid
73 # file to be found.
73 # file to be found.
74 NO_CLUSTER = 12
74 NO_CLUSTER = 12
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # Main application
78 # Main application
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 start_help = """Start an IPython cluster for parallel computing
80 start_help = """Start an IPython cluster for parallel computing
81
81
82 Start an ipython cluster by its profile name or cluster
82 Start an ipython cluster by its profile name or cluster
83 directory. Cluster directories contain configuration, log and
83 directory. Cluster directories contain configuration, log and
84 security related files and are named using the convention
84 security related files and are named using the convention
85 'cluster_<profile>' and should be creating using the 'start'
85 'cluster_<profile>' and should be creating using the 'start'
86 subcommand of 'ipcluster'. If your cluster directory is in
86 subcommand of 'ipcluster'. If your cluster directory is in
87 the cwd or the ipython directory, you can simply refer to it
87 the cwd or the ipython directory, you can simply refer to it
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
88 using its profile name, 'ipcluster start n=4 profile=<profile>`,
89 otherwise use the 'profile_dir' option.
89 otherwise use the 'profile_dir' option.
90 """
90 """
91 stop_help = """Stop a running IPython cluster
91 stop_help = """Stop a running IPython cluster
92
92
93 Stop a running ipython cluster by its profile name or cluster
93 Stop a running ipython cluster by its profile name or cluster
94 directory. Cluster directories are named using the convention
94 directory. Cluster directories are named using the convention
95 'cluster_<profile>'. If your cluster directory is in
95 'cluster_<profile>'. If your cluster directory is in
96 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
97 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
98 use the 'profile_dir' option.
98 use the 'profile_dir' option.
99 """
99 """
100 engines_help = """Start engines connected to an existing IPython cluster
100 engines_help = """Start engines connected to an existing IPython cluster
101
101
102 Start one or more engines to connect to an existing Cluster
102 Start one or more engines to connect to an existing Cluster
103 by profile name or cluster directory.
103 by profile name or cluster directory.
104 Cluster directories contain configuration, log and
104 Cluster directories contain configuration, log and
105 security related files and are named using the convention
105 security related files and are named using the convention
106 'cluster_<profile>' and should be creating using the 'start'
106 'cluster_<profile>' and should be creating using the 'start'
107 subcommand of 'ipcluster'. If your cluster directory is in
107 subcommand of 'ipcluster'. If your cluster directory is in
108 the cwd or the ipython directory, you can simply refer to it
108 the cwd or the ipython directory, you can simply refer to it
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
109 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
110 otherwise use the 'profile_dir' option.
110 otherwise use the 'profile_dir' option.
111 """
111 """
112 create_help = """Create an ipcluster profile by name
112 create_help = """Create an ipcluster profile by name
113
113
114 Create an ipython cluster directory by its profile name or
114 Create an ipython cluster directory by its profile name or
115 cluster directory path. Cluster directories contain
115 cluster directory path. Cluster directories contain
116 configuration, log and security related files and are named
116 configuration, log and security related files and are named
117 using the convention 'cluster_<profile>'. By default they are
117 using the convention 'cluster_<profile>'. By default they are
118 located in your ipython directory. Once created, you will
118 located in your ipython directory. Once created, you will
119 probably need to edit the configuration files in the cluster
119 probably need to edit the configuration files in the cluster
120 directory to configure your cluster. Most users will create a
120 directory to configure your cluster. Most users will create a
121 cluster directory by profile name,
121 cluster directory by profile name,
122 `ipcluster create profile=mycluster`, which will put the directory
122 `ipcluster create profile=mycluster`, which will put the directory
123 in `<ipython_dir>/cluster_mycluster`.
123 in `<ipython_dir>/cluster_mycluster`.
124 """
124 """
125 list_help = """List available cluster profiles
125 list_help = """List available cluster profiles
126
126
127 List all available clusters, by cluster directory, that can
127 List all available clusters, by cluster directory, that can
128 be found in the current working directly or in the ipython
128 be found in the current working directly or in the ipython
129 directory. Cluster directories are named using the convention
129 directory. Cluster directories are named using the convention
130 'cluster_<profile>'.
130 'cluster_<profile>'.
131 """
131 """
132
132
133
133
134 class IPClusterList(BaseIPythonApplication):
134 class IPClusterList(BaseIPythonApplication):
135 name = u'ipcluster-list'
135 name = u'ipcluster-list'
136 description = list_help
136 description = list_help
137
137
138 # empty aliases
138 # empty aliases
139 aliases=Dict()
139 aliases=Dict()
140 flags = Dict(base_flags)
140 flags = Dict(base_flags)
141
141
142 def _log_level_default(self):
142 def _log_level_default(self):
143 return 20
143 return 20
144
144
145 def list_profile_dirs(self):
145 def list_profile_dirs(self):
146 # Find the search paths
146 # Find the search paths
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
147 profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
148 if profile_dir_paths:
148 if profile_dir_paths:
149 profile_dir_paths = profile_dir_paths.split(':')
149 profile_dir_paths = profile_dir_paths.split(':')
150 else:
150 else:
151 profile_dir_paths = []
151 profile_dir_paths = []
152
152
153 ipython_dir = self.ipython_dir
153 ipython_dir = self.ipython_dir
154
154
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
155 paths = [os.getcwd(), ipython_dir] + profile_dir_paths
156 paths = list(set(paths))
156 paths = list(set(paths))
157
157
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
158 self.log.info('Searching for cluster profiles in paths: %r' % paths)
159 for path in paths:
159 for path in paths:
160 files = os.listdir(path)
160 files = os.listdir(path)
161 for f in files:
161 for f in files:
162 full_path = os.path.join(path, f)
162 full_path = os.path.join(path, f)
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
163 if os.path.isdir(full_path) and f.startswith('profile_') and \
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
164 os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
165 profile = f.split('_')[-1]
165 profile = f.split('_')[-1]
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
166 start_cmd = 'ipcluster start profile=%s n=4' % profile
167 print start_cmd + " ==> " + full_path
167 print start_cmd + " ==> " + full_path
168
168
169 def start(self):
169 def start(self):
170 self.list_profile_dirs()
170 self.list_profile_dirs()
171
171
172
172
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
173 # `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
174
174
175 create_flags = {}
175 create_flags = {}
176 create_flags.update(base_flags)
176 create_flags.update(base_flags)
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
177 create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
178 "reset config files to defaults", "leave existing config files"))
178 "reset config files to defaults", "leave existing config files"))
179
179
180 class IPClusterCreate(BaseParallelApplication):
180 class IPClusterCreate(BaseParallelApplication):
181 name = u'ipcluster-create'
181 name = u'ipcluster-create'
182 description = create_help
182 description = create_help
183 auto_create = Bool(True)
183 auto_create = Bool(True)
184 config_file_name = Unicode(default_config_file_name)
184 config_file_name = Unicode(default_config_file_name)
185
185
186 flags = Dict(create_flags)
186 flags = Dict(create_flags)
187
187
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
188 aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
189
189
190 classes = [ProfileDir]
190 classes = [ProfileDir]
191
191
192
192
193 stop_aliases = dict(
193 stop_aliases = dict(
194 signal='IPClusterStop.signal',
194 signal='IPClusterStop.signal',
195 profile='BaseIPythonApplication.profile',
195 profile='BaseIPythonApplication.profile',
196 profile_dir='ProfileDir.location',
196 profile_dir='ProfileDir.location',
197 )
197 )
198
198
199 class IPClusterStop(BaseParallelApplication):
199 class IPClusterStop(BaseParallelApplication):
200 name = u'ipcluster'
200 name = u'ipcluster'
201 description = stop_help
201 description = stop_help
202 config_file_name = Unicode(default_config_file_name)
202 config_file_name = Unicode(default_config_file_name)
203
203
204 signal = Int(signal.SIGINT, config=True,
204 signal = Int(signal.SIGINT, config=True,
205 help="signal to use for stopping processes.")
205 help="signal to use for stopping processes.")
206
206
207 aliases = Dict(stop_aliases)
207 aliases = Dict(stop_aliases)
208
208
209 def start(self):
209 def start(self):
210 """Start the app for the stop subcommand."""
210 """Start the app for the stop subcommand."""
211 try:
211 try:
212 pid = self.get_pid_from_file()
212 pid = self.get_pid_from_file()
213 except PIDFileError:
213 except PIDFileError:
214 self.log.critical(
214 self.log.critical(
215 'Could not read pid file, cluster is probably not running.'
215 'Could not read pid file, cluster is probably not running.'
216 )
216 )
217 # Here I exit with a unusual exit status that other processes
217 # Here I exit with a unusual exit status that other processes
218 # can watch for to learn how I existed.
218 # can watch for to learn how I existed.
219 self.remove_pid_file()
219 self.remove_pid_file()
220 self.exit(ALREADY_STOPPED)
220 self.exit(ALREADY_STOPPED)
221
221
222 if not self.check_pid(pid):
222 if not self.check_pid(pid):
223 self.log.critical(
223 self.log.critical(
224 'Cluster [pid=%r] is not running.' % pid
224 'Cluster [pid=%r] is not running.' % pid
225 )
225 )
226 self.remove_pid_file()
226 self.remove_pid_file()
227 # Here I exit with a unusual exit status that other processes
227 # Here I exit with a unusual exit status that other processes
228 # can watch for to learn how I existed.
228 # can watch for to learn how I existed.
229 self.exit(ALREADY_STOPPED)
229 self.exit(ALREADY_STOPPED)
230
230
231 elif os.name=='posix':
231 elif os.name=='posix':
232 sig = self.signal
232 sig = self.signal
233 self.log.info(
233 self.log.info(
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
234 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
235 )
235 )
236 try:
236 try:
237 os.kill(pid, sig)
237 os.kill(pid, sig)
238 except OSError:
238 except OSError:
239 self.log.error("Stopping cluster failed, assuming already dead.",
239 self.log.error("Stopping cluster failed, assuming already dead.",
240 exc_info=True)
240 exc_info=True)
241 self.remove_pid_file()
241 self.remove_pid_file()
242 elif os.name=='nt':
242 elif os.name=='nt':
243 try:
243 try:
244 # kill the whole tree
244 # kill the whole tree
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
245 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
246 except (CalledProcessError, OSError):
246 except (CalledProcessError, OSError):
247 self.log.error("Stopping cluster failed, assuming already dead.",
247 self.log.error("Stopping cluster failed, assuming already dead.",
248 exc_info=True)
248 exc_info=True)
249 self.remove_pid_file()
249 self.remove_pid_file()
250
250
251 engine_aliases = {}
251 engine_aliases = {}
252 engine_aliases.update(base_aliases)
252 engine_aliases.update(base_aliases)
253 engine_aliases.update(dict(
253 engine_aliases.update(dict(
254 n='IPClusterEngines.n',
254 n='IPClusterEngines.n',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
255 elauncher = 'IPClusterEngines.engine_launcher_class',
256 ))
256 ))
257 class IPClusterEngines(BaseParallelApplication):
257 class IPClusterEngines(BaseParallelApplication):
258
258
259 name = u'ipcluster'
259 name = u'ipcluster'
260 description = engines_help
260 description = engines_help
261 usage = None
261 usage = None
262 config_file_name = Unicode(default_config_file_name)
262 config_file_name = Unicode(default_config_file_name)
263 default_log_level = logging.INFO
263 default_log_level = logging.INFO
264 classes = List()
264 classes = List()
265 def _classes_default(self):
265 def _classes_default(self):
266 from IPython.parallel.apps import launcher
266 from IPython.parallel.apps import launcher
267 launchers = launcher.all_launchers
267 launchers = launcher.all_launchers
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
268 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
269 return [ProfileDir]+eslaunchers
269 return [ProfileDir]+eslaunchers
270
270
271 n = Int(2, config=True,
271 n = Int(2, config=True,
272 help="The number of engines to start.")
272 help="The number of engines to start.")
273
273
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
274 engine_launcher_class = Unicode('LocalEngineSetLauncher',
275 config=True,
275 config=True,
276 help="The class for launching a set of Engines."
276 help="The class for launching a set of Engines."
277 )
277 )
278 daemonize = Bool(False, config=True,
278 daemonize = Bool(False, config=True,
279 help='Daemonize the ipcluster program. This implies --log-to-file')
279 help='Daemonize the ipcluster program. This implies --log-to-file')
280
280
281 def _daemonize_changed(self, name, old, new):
281 def _daemonize_changed(self, name, old, new):
282 if new:
282 if new:
283 self.log_to_file = True
283 self.log_to_file = True
284
284
285 aliases = Dict(engine_aliases)
285 aliases = Dict(engine_aliases)
286 # flags = Dict(flags)
286 # flags = Dict(flags)
287 _stopping = False
287 _stopping = False
288
288
289 def initialize(self, argv=None):
289 def initialize(self, argv=None):
290 super(IPClusterEngines, self).initialize(argv)
290 super(IPClusterEngines, self).initialize(argv)
291 self.init_signal()
291 self.init_signal()
292 self.init_launchers()
292 self.init_launchers()
293
293
294 def init_launchers(self):
294 def init_launchers(self):
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
295 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
296 self.engine_launcher.on_stop(lambda r: self.loop.stop())
297
297
298 def init_signal(self):
298 def init_signal(self):
299 # Setup signals
299 # Setup signals
300 signal.signal(signal.SIGINT, self.sigint_handler)
300 signal.signal(signal.SIGINT, self.sigint_handler)
301
301
302 def build_launcher(self, clsname):
302 def build_launcher(self, clsname):
303 """import and instantiate a Launcher based on importstring"""
303 """import and instantiate a Launcher based on importstring"""
304 if '.' not in clsname:
304 if '.' not in clsname:
305 # not a module, presume it's the raw name in apps.launcher
305 # not a module, presume it's the raw name in apps.launcher
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
306 clsname = 'IPython.parallel.apps.launcher.'+clsname
307 # print repr(clsname)
307 # print repr(clsname)
308 klass = import_item(clsname)
308 klass = import_item(clsname)
309
309
310 launcher = klass(
310 launcher = klass(
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
311 work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
312 )
312 )
313 return launcher
313 return launcher
314
314
315 def start_engines(self):
315 def start_engines(self):
316 self.log.info("Starting %i engines"%self.n)
316 self.log.info("Starting %i engines"%self.n)
317 self.engine_launcher.start(
317 self.engine_launcher.start(
318 self.n,
318 self.n,
319 profile_dir=self.profile_dir.location
319 self.profile_dir.location
320 )
320 )
321
321
322 def stop_engines(self):
322 def stop_engines(self):
323 self.log.info("Stopping Engines...")
323 self.log.info("Stopping Engines...")
324 if self.engine_launcher.running:
324 if self.engine_launcher.running:
325 d = self.engine_launcher.stop()
325 d = self.engine_launcher.stop()
326 return d
326 return d
327 else:
327 else:
328 return None
328 return None
329
329
330 def stop_launchers(self, r=None):
330 def stop_launchers(self, r=None):
331 if not self._stopping:
331 if not self._stopping:
332 self._stopping = True
332 self._stopping = True
333 self.log.error("IPython cluster: stopping")
333 self.log.error("IPython cluster: stopping")
334 self.stop_engines()
334 self.stop_engines()
335 # Wait a few seconds to let things shut down.
335 # Wait a few seconds to let things shut down.
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
336 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
337 dc.start()
337 dc.start()
338
338
339 def sigint_handler(self, signum, frame):
339 def sigint_handler(self, signum, frame):
340 self.log.debug("SIGINT received, stopping launchers...")
340 self.log.debug("SIGINT received, stopping launchers...")
341 self.stop_launchers()
341 self.stop_launchers()
342
342
343 def start_logging(self):
343 def start_logging(self):
344 # Remove old log files of the controller and engine
344 # Remove old log files of the controller and engine
345 if self.clean_logs:
345 if self.clean_logs:
346 log_dir = self.profile_dir.log_dir
346 log_dir = self.profile_dir.log_dir
347 for f in os.listdir(log_dir):
347 for f in os.listdir(log_dir):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
348 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
349 os.remove(os.path.join(log_dir, f))
349 os.remove(os.path.join(log_dir, f))
350 # This will remove old log files for ipcluster itself
350 # This will remove old log files for ipcluster itself
351 # super(IPBaseParallelApplication, self).start_logging()
351 # super(IPBaseParallelApplication, self).start_logging()
352
352
353 def start(self):
353 def start(self):
354 """Start the app for the engines subcommand."""
354 """Start the app for the engines subcommand."""
355 self.log.info("IPython cluster: started")
355 self.log.info("IPython cluster: started")
356 # First see if the cluster is already running
356 # First see if the cluster is already running
357
357
358 # Now log and daemonize
358 # Now log and daemonize
359 self.log.info(
359 self.log.info(
360 'Starting engines with [daemon=%r]' % self.daemonize
360 'Starting engines with [daemon=%r]' % self.daemonize
361 )
361 )
362 # TODO: Get daemonize working on Windows or as a Windows Server.
362 # TODO: Get daemonize working on Windows or as a Windows Server.
363 if self.daemonize:
363 if self.daemonize:
364 if os.name=='posix':
364 if os.name=='posix':
365 from twisted.scripts._twistd_unix import daemonize
365 from twisted.scripts._twistd_unix import daemonize
366 daemonize()
366 daemonize()
367
367
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
368 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
369 dc.start()
369 dc.start()
370 # Now write the new pid file AFTER our new forked pid is active.
370 # Now write the new pid file AFTER our new forked pid is active.
371 # self.write_pid_file()
371 # self.write_pid_file()
372 try:
372 try:
373 self.loop.start()
373 self.loop.start()
374 except KeyboardInterrupt:
374 except KeyboardInterrupt:
375 pass
375 pass
376 except zmq.ZMQError as e:
376 except zmq.ZMQError as e:
377 if e.errno == errno.EINTR:
377 if e.errno == errno.EINTR:
378 pass
378 pass
379 else:
379 else:
380 raise
380 raise
381
381
382 start_aliases = {}
382 start_aliases = {}
383 start_aliases.update(engine_aliases)
383 start_aliases.update(engine_aliases)
384 start_aliases.update(dict(
384 start_aliases.update(dict(
385 delay='IPClusterStart.delay',
385 delay='IPClusterStart.delay',
386 clean_logs='IPClusterStart.clean_logs',
386 clean_logs='IPClusterStart.clean_logs',
387 ))
387 ))
388
388
389 class IPClusterStart(IPClusterEngines):
389 class IPClusterStart(IPClusterEngines):
390
390
391 name = u'ipcluster'
391 name = u'ipcluster'
392 description = start_help
392 description = start_help
393 default_log_level = logging.INFO
393 default_log_level = logging.INFO
394 auto_create = Bool(True, config=True,
394 auto_create = Bool(True, config=True,
395 help="whether to create the profile_dir if it doesn't exist")
395 help="whether to create the profile_dir if it doesn't exist")
396 classes = List()
396 classes = List()
397 def _classes_default(self,):
397 def _classes_default(self,):
398 from IPython.parallel.apps import launcher
398 from IPython.parallel.apps import launcher
399 return [ProfileDir]+launcher.all_launchers
399 return [ProfileDir]+launcher.all_launchers
400
400
401 clean_logs = Bool(True, config=True,
401 clean_logs = Bool(True, config=True,
402 help="whether to cleanup old logs before starting")
402 help="whether to cleanup old logs before starting")
403
403
404 delay = CFloat(1., config=True,
404 delay = CFloat(1., config=True,
405 help="delay (in s) between starting the controller and the engines")
405 help="delay (in s) between starting the controller and the engines")
406
406
407 controller_launcher_class = Unicode('LocalControllerLauncher',
407 controller_launcher_class = Unicode('LocalControllerLauncher',
408 config=True,
408 config=True,
409 help="The class for launching a Controller."
409 help="The class for launching a Controller."
410 )
410 )
411 reset = Bool(False, config=True,
411 reset = Bool(False, config=True,
412 help="Whether to reset config files as part of '--create'."
412 help="Whether to reset config files as part of '--create'."
413 )
413 )
414
414
415 # flags = Dict(flags)
415 # flags = Dict(flags)
416 aliases = Dict(start_aliases)
416 aliases = Dict(start_aliases)
417
417
418 def init_launchers(self):
418 def init_launchers(self):
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
419 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
420 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
421 self.controller_launcher.on_stop(self.stop_launchers)
421 self.controller_launcher.on_stop(self.stop_launchers)
422
422
423 def start_controller(self):
423 def start_controller(self):
424 self.controller_launcher.start(
424 self.controller_launcher.start(
425 profile_dir=self.profile_dir.location
425 self.profile_dir.location
426 )
426 )
427
427
428 def stop_controller(self):
428 def stop_controller(self):
429 # self.log.info("In stop_controller")
429 # self.log.info("In stop_controller")
430 if self.controller_launcher and self.controller_launcher.running:
430 if self.controller_launcher and self.controller_launcher.running:
431 return self.controller_launcher.stop()
431 return self.controller_launcher.stop()
432
432
433 def stop_launchers(self, r=None):
433 def stop_launchers(self, r=None):
434 if not self._stopping:
434 if not self._stopping:
435 self.stop_controller()
435 self.stop_controller()
436 super(IPClusterStart, self).stop_launchers()
436 super(IPClusterStart, self).stop_launchers()
437
437
438 def start(self):
438 def start(self):
439 """Start the app for the start subcommand."""
439 """Start the app for the start subcommand."""
440 # First see if the cluster is already running
440 # First see if the cluster is already running
441 try:
441 try:
442 pid = self.get_pid_from_file()
442 pid = self.get_pid_from_file()
443 except PIDFileError:
443 except PIDFileError:
444 pass
444 pass
445 else:
445 else:
446 if self.check_pid(pid):
446 if self.check_pid(pid):
447 self.log.critical(
447 self.log.critical(
448 'Cluster is already running with [pid=%s]. '
448 'Cluster is already running with [pid=%s]. '
449 'use "ipcluster stop" to stop the cluster.' % pid
449 'use "ipcluster stop" to stop the cluster.' % pid
450 )
450 )
451 # Here I exit with a unusual exit status that other processes
451 # Here I exit with a unusual exit status that other processes
452 # can watch for to learn how I existed.
452 # can watch for to learn how I existed.
453 self.exit(ALREADY_STARTED)
453 self.exit(ALREADY_STARTED)
454 else:
454 else:
455 self.remove_pid_file()
455 self.remove_pid_file()
456
456
457
457
458 # Now log and daemonize
458 # Now log and daemonize
459 self.log.info(
459 self.log.info(
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
460 'Starting ipcluster with [daemon=%r]' % self.daemonize
461 )
461 )
462 # TODO: Get daemonize working on Windows or as a Windows Server.
462 # TODO: Get daemonize working on Windows or as a Windows Server.
463 if self.daemonize:
463 if self.daemonize:
464 if os.name=='posix':
464 if os.name=='posix':
465 from twisted.scripts._twistd_unix import daemonize
465 from twisted.scripts._twistd_unix import daemonize
466 daemonize()
466 daemonize()
467
467
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
468 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
469 dc.start()
469 dc.start()
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
470 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
471 dc.start()
471 dc.start()
472 # Now write the new pid file AFTER our new forked pid is active.
472 # Now write the new pid file AFTER our new forked pid is active.
473 self.write_pid_file()
473 self.write_pid_file()
474 try:
474 try:
475 self.loop.start()
475 self.loop.start()
476 except KeyboardInterrupt:
476 except KeyboardInterrupt:
477 pass
477 pass
478 except zmq.ZMQError as e:
478 except zmq.ZMQError as e:
479 if e.errno == errno.EINTR:
479 if e.errno == errno.EINTR:
480 pass
480 pass
481 else:
481 else:
482 raise
482 raise
483 finally:
483 finally:
484 self.remove_pid_file()
484 self.remove_pid_file()
485
485
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
486 base='IPython.parallel.apps.ipclusterapp.IPCluster'
487
487
488 class IPBaseParallelApplication(Application):
488 class IPBaseParallelApplication(Application):
489 name = u'ipcluster'
489 name = u'ipcluster'
490 description = _description
490 description = _description
491
491
492 subcommands = {'create' : (base+'Create', create_help),
492 subcommands = {'create' : (base+'Create', create_help),
493 'list' : (base+'List', list_help),
493 'list' : (base+'List', list_help),
494 'start' : (base+'Start', start_help),
494 'start' : (base+'Start', start_help),
495 'stop' : (base+'Stop', stop_help),
495 'stop' : (base+'Stop', stop_help),
496 'engines' : (base+'Engines', engines_help),
496 'engines' : (base+'Engines', engines_help),
497 }
497 }
498
498
499 # no aliases or flags for parent App
499 # no aliases or flags for parent App
500 aliases = Dict()
500 aliases = Dict()
501 flags = Dict()
501 flags = Dict()
502
502
503 def start(self):
503 def start(self):
504 if self.subapp is None:
504 if self.subapp is None:
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
505 print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
506 print
506 print
507 self.print_subcommands()
507 self.print_subcommands()
508 self.exit(1)
508 self.exit(1)
509 else:
509 else:
510 return self.subapp.start()
510 return self.subapp.start()
511
511
512 def launch_new_instance():
512 def launch_new_instance():
513 """Create and run the IPython cluster."""
513 """Create and run the IPython cluster."""
514 app = IPBaseParallelApplication.instance()
514 app = IPBaseParallelApplication.instance()
515 app.initialize()
515 app.initialize()
516 app.start()
516 app.start()
517
517
518
518
519 if __name__ == '__main__':
519 if __name__ == '__main__':
520 launch_new_instance()
520 launch_new_instance()
521
521
@@ -1,1070 +1,1070 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 from IPython.external import Itpl
52 from IPython.external import Itpl
53 # from IPython.config.configurable import Configurable
53 # from IPython.config.configurable import Configurable
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57
57
58 from IPython.parallel.factory import LoggingFactory
58 from IPython.parallel.factory import LoggingFactory
59
59
60 from .win32support import forward_read_events
60 from .win32support import forward_read_events
61
61
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
63
63
64 WINDOWS = os.name == 'nt'
64 WINDOWS = os.name == 'nt'
65
65
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67 # Paths to the kernel apps
67 # Paths to the kernel apps
68 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
69
69
70
70
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
72 'IPython.parallel.apps.ipclusterapp'
72 'IPython.parallel.apps.ipclusterapp'
73 ))
73 ))
74
74
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipengineapp'
76 'IPython.parallel.apps.ipengineapp'
77 ))
77 ))
78
78
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipcontrollerapp'
80 'IPython.parallel.apps.ipcontrollerapp'
81 ))
81 ))
82
82
83 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
84 # Base launchers and errors
84 # Base launchers and errors
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86
86
87
87
88 class LauncherError(Exception):
88 class LauncherError(Exception):
89 pass
89 pass
90
90
91
91
92 class ProcessStateError(LauncherError):
92 class ProcessStateError(LauncherError):
93 pass
93 pass
94
94
95
95
96 class UnknownStatus(LauncherError):
96 class UnknownStatus(LauncherError):
97 pass
97 pass
98
98
99
99
100 class BaseLauncher(LoggingFactory):
100 class BaseLauncher(LoggingFactory):
101 """An asbtraction for starting, stopping and signaling a process."""
101 """An asbtraction for starting, stopping and signaling a process."""
102
102
103 # In all of the launchers, the work_dir is where child processes will be
103 # In all of the launchers, the work_dir is where child processes will be
104 # run. This will usually be the profile_dir, but may not be. any work_dir
104 # run. This will usually be the profile_dir, but may not be. any work_dir
105 # passed into the __init__ method will override the config value.
105 # passed into the __init__ method will override the config value.
106 # This should not be used to set the work_dir for the actual engine
106 # This should not be used to set the work_dir for the actual engine
107 # and controller. Instead, use their own config files or the
107 # and controller. Instead, use their own config files or the
108 # controller_args, engine_args attributes of the launchers to add
108 # controller_args, engine_args attributes of the launchers to add
109 # the work_dir option.
109 # the work_dir option.
110 work_dir = Unicode(u'.')
110 work_dir = Unicode(u'.')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112
112
113 start_data = Any()
113 start_data = Any()
114 stop_data = Any()
114 stop_data = Any()
115
115
116 def _loop_default(self):
116 def _loop_default(self):
117 return ioloop.IOLoop.instance()
117 return ioloop.IOLoop.instance()
118
118
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
121 self.state = 'before' # can be before, running, after
121 self.state = 'before' # can be before, running, after
122 self.stop_callbacks = []
122 self.stop_callbacks = []
123 self.start_data = None
123 self.start_data = None
124 self.stop_data = None
124 self.stop_data = None
125
125
126 @property
126 @property
127 def args(self):
127 def args(self):
128 """A list of cmd and args that will be used to start the process.
128 """A list of cmd and args that will be used to start the process.
129
129
130 This is what is passed to :func:`spawnProcess` and the first element
130 This is what is passed to :func:`spawnProcess` and the first element
131 will be the process name.
131 will be the process name.
132 """
132 """
133 return self.find_args()
133 return self.find_args()
134
134
135 def find_args(self):
135 def find_args(self):
136 """The ``.args`` property calls this to find the args list.
136 """The ``.args`` property calls this to find the args list.
137
137
138 Subcommand should implement this to construct the cmd and args.
138 Subcommand should implement this to construct the cmd and args.
139 """
139 """
140 raise NotImplementedError('find_args must be implemented in a subclass')
140 raise NotImplementedError('find_args must be implemented in a subclass')
141
141
142 @property
142 @property
143 def arg_str(self):
143 def arg_str(self):
144 """The string form of the program arguments."""
144 """The string form of the program arguments."""
145 return ' '.join(self.args)
145 return ' '.join(self.args)
146
146
147 @property
147 @property
148 def running(self):
148 def running(self):
149 """Am I running."""
149 """Am I running."""
150 if self.state == 'running':
150 if self.state == 'running':
151 return True
151 return True
152 else:
152 else:
153 return False
153 return False
154
154
155 def start(self):
155 def start(self):
156 """Start the process.
156 """Start the process.
157
157
158 This must return a deferred that fires with information about the
158 This must return a deferred that fires with information about the
159 process starting (like a pid, job id, etc.).
159 process starting (like a pid, job id, etc.).
160 """
160 """
161 raise NotImplementedError('start must be implemented in a subclass')
161 raise NotImplementedError('start must be implemented in a subclass')
162
162
163 def stop(self):
163 def stop(self):
164 """Stop the process and notify observers of stopping.
164 """Stop the process and notify observers of stopping.
165
165
166 This must return a deferred that fires with information about the
166 This must return a deferred that fires with information about the
167 processing stopping, like errors that occur while the process is
167 processing stopping, like errors that occur while the process is
168 attempting to be shut down. This deferred won't fire when the process
168 attempting to be shut down. This deferred won't fire when the process
169 actually stops. To observe the actual process stopping, see
169 actually stops. To observe the actual process stopping, see
170 :func:`observe_stop`.
170 :func:`observe_stop`.
171 """
171 """
172 raise NotImplementedError('stop must be implemented in a subclass')
172 raise NotImplementedError('stop must be implemented in a subclass')
173
173
174 def on_stop(self, f):
174 def on_stop(self, f):
175 """Get a deferred that will fire when the process stops.
175 """Get a deferred that will fire when the process stops.
176
176
177 The deferred will fire with data that contains information about
177 The deferred will fire with data that contains information about
178 the exit status of the process.
178 the exit status of the process.
179 """
179 """
180 if self.state=='after':
180 if self.state=='after':
181 return f(self.stop_data)
181 return f(self.stop_data)
182 else:
182 else:
183 self.stop_callbacks.append(f)
183 self.stop_callbacks.append(f)
184
184
185 def notify_start(self, data):
185 def notify_start(self, data):
186 """Call this to trigger startup actions.
186 """Call this to trigger startup actions.
187
187
188 This logs the process startup and sets the state to 'running'. It is
188 This logs the process startup and sets the state to 'running'. It is
189 a pass-through so it can be used as a callback.
189 a pass-through so it can be used as a callback.
190 """
190 """
191
191
192 self.log.info('Process %r started: %r' % (self.args[0], data))
192 self.log.info('Process %r started: %r' % (self.args[0], data))
193 self.start_data = data
193 self.start_data = data
194 self.state = 'running'
194 self.state = 'running'
195 return data
195 return data
196
196
197 def notify_stop(self, data):
197 def notify_stop(self, data):
198 """Call this to trigger process stop actions.
198 """Call this to trigger process stop actions.
199
199
200 This logs the process stopping and sets the state to 'after'. Call
200 This logs the process stopping and sets the state to 'after'. Call
201 this to trigger all the deferreds from :func:`observe_stop`."""
201 this to trigger all the deferreds from :func:`observe_stop`."""
202
202
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
204 self.stop_data = data
204 self.stop_data = data
205 self.state = 'after'
205 self.state = 'after'
206 for i in range(len(self.stop_callbacks)):
206 for i in range(len(self.stop_callbacks)):
207 d = self.stop_callbacks.pop()
207 d = self.stop_callbacks.pop()
208 d(data)
208 d(data)
209 return data
209 return data
210
210
211 def signal(self, sig):
211 def signal(self, sig):
212 """Signal the process.
212 """Signal the process.
213
213
214 Return a semi-meaningless deferred after signaling the process.
214 Return a semi-meaningless deferred after signaling the process.
215
215
216 Parameters
216 Parameters
217 ----------
217 ----------
218 sig : str or int
218 sig : str or int
219 'KILL', 'INT', etc., or any signal number
219 'KILL', 'INT', etc., or any signal number
220 """
220 """
221 raise NotImplementedError('signal must be implemented in a subclass')
221 raise NotImplementedError('signal must be implemented in a subclass')
222
222
223
223
224 #-----------------------------------------------------------------------------
224 #-----------------------------------------------------------------------------
225 # Local process launchers
225 # Local process launchers
226 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
227
227
228
228
229 class LocalProcessLauncher(BaseLauncher):
229 class LocalProcessLauncher(BaseLauncher):
230 """Start and stop an external process in an asynchronous manner.
230 """Start and stop an external process in an asynchronous manner.
231
231
232 This will launch the external process with a working directory of
232 This will launch the external process with a working directory of
233 ``self.work_dir``.
233 ``self.work_dir``.
234 """
234 """
235
235
236 # This is used to to construct self.args, which is passed to
236 # This is used to to construct self.args, which is passed to
237 # spawnProcess.
237 # spawnProcess.
238 cmd_and_args = List([])
238 cmd_and_args = List([])
239 poll_frequency = Int(100) # in ms
239 poll_frequency = Int(100) # in ms
240
240
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
242 super(LocalProcessLauncher, self).__init__(
242 super(LocalProcessLauncher, self).__init__(
243 work_dir=work_dir, config=config, **kwargs
243 work_dir=work_dir, config=config, **kwargs
244 )
244 )
245 self.process = None
245 self.process = None
246 self.start_deferred = None
246 self.start_deferred = None
247 self.poller = None
247 self.poller = None
248
248
249 def find_args(self):
249 def find_args(self):
250 return self.cmd_and_args
250 return self.cmd_and_args
251
251
252 def start(self):
252 def start(self):
253 if self.state == 'before':
253 if self.state == 'before':
254 self.process = Popen(self.args,
254 self.process = Popen(self.args,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
256 env=os.environ,
256 env=os.environ,
257 cwd=self.work_dir
257 cwd=self.work_dir
258 )
258 )
259 if WINDOWS:
259 if WINDOWS:
260 self.stdout = forward_read_events(self.process.stdout)
260 self.stdout = forward_read_events(self.process.stdout)
261 self.stderr = forward_read_events(self.process.stderr)
261 self.stderr = forward_read_events(self.process.stderr)
262 else:
262 else:
263 self.stdout = self.process.stdout.fileno()
263 self.stdout = self.process.stdout.fileno()
264 self.stderr = self.process.stderr.fileno()
264 self.stderr = self.process.stderr.fileno()
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
268 self.poller.start()
268 self.poller.start()
269 self.notify_start(self.process.pid)
269 self.notify_start(self.process.pid)
270 else:
270 else:
271 s = 'The process was already started and has state: %r' % self.state
271 s = 'The process was already started and has state: %r' % self.state
272 raise ProcessStateError(s)
272 raise ProcessStateError(s)
273
273
274 def stop(self):
274 def stop(self):
275 return self.interrupt_then_kill()
275 return self.interrupt_then_kill()
276
276
277 def signal(self, sig):
277 def signal(self, sig):
278 if self.state == 'running':
278 if self.state == 'running':
279 if WINDOWS and sig != SIGINT:
279 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
280 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
282 else:
283 self.process.send_signal(sig)
283 self.process.send_signal(sig)
284
284
285 def interrupt_then_kill(self, delay=2.0):
285 def interrupt_then_kill(self, delay=2.0):
286 """Send INT, wait a delay and then send KILL."""
286 """Send INT, wait a delay and then send KILL."""
287 try:
287 try:
288 self.signal(SIGINT)
288 self.signal(SIGINT)
289 except Exception:
289 except Exception:
290 self.log.debug("interrupt failed")
290 self.log.debug("interrupt failed")
291 pass
291 pass
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
293 self.killer.start()
293 self.killer.start()
294
294
295 # callbacks, etc:
295 # callbacks, etc:
296
296
297 def handle_stdout(self, fd, events):
297 def handle_stdout(self, fd, events):
298 if WINDOWS:
298 if WINDOWS:
299 line = self.stdout.recv()
299 line = self.stdout.recv()
300 else:
300 else:
301 line = self.process.stdout.readline()
301 line = self.process.stdout.readline()
302 # a stopped process will be readable but return empty strings
302 # a stopped process will be readable but return empty strings
303 if line:
303 if line:
304 self.log.info(line[:-1])
304 self.log.info(line[:-1])
305 else:
305 else:
306 self.poll()
306 self.poll()
307
307
308 def handle_stderr(self, fd, events):
308 def handle_stderr(self, fd, events):
309 if WINDOWS:
309 if WINDOWS:
310 line = self.stderr.recv()
310 line = self.stderr.recv()
311 else:
311 else:
312 line = self.process.stderr.readline()
312 line = self.process.stderr.readline()
313 # a stopped process will be readable but return empty strings
313 # a stopped process will be readable but return empty strings
314 if line:
314 if line:
315 self.log.error(line[:-1])
315 self.log.error(line[:-1])
316 else:
316 else:
317 self.poll()
317 self.poll()
318
318
319 def poll(self):
319 def poll(self):
320 status = self.process.poll()
320 status = self.process.poll()
321 if status is not None:
321 if status is not None:
322 self.poller.stop()
322 self.poller.stop()
323 self.loop.remove_handler(self.stdout)
323 self.loop.remove_handler(self.stdout)
324 self.loop.remove_handler(self.stderr)
324 self.loop.remove_handler(self.stderr)
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
326 return status
326 return status
327
327
328 class LocalControllerLauncher(LocalProcessLauncher):
328 class LocalControllerLauncher(LocalProcessLauncher):
329 """Launch a controller as a regular external process."""
329 """Launch a controller as a regular external process."""
330
330
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 help="""Popen command to launch ipcontroller.""")
332 help="""Popen command to launch ipcontroller.""")
333 # Command line arguments to ipcontroller.
333 # Command line arguments to ipcontroller.
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 help="""command-line args to pass to ipcontroller""")
335 help="""command-line args to pass to ipcontroller""")
336
336
337 def find_args(self):
337 def find_args(self):
338 return self.controller_cmd + self.controller_args
338 return self.controller_cmd + self.controller_args
339
339
340 def start(self, profile_dir):
340 def start(self, profile_dir):
341 """Start the controller by profile_dir."""
341 """Start the controller by profile_dir."""
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 self.profile_dir = unicode(profile_dir)
343 self.profile_dir = unicode(profile_dir)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 return super(LocalControllerLauncher, self).start()
345 return super(LocalControllerLauncher, self).start()
346
346
347
347
348 class LocalEngineLauncher(LocalProcessLauncher):
348 class LocalEngineLauncher(LocalProcessLauncher):
349 """Launch a single engine as a regular externall process."""
349 """Launch a single engine as a regular externall process."""
350
350
351 engine_cmd = List(ipengine_cmd_argv, config=True,
351 engine_cmd = List(ipengine_cmd_argv, config=True,
352 help="""command to launch the Engine.""")
352 help="""command to launch the Engine.""")
353 # Command line arguments for ipengine.
353 # Command line arguments for ipengine.
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 help="command-line arguments to pass to ipengine"
355 help="command-line arguments to pass to ipengine"
356 )
356 )
357
357
358 def find_args(self):
358 def find_args(self):
359 return self.engine_cmd + self.engine_args
359 return self.engine_cmd + self.engine_args
360
360
361 def start(self, profile_dir):
361 def start(self, profile_dir):
362 """Start the engine by profile_dir."""
362 """Start the engine by profile_dir."""
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 self.profile_dir = unicode(profile_dir)
364 self.profile_dir = unicode(profile_dir)
365 return super(LocalEngineLauncher, self).start()
365 return super(LocalEngineLauncher, self).start()
366
366
367
367
368 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(BaseLauncher):
369 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
370
370
371 # Command line arguments for ipengine.
371 # Command line arguments for ipengine.
372 engine_args = List(
372 engine_args = List(
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 help="command-line arguments to pass to ipengine"
374 help="command-line arguments to pass to ipengine"
375 )
375 )
376 # launcher class
376 # launcher class
377 launcher_class = LocalEngineLauncher
377 launcher_class = LocalEngineLauncher
378
378
379 launchers = Dict()
379 launchers = Dict()
380 stop_data = Dict()
380 stop_data = Dict()
381
381
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 super(LocalEngineSetLauncher, self).__init__(
383 super(LocalEngineSetLauncher, self).__init__(
384 work_dir=work_dir, config=config, **kwargs
384 work_dir=work_dir, config=config, **kwargs
385 )
385 )
386 self.stop_data = {}
386 self.stop_data = {}
387
387
388 def start(self, n, profile_dir):
388 def start(self, n, profile_dir):
389 """Start n engines by profile or profile_dir."""
389 """Start n engines by profile or profile_dir."""
390 self.profile_dir = unicode(profile_dir)
390 self.profile_dir = unicode(profile_dir)
391 dlist = []
391 dlist = []
392 for i in range(n):
392 for i in range(n):
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 # Copy the engine args over to each engine launcher.
394 # Copy the engine args over to each engine launcher.
395 el.engine_args = copy.deepcopy(self.engine_args)
395 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
396 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
397 d = el.start(profile_dir)
398 if i==0:
398 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
400 self.launchers[i] = el
401 dlist.append(d)
401 dlist.append(d)
402 self.notify_start(dlist)
402 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
406 return dlist
407
407
408 def find_args(self):
408 def find_args(self):
409 return ['engine set']
409 return ['engine set']
410
410
411 def signal(self, sig):
411 def signal(self, sig):
412 dlist = []
412 dlist = []
413 for el in self.launchers.itervalues():
413 for el in self.launchers.itervalues():
414 d = el.signal(sig)
414 d = el.signal(sig)
415 dlist.append(d)
415 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
417 return dlist
418
418
419 def interrupt_then_kill(self, delay=1.0):
419 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
420 dlist = []
421 for el in self.launchers.itervalues():
421 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
422 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
423 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
425 return dlist
426
426
427 def stop(self):
427 def stop(self):
428 return self.interrupt_then_kill()
428 return self.interrupt_then_kill()
429
429
430 def _notice_engine_stopped(self, data):
430 def _notice_engine_stopped(self, data):
431 pid = data['pid']
431 pid = data['pid']
432 for idx,el in self.launchers.iteritems():
432 for idx,el in self.launchers.iteritems():
433 if el.process.pid == pid:
433 if el.process.pid == pid:
434 break
434 break
435 self.launchers.pop(idx)
435 self.launchers.pop(idx)
436 self.stop_data[idx] = data
436 self.stop_data[idx] = data
437 if not self.launchers:
437 if not self.launchers:
438 self.notify_stop(self.stop_data)
438 self.notify_stop(self.stop_data)
439
439
440
440
441 #-----------------------------------------------------------------------------
441 #-----------------------------------------------------------------------------
442 # MPIExec launchers
442 # MPIExec launchers
443 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
444
444
445
445
446 class MPIExecLauncher(LocalProcessLauncher):
446 class MPIExecLauncher(LocalProcessLauncher):
447 """Launch an external process using mpiexec."""
447 """Launch an external process using mpiexec."""
448
448
449 mpi_cmd = List(['mpiexec'], config=True,
449 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
450 help="The mpiexec command to use in starting the process."
451 )
451 )
452 mpi_args = List([], config=True,
452 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
453 help="The command line arguments to pass to mpiexec."
454 )
454 )
455 program = List(['date'], config=True,
455 program = List(['date'], config=True,
456 help="The program to start via mpiexec.")
456 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
457 program_args = List([], config=True,
458 help="The command line argument to the program."
458 help="The command line argument to the program."
459 )
459 )
460 n = Int(1)
460 n = Int(1)
461
461
462 def find_args(self):
462 def find_args(self):
463 """Build self.args using all the fields."""
463 """Build self.args using all the fields."""
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 self.program + self.program_args
465 self.program + self.program_args
466
466
467 def start(self, n):
467 def start(self, n):
468 """Start n instances of the program using mpiexec."""
468 """Start n instances of the program using mpiexec."""
469 self.n = n
469 self.n = n
470 return super(MPIExecLauncher, self).start()
470 return super(MPIExecLauncher, self).start()
471
471
472
472
473 class MPIExecControllerLauncher(MPIExecLauncher):
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 """Launch a controller using mpiexec."""
474 """Launch a controller using mpiexec."""
475
475
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
477 help="Popen command to launch the Contropper"
478 )
478 )
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
480 help="Command line arguments to pass to ipcontroller."
481 )
481 )
482 n = Int(1)
482 n = Int(1)
483
483
484 def start(self, profile_dir):
484 def start(self, profile_dir):
485 """Start the controller by profile_dir."""
485 """Start the controller by profile_dir."""
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
489 return super(MPIExecControllerLauncher, self).start(1)
490
490
491 def find_args(self):
491 def find_args(self):
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
493 self.controller_cmd + self.controller_args
494
494
495
495
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
497
497
498 program = List(ipengine_cmd_argv, config=True,
498 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
499 help="Popen command for ipengine"
500 )
500 )
501 program_args = List(
501 program_args = List(
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
503 help="Command line arguments for ipengine."
504 )
504 )
505 n = Int(1)
505 n = Int(1)
506
506
507 def start(self, n, profile_dir):
507 def start(self, n, profile_dir):
508 """Start n engines by profile or profile_dir."""
508 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
510 self.profile_dir = unicode(profile_dir)
511 self.n = n
511 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
513 return super(MPIExecEngineSetLauncher, self).start(n)
514
514
515 #-----------------------------------------------------------------------------
515 #-----------------------------------------------------------------------------
516 # SSH launchers
516 # SSH launchers
517 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
518
518
519 # TODO: Get SSH Launcher working again.
519 # TODO: Get SSH Launcher working again.
520
520
521 class SSHLauncher(LocalProcessLauncher):
521 class SSHLauncher(LocalProcessLauncher):
522 """A minimal launcher for ssh.
522 """A minimal launcher for ssh.
523
523
524 To be useful this will probably have to be extended to use the ``sshx``
524 To be useful this will probably have to be extended to use the ``sshx``
525 idea for environment variables. There could be other things this needs
525 idea for environment variables. There could be other things this needs
526 as well.
526 as well.
527 """
527 """
528
528
529 ssh_cmd = List(['ssh'], config=True,
529 ssh_cmd = List(['ssh'], config=True,
530 help="command for starting ssh")
530 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
531 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
532 help="args to pass to ssh")
533 program = List(['date'], config=True,
533 program = List(['date'], config=True,
534 help="Program to launch via ssh")
534 help="Program to launch via ssh")
535 program_args = List([], config=True,
535 program_args = List([], config=True,
536 help="args to pass to remote program")
536 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
537 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
538 help="hostname on which to launch the program")
539 user = Unicode('', config=True,
539 user = Unicode('', config=True,
540 help="username for ssh")
540 help="username for ssh")
541 location = Unicode('', config=True,
541 location = Unicode('', config=True,
542 help="user@hostname location for ssh in one setting")
542 help="user@hostname location for ssh in one setting")
543
543
544 def _hostname_changed(self, name, old, new):
544 def _hostname_changed(self, name, old, new):
545 if self.user:
545 if self.user:
546 self.location = u'%s@%s' % (self.user, new)
546 self.location = u'%s@%s' % (self.user, new)
547 else:
547 else:
548 self.location = new
548 self.location = new
549
549
550 def _user_changed(self, name, old, new):
550 def _user_changed(self, name, old, new):
551 self.location = u'%s@%s' % (new, self.hostname)
551 self.location = u'%s@%s' % (new, self.hostname)
552
552
553 def find_args(self):
553 def find_args(self):
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
555 self.program + self.program_args
556
556
557 def start(self, profile_dir, hostname=None, user=None):
557 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
558 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
559 if hostname is not None:
560 self.hostname = hostname
560 self.hostname = hostname
561 if user is not None:
561 if user is not None:
562 self.user = user
562 self.user = user
563
563
564 return super(SSHLauncher, self).start()
564 return super(SSHLauncher, self).start()
565
565
566 def signal(self, sig):
566 def signal(self, sig):
567 if self.state == 'running':
567 if self.state == 'running':
568 # send escaped ssh connection-closer
568 # send escaped ssh connection-closer
569 self.process.stdin.write('~.')
569 self.process.stdin.write('~.')
570 self.process.stdin.flush()
570 self.process.stdin.flush()
571
571
572
572
573
573
574 class SSHControllerLauncher(SSHLauncher):
574 class SSHControllerLauncher(SSHLauncher):
575
575
576 program = List(ipcontroller_cmd_argv, config=True,
576 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
577 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
579 help="Command line arguments to ipcontroller.")
580
580
581
581
582 class SSHEngineLauncher(SSHLauncher):
582 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
583 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
584 help="remote ipengine command.")
585 # Command line arguments for ipengine.
585 # Command line arguments for ipengine.
586 program_args = List(
586 program_args = List(
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
588 help="Command line arguments to ipengine."
589 )
589 )
590
590
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
592 launcher_class = SSHEngineLauncher
593 engines = Dict(config=True,
593 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
595 corresponding to the number of engines to start on that host.""")
596
596
597 def start(self, n, profile_dir):
597 def start(self, n, profile_dir):
598 """Start engines by profile or profile_dir.
598 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
599 `n` is ignored, and the `engines` config property is used instead.
600 """
600 """
601
601
602 self.profile_dir = unicode(profile_dir)
602 self.profile_dir = unicode(profile_dir)
603 dlist = []
603 dlist = []
604 for host, n in self.engines.iteritems():
604 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
605 if isinstance(n, (tuple, list)):
606 n, args = n
606 n, args = n
607 else:
607 else:
608 args = copy.deepcopy(self.engine_args)
608 args = copy.deepcopy(self.engine_args)
609
609
610 if '@' in host:
610 if '@' in host:
611 user,host = host.split('@',1)
611 user,host = host.split('@',1)
612 else:
612 else:
613 user=None
613 user=None
614 for i in range(n):
614 for i in range(n):
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
616
616
617 # Copy the engine args over to each engine launcher.
617 # Copy the engine args over to each engine launcher.
618 i
618 i
619 el.program_args = args
619 el.program_args = args
620 el.on_stop(self._notice_engine_stopped)
620 el.on_stop(self._notice_engine_stopped)
621 d = el.start(profile_dir, user=user, hostname=host)
621 d = el.start(profile_dir, user=user, hostname=host)
622 if i==0:
622 if i==0:
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 self.launchers[host+str(i)] = el
624 self.launchers[host+str(i)] = el
625 dlist.append(d)
625 dlist.append(d)
626 self.notify_start(dlist)
626 self.notify_start(dlist)
627 return dlist
627 return dlist
628
628
629
629
630
630
631 #-----------------------------------------------------------------------------
631 #-----------------------------------------------------------------------------
632 # Windows HPC Server 2008 scheduler launchers
632 # Windows HPC Server 2008 scheduler launchers
633 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
634
634
635
635
636 # This is only used on Windows.
636 # This is only used on Windows.
637 def find_job_cmd():
637 def find_job_cmd():
638 if WINDOWS:
638 if WINDOWS:
639 try:
639 try:
640 return find_cmd('job')
640 return find_cmd('job')
641 except (FindCmdError, ImportError):
641 except (FindCmdError, ImportError):
642 # ImportError will be raised if win32api is not installed
642 # ImportError will be raised if win32api is not installed
643 return 'job'
643 return 'job'
644 else:
644 else:
645 return 'job'
645 return 'job'
646
646
647
647
648 class WindowsHPCLauncher(BaseLauncher):
648 class WindowsHPCLauncher(BaseLauncher):
649
649
650 job_id_regexp = Unicode(r'\d+', config=True,
650 job_id_regexp = Unicode(r'\d+', config=True,
651 help="""A regular expression used to get the job id from the output of the
651 help="""A regular expression used to get the job id from the output of the
652 submit_command. """
652 submit_command. """
653 )
653 )
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 help="The filename of the instantiated job script.")
655 help="The filename of the instantiated job script.")
656 # The full path to the instantiated job script. This gets made dynamically
656 # The full path to the instantiated job script. This gets made dynamically
657 # by combining the work_dir with the job_file_name.
657 # by combining the work_dir with the job_file_name.
658 job_file = Unicode(u'')
658 job_file = Unicode(u'')
659 scheduler = Unicode('', config=True,
659 scheduler = Unicode('', config=True,
660 help="The hostname of the scheduler to submit the job to.")
660 help="The hostname of the scheduler to submit the job to.")
661 job_cmd = Unicode(find_job_cmd(), config=True,
661 job_cmd = Unicode(find_job_cmd(), config=True,
662 help="The command for submitting jobs.")
662 help="The command for submitting jobs.")
663
663
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 super(WindowsHPCLauncher, self).__init__(
665 super(WindowsHPCLauncher, self).__init__(
666 work_dir=work_dir, config=config, **kwargs
666 work_dir=work_dir, config=config, **kwargs
667 )
667 )
668
668
669 @property
669 @property
670 def job_file(self):
670 def job_file(self):
671 return os.path.join(self.work_dir, self.job_file_name)
671 return os.path.join(self.work_dir, self.job_file_name)
672
672
673 def write_job_file(self, n):
673 def write_job_file(self, n):
674 raise NotImplementedError("Implement write_job_file in a subclass.")
674 raise NotImplementedError("Implement write_job_file in a subclass.")
675
675
676 def find_args(self):
676 def find_args(self):
677 return [u'job.exe']
677 return [u'job.exe']
678
678
679 def parse_job_id(self, output):
679 def parse_job_id(self, output):
680 """Take the output of the submit command and return the job id."""
680 """Take the output of the submit command and return the job id."""
681 m = re.search(self.job_id_regexp, output)
681 m = re.search(self.job_id_regexp, output)
682 if m is not None:
682 if m is not None:
683 job_id = m.group()
683 job_id = m.group()
684 else:
684 else:
685 raise LauncherError("Job id couldn't be determined: %s" % output)
685 raise LauncherError("Job id couldn't be determined: %s" % output)
686 self.job_id = job_id
686 self.job_id = job_id
687 self.log.info('Job started with job id: %r' % job_id)
687 self.log.info('Job started with job id: %r' % job_id)
688 return job_id
688 return job_id
689
689
690 def start(self, n):
690 def start(self, n):
691 """Start n copies of the process using the Win HPC job scheduler."""
691 """Start n copies of the process using the Win HPC job scheduler."""
692 self.write_job_file(n)
692 self.write_job_file(n)
693 args = [
693 args = [
694 'submit',
694 'submit',
695 '/jobfile:%s' % self.job_file,
695 '/jobfile:%s' % self.job_file,
696 '/scheduler:%s' % self.scheduler
696 '/scheduler:%s' % self.scheduler
697 ]
697 ]
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
700 output = check_output([self.job_cmd]+args,
700 output = check_output([self.job_cmd]+args,
701 env=os.environ,
701 env=os.environ,
702 cwd=self.work_dir,
702 cwd=self.work_dir,
703 stderr=STDOUT
703 stderr=STDOUT
704 )
704 )
705 job_id = self.parse_job_id(output)
705 job_id = self.parse_job_id(output)
706 self.notify_start(job_id)
706 self.notify_start(job_id)
707 return job_id
707 return job_id
708
708
709 def stop(self):
709 def stop(self):
710 args = [
710 args = [
711 'cancel',
711 'cancel',
712 self.job_id,
712 self.job_id,
713 '/scheduler:%s' % self.scheduler
713 '/scheduler:%s' % self.scheduler
714 ]
714 ]
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 try:
716 try:
717 output = check_output([self.job_cmd]+args,
717 output = check_output([self.job_cmd]+args,
718 env=os.environ,
718 env=os.environ,
719 cwd=self.work_dir,
719 cwd=self.work_dir,
720 stderr=STDOUT
720 stderr=STDOUT
721 )
721 )
722 except:
722 except:
723 output = 'The job already appears to be stoppped: %r' % self.job_id
723 output = 'The job already appears to be stoppped: %r' % self.job_id
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 return output
725 return output
726
726
727
727
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729
729
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 help="WinHPC xml job file.")
731 help="WinHPC xml job file.")
732 extra_args = List([], config=False,
732 extra_args = List([], config=False,
733 help="extra args to pass to ipcontroller")
733 help="extra args to pass to ipcontroller")
734
734
735 def write_job_file(self, n):
735 def write_job_file(self, n):
736 job = IPControllerJob(config=self.config)
736 job = IPControllerJob(config=self.config)
737
737
738 t = IPControllerTask(config=self.config)
738 t = IPControllerTask(config=self.config)
739 # The tasks work directory is *not* the actual work directory of
739 # The tasks work directory is *not* the actual work directory of
740 # the controller. It is used as the base path for the stdout/stderr
740 # the controller. It is used as the base path for the stdout/stderr
741 # files that the scheduler redirects to.
741 # files that the scheduler redirects to.
742 t.work_directory = self.profile_dir
742 t.work_directory = self.profile_dir
743 # Add the profile_dir and from self.start().
743 # Add the profile_dir and from self.start().
744 t.controller_args.extend(self.extra_args)
744 t.controller_args.extend(self.extra_args)
745 job.add_task(t)
745 job.add_task(t)
746
746
747 self.log.info("Writing job description file: %s" % self.job_file)
747 self.log.info("Writing job description file: %s" % self.job_file)
748 job.write(self.job_file)
748 job.write(self.job_file)
749
749
750 @property
750 @property
751 def job_file(self):
751 def job_file(self):
752 return os.path.join(self.profile_dir, self.job_file_name)
752 return os.path.join(self.profile_dir, self.job_file_name)
753
753
754 def start(self, profile_dir):
754 def start(self, profile_dir):
755 """Start the controller by profile_dir."""
755 """Start the controller by profile_dir."""
756 self.extra_args = ['profile_dir=%s'%profile_dir]
756 self.extra_args = ['profile_dir=%s'%profile_dir]
757 self.profile_dir = unicode(profile_dir)
757 self.profile_dir = unicode(profile_dir)
758 return super(WindowsHPCControllerLauncher, self).start(1)
758 return super(WindowsHPCControllerLauncher, self).start(1)
759
759
760
760
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762
762
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 help="jobfile for ipengines job")
764 help="jobfile for ipengines job")
765 extra_args = List([], config=False,
765 extra_args = List([], config=False,
766 help="extra args to pas to ipengine")
766 help="extra args to pas to ipengine")
767
767
768 def write_job_file(self, n):
768 def write_job_file(self, n):
769 job = IPEngineSetJob(config=self.config)
769 job = IPEngineSetJob(config=self.config)
770
770
771 for i in range(n):
771 for i in range(n):
772 t = IPEngineTask(config=self.config)
772 t = IPEngineTask(config=self.config)
773 # The tasks work directory is *not* the actual work directory of
773 # The tasks work directory is *not* the actual work directory of
774 # the engine. It is used as the base path for the stdout/stderr
774 # the engine. It is used as the base path for the stdout/stderr
775 # files that the scheduler redirects to.
775 # files that the scheduler redirects to.
776 t.work_directory = self.profile_dir
776 t.work_directory = self.profile_dir
777 # Add the profile_dir and from self.start().
777 # Add the profile_dir and from self.start().
778 t.engine_args.extend(self.extra_args)
778 t.engine_args.extend(self.extra_args)
779 job.add_task(t)
779 job.add_task(t)
780
780
781 self.log.info("Writing job description file: %s" % self.job_file)
781 self.log.info("Writing job description file: %s" % self.job_file)
782 job.write(self.job_file)
782 job.write(self.job_file)
783
783
784 @property
784 @property
785 def job_file(self):
785 def job_file(self):
786 return os.path.join(self.profile_dir, self.job_file_name)
786 return os.path.join(self.profile_dir, self.job_file_name)
787
787
788 def start(self, n, profile_dir):
788 def start(self, n, profile_dir):
789 """Start the controller by profile_dir."""
789 """Start the controller by profile_dir."""
790 self.extra_args = ['profile_dir=%s'%profile_dir]
790 self.extra_args = ['profile_dir=%s'%profile_dir]
791 self.profile_dir = unicode(profile_dir)
791 self.profile_dir = unicode(profile_dir)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
793
793
794
794
795 #-----------------------------------------------------------------------------
795 #-----------------------------------------------------------------------------
796 # Batch (PBS) system launchers
796 # Batch (PBS) system launchers
797 #-----------------------------------------------------------------------------
797 #-----------------------------------------------------------------------------
798
798
799 class BatchSystemLauncher(BaseLauncher):
799 class BatchSystemLauncher(BaseLauncher):
800 """Launch an external process using a batch system.
800 """Launch an external process using a batch system.
801
801
802 This class is designed to work with UNIX batch systems like PBS, LSF,
802 This class is designed to work with UNIX batch systems like PBS, LSF,
803 GridEngine, etc. The overall model is that there are different commands
803 GridEngine, etc. The overall model is that there are different commands
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
805
805
806 This class also has the notion of a batch script. The ``batch_template``
806 This class also has the notion of a batch script. The ``batch_template``
807 attribute can be set to a string that is a template for the batch script.
807 attribute can be set to a string that is a template for the batch script.
808 This template is instantiated using Itpl. Thus the template can use
808 This template is instantiated using Itpl. Thus the template can use
809 ${n} fot the number of instances. Subclasses can add additional variables
809 ${n} fot the number of instances. Subclasses can add additional variables
810 to the template dict.
810 to the template dict.
811 """
811 """
812
812
813 # Subclasses must fill these in. See PBSEngineSet
813 # Subclasses must fill these in. See PBSEngineSet
814 submit_command = List([''], config=True,
814 submit_command = List([''], config=True,
815 help="The name of the command line program used to submit jobs.")
815 help="The name of the command line program used to submit jobs.")
816 delete_command = List([''], config=True,
816 delete_command = List([''], config=True,
817 help="The name of the command line program used to delete jobs.")
817 help="The name of the command line program used to delete jobs.")
818 job_id_regexp = Unicode('', config=True,
818 job_id_regexp = Unicode('', config=True,
819 help="""A regular expression used to get the job id from the output of the
819 help="""A regular expression used to get the job id from the output of the
820 submit_command.""")
820 submit_command.""")
821 batch_template = Unicode('', config=True,
821 batch_template = Unicode('', config=True,
822 help="The string that is the batch script template itself.")
822 help="The string that is the batch script template itself.")
823 batch_template_file = Unicode(u'', config=True,
823 batch_template_file = Unicode(u'', config=True,
824 help="The file that contains the batch template.")
824 help="The file that contains the batch template.")
825 batch_file_name = Unicode(u'batch_script', config=True,
825 batch_file_name = Unicode(u'batch_script', config=True,
826 help="The filename of the instantiated batch script.")
826 help="The filename of the instantiated batch script.")
827 queue = Unicode(u'', config=True,
827 queue = Unicode(u'', config=True,
828 help="The PBS Queue.")
828 help="The PBS Queue.")
829
829
830 # not configurable, override in subclasses
830 # not configurable, override in subclasses
831 # PBS Job Array regex
831 # PBS Job Array regex
832 job_array_regexp = Unicode('')
832 job_array_regexp = Unicode('')
833 job_array_template = Unicode('')
833 job_array_template = Unicode('')
834 # PBS Queue regex
834 # PBS Queue regex
835 queue_regexp = Unicode('')
835 queue_regexp = Unicode('')
836 queue_template = Unicode('')
836 queue_template = Unicode('')
837 # The default batch template, override in subclasses
837 # The default batch template, override in subclasses
838 default_template = Unicode('')
838 default_template = Unicode('')
839 # The full path to the instantiated batch script.
839 # The full path to the instantiated batch script.
840 batch_file = Unicode(u'')
840 batch_file = Unicode(u'')
841 # the format dict used with batch_template:
841 # the format dict used with batch_template:
842 context = Dict()
842 context = Dict()
843
843
844
844
845 def find_args(self):
845 def find_args(self):
846 return self.submit_command + [self.batch_file]
846 return self.submit_command + [self.batch_file]
847
847
848 def __init__(self, work_dir=u'.', config=None, **kwargs):
848 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 super(BatchSystemLauncher, self).__init__(
849 super(BatchSystemLauncher, self).__init__(
850 work_dir=work_dir, config=config, **kwargs
850 work_dir=work_dir, config=config, **kwargs
851 )
851 )
852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853
853
854 def parse_job_id(self, output):
854 def parse_job_id(self, output):
855 """Take the output of the submit command and return the job id."""
855 """Take the output of the submit command and return the job id."""
856 m = re.search(self.job_id_regexp, output)
856 m = re.search(self.job_id_regexp, output)
857 if m is not None:
857 if m is not None:
858 job_id = m.group()
858 job_id = m.group()
859 else:
859 else:
860 raise LauncherError("Job id couldn't be determined: %s" % output)
860 raise LauncherError("Job id couldn't be determined: %s" % output)
861 self.job_id = job_id
861 self.job_id = job_id
862 self.log.info('Job submitted with job id: %r' % job_id)
862 self.log.info('Job submitted with job id: %r' % job_id)
863 return job_id
863 return job_id
864
864
865 def write_batch_script(self, n):
865 def write_batch_script(self, n):
866 """Instantiate and write the batch script to the work_dir."""
866 """Instantiate and write the batch script to the work_dir."""
867 self.context['n'] = n
867 self.context['n'] = n
868 self.context['queue'] = self.queue
868 self.context['queue'] = self.queue
869 print self.context
869 print self.context
870 # first priority is batch_template if set
870 # first priority is batch_template if set
871 if self.batch_template_file and not self.batch_template:
871 if self.batch_template_file and not self.batch_template:
872 # second priority is batch_template_file
872 # second priority is batch_template_file
873 with open(self.batch_template_file) as f:
873 with open(self.batch_template_file) as f:
874 self.batch_template = f.read()
874 self.batch_template = f.read()
875 if not self.batch_template:
875 if not self.batch_template:
876 # third (last) priority is default_template
876 # third (last) priority is default_template
877 self.batch_template = self.default_template
877 self.batch_template = self.default_template
878
878
879 regex = re.compile(self.job_array_regexp)
879 regex = re.compile(self.job_array_regexp)
880 # print regex.search(self.batch_template)
880 # print regex.search(self.batch_template)
881 if not regex.search(self.batch_template):
881 if not regex.search(self.batch_template):
882 self.log.info("adding job array settings to batch script")
882 self.log.info("adding job array settings to batch script")
883 firstline, rest = self.batch_template.split('\n',1)
883 firstline, rest = self.batch_template.split('\n',1)
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885
885
886 regex = re.compile(self.queue_regexp)
886 regex = re.compile(self.queue_regexp)
887 # print regex.search(self.batch_template)
887 # print regex.search(self.batch_template)
888 if self.queue and not regex.search(self.batch_template):
888 if self.queue and not regex.search(self.batch_template):
889 self.log.info("adding PBS queue settings to batch script")
889 self.log.info("adding PBS queue settings to batch script")
890 firstline, rest = self.batch_template.split('\n',1)
890 firstline, rest = self.batch_template.split('\n',1)
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892
892
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895
895
896 with open(self.batch_file, 'w') as f:
896 with open(self.batch_file, 'w') as f:
897 f.write(script_as_string)
897 f.write(script_as_string)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899
899
900 def start(self, n, profile_dir):
900 def start(self, n, profile_dir):
901 """Start n copies of the process using a batch system."""
901 """Start n copies of the process using a batch system."""
902 # Here we save profile and profile_dir in the context so they
902 # Here we save profile and profile_dir in the context so they
903 # can be used in the batch script template as ${profile} and
903 # can be used in the batch script template as ${profile} and
904 # ${profile_dir}
904 # ${profile_dir}
905 self.context['profile_dir'] = profile_dir
905 self.context['profile_dir'] = profile_dir
906 self.profile_dir = unicode(profile_dir)
906 self.profile_dir = unicode(profile_dir)
907 self.write_batch_script(n)
907 self.write_batch_script(n)
908 output = check_output(self.args, env=os.environ)
908 output = check_output(self.args, env=os.environ)
909
909
910 job_id = self.parse_job_id(output)
910 job_id = self.parse_job_id(output)
911 self.notify_start(job_id)
911 self.notify_start(job_id)
912 return job_id
912 return job_id
913
913
914 def stop(self):
914 def stop(self):
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
917 return output
917 return output
918
918
919
919
920 class PBSLauncher(BatchSystemLauncher):
920 class PBSLauncher(BatchSystemLauncher):
921 """A BatchSystemLauncher subclass for PBS."""
921 """A BatchSystemLauncher subclass for PBS."""
922
922
923 submit_command = List(['qsub'], config=True,
923 submit_command = List(['qsub'], config=True,
924 help="The PBS submit command ['qsub']")
924 help="The PBS submit command ['qsub']")
925 delete_command = List(['qdel'], config=True,
925 delete_command = List(['qdel'], config=True,
926 help="The PBS delete command ['qsub']")
926 help="The PBS delete command ['qsub']")
927 job_id_regexp = Unicode(r'\d+', config=True,
927 job_id_regexp = Unicode(r'\d+', config=True,
928 help="Regular expresion for identifying the job ID [r'\d+']")
928 help="Regular expresion for identifying the job ID [r'\d+']")
929
929
930 batch_file = Unicode(u'')
930 batch_file = Unicode(u'')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
932 job_array_template = Unicode('#PBS -t 1-$n')
932 job_array_template = Unicode('#PBS -t 1-$n')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
934 queue_template = Unicode('#PBS -q $queue')
934 queue_template = Unicode('#PBS -q $queue')
935
935
936
936
937 class PBSControllerLauncher(PBSLauncher):
937 class PBSControllerLauncher(PBSLauncher):
938 """Launch a controller using PBS."""
938 """Launch a controller using PBS."""
939
939
940 batch_file_name = Unicode(u'pbs_controller', config=True,
940 batch_file_name = Unicode(u'pbs_controller', config=True,
941 help="batch file name for the controller job.")
941 help="batch file name for the controller job.")
942 default_template= Unicode("""#!/bin/sh
942 default_template= Unicode("""#!/bin/sh
943 #PBS -V
943 #PBS -V
944 #PBS -N ipcontroller
944 #PBS -N ipcontroller
945 %s --log-to-file profile_dir $profile_dir
945 %s --log-to-file profile_dir $profile_dir
946 """%(' '.join(ipcontroller_cmd_argv)))
946 """%(' '.join(ipcontroller_cmd_argv)))
947
947
948 def start(self, profile_dir):
948 def start(self, profile_dir):
949 """Start the controller by profile or profile_dir."""
949 """Start the controller by profile or profile_dir."""
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
952
952
953
953
954 class PBSEngineSetLauncher(PBSLauncher):
954 class PBSEngineSetLauncher(PBSLauncher):
955 """Launch Engines using PBS"""
955 """Launch Engines using PBS"""
956 batch_file_name = Unicode(u'pbs_engines', config=True,
956 batch_file_name = Unicode(u'pbs_engines', config=True,
957 help="batch file name for the engine(s) job.")
957 help="batch file name for the engine(s) job.")
958 default_template= Unicode(u"""#!/bin/sh
958 default_template= Unicode(u"""#!/bin/sh
959 #PBS -V
959 #PBS -V
960 #PBS -N ipengine
960 #PBS -N ipengine
961 %s profile_dir $profile_dir
961 %s profile_dir $profile_dir
962 """%(' '.join(ipengine_cmd_argv)))
962 """%(' '.join(ipengine_cmd_argv)))
963
963
964 def start(self, n, profile_dir):
964 def start(self, n, profile_dir):
965 """Start n engines by profile or profile_dir."""
965 """Start n engines by profile or profile_dir."""
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968
968
969 #SGE is very similar to PBS
969 #SGE is very similar to PBS
970
970
971 class SGELauncher(PBSLauncher):
971 class SGELauncher(PBSLauncher):
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
973 job_array_regexp = Unicode('#$$\W+-t\W+[\w\d\-\$]+')
973 job_array_regexp = Unicode('#$$\W+-t\W+[\w\d\-\$]+')
974 job_array_template = Unicode('#$$ -t 1-$n')
974 job_array_template = Unicode('#$$ -t 1-$n')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
976 queue_template = Unicode('#$$ -q $queue')
976 queue_template = Unicode('#$$ -q $queue')
977
977
978 class SGEControllerLauncher(SGELauncher):
978 class SGEControllerLauncher(SGELauncher):
979 """Launch a controller using SGE."""
979 """Launch a controller using SGE."""
980
980
981 batch_file_name = Unicode(u'sge_controller', config=True,
981 batch_file_name = Unicode(u'sge_controller', config=True,
982 help="batch file name for the ipontroller job.")
982 help="batch file name for the ipontroller job.")
983 default_template= Unicode(u"""#$$ -V
983 default_template= Unicode(u"""#$$ -V
984 #$$ -S /bin/sh
984 #$$ -S /bin/sh
985 #$$ -N ipcontroller
985 #$$ -N ipcontroller
986 %s --log-to-file profile_dir=$profile_dir
986 %s --log-to-file profile_dir=$profile_dir
987 """%(' '.join(ipcontroller_cmd_argv)))
987 """%(' '.join(ipcontroller_cmd_argv)))
988
988
989 def start(self, profile_dir):
989 def start(self, profile_dir):
990 """Start the controller by profile or profile_dir."""
990 """Start the controller by profile or profile_dir."""
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 return super(PBSControllerLauncher, self).start(1, profile_dir)
992 return super(SGEControllerLauncher, self).start(1, profile_dir)
993
993
994 class SGEEngineSetLauncher(SGELauncher):
994 class SGEEngineSetLauncher(SGELauncher):
995 """Launch Engines with SGE"""
995 """Launch Engines with SGE"""
996 batch_file_name = Unicode(u'sge_engines', config=True,
996 batch_file_name = Unicode(u'sge_engines', config=True,
997 help="batch file name for the engine(s) job.")
997 help="batch file name for the engine(s) job.")
998 default_template = Unicode("""#$$ -V
998 default_template = Unicode("""#$$ -V
999 #$$ -S /bin/sh
999 #$$ -S /bin/sh
1000 #$$ -N ipengine
1000 #$$ -N ipengine
1001 %s profile_dir=$profile_dir
1001 %s profile_dir=$profile_dir
1002 """%(' '.join(ipengine_cmd_argv)))
1002 """%(' '.join(ipengine_cmd_argv)))
1003
1003
1004 def start(self, n, profile_dir):
1004 def start(self, n, profile_dir):
1005 """Start n engines by profile or profile_dir."""
1005 """Start n engines by profile or profile_dir."""
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008
1008
1009
1009
1010 #-----------------------------------------------------------------------------
1010 #-----------------------------------------------------------------------------
1011 # A launcher for ipcluster itself!
1011 # A launcher for ipcluster itself!
1012 #-----------------------------------------------------------------------------
1012 #-----------------------------------------------------------------------------
1013
1013
1014
1014
1015 class IPClusterLauncher(LocalProcessLauncher):
1015 class IPClusterLauncher(LocalProcessLauncher):
1016 """Launch the ipcluster program in an external process."""
1016 """Launch the ipcluster program in an external process."""
1017
1017
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 help="Popen command for ipcluster")
1019 help="Popen command for ipcluster")
1020 ipcluster_args = List(
1020 ipcluster_args = List(
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 help="Command line arguments to pass to ipcluster.")
1022 help="Command line arguments to pass to ipcluster.")
1023 ipcluster_subcommand = Unicode('start')
1023 ipcluster_subcommand = Unicode('start')
1024 ipcluster_n = Int(2)
1024 ipcluster_n = Int(2)
1025
1025
1026 def find_args(self):
1026 def find_args(self):
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1029
1029
1030 def start(self):
1030 def start(self):
1031 self.log.info("Starting ipcluster: %r" % self.args)
1031 self.log.info("Starting ipcluster: %r" % self.args)
1032 return super(IPClusterLauncher, self).start()
1032 return super(IPClusterLauncher, self).start()
1033
1033
1034 #-----------------------------------------------------------------------------
1034 #-----------------------------------------------------------------------------
1035 # Collections of launchers
1035 # Collections of launchers
1036 #-----------------------------------------------------------------------------
1036 #-----------------------------------------------------------------------------
1037
1037
1038 local_launchers = [
1038 local_launchers = [
1039 LocalControllerLauncher,
1039 LocalControllerLauncher,
1040 LocalEngineLauncher,
1040 LocalEngineLauncher,
1041 LocalEngineSetLauncher,
1041 LocalEngineSetLauncher,
1042 ]
1042 ]
1043 mpi_launchers = [
1043 mpi_launchers = [
1044 MPIExecLauncher,
1044 MPIExecLauncher,
1045 MPIExecControllerLauncher,
1045 MPIExecControllerLauncher,
1046 MPIExecEngineSetLauncher,
1046 MPIExecEngineSetLauncher,
1047 ]
1047 ]
1048 ssh_launchers = [
1048 ssh_launchers = [
1049 SSHLauncher,
1049 SSHLauncher,
1050 SSHControllerLauncher,
1050 SSHControllerLauncher,
1051 SSHEngineLauncher,
1051 SSHEngineLauncher,
1052 SSHEngineSetLauncher,
1052 SSHEngineSetLauncher,
1053 ]
1053 ]
1054 winhpc_launchers = [
1054 winhpc_launchers = [
1055 WindowsHPCLauncher,
1055 WindowsHPCLauncher,
1056 WindowsHPCControllerLauncher,
1056 WindowsHPCControllerLauncher,
1057 WindowsHPCEngineSetLauncher,
1057 WindowsHPCEngineSetLauncher,
1058 ]
1058 ]
1059 pbs_launchers = [
1059 pbs_launchers = [
1060 PBSLauncher,
1060 PBSLauncher,
1061 PBSControllerLauncher,
1061 PBSControllerLauncher,
1062 PBSEngineSetLauncher,
1062 PBSEngineSetLauncher,
1063 ]
1063 ]
1064 sge_launchers = [
1064 sge_launchers = [
1065 SGELauncher,
1065 SGELauncher,
1066 SGEControllerLauncher,
1066 SGEControllerLauncher,
1067 SGEEngineSetLauncher,
1067 SGEEngineSetLauncher,
1068 ]
1068 ]
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 + pbs_launchers + sge_launchers No newline at end of file
1070 + pbs_launchers + sge_launchers
General Comments 0
You need to be logged in to leave comments. Login now