##// END OF EJS Templates
Replace Itpl with str.format in parallel launcher....
Thomas Kluyver -
Show More
@@ -1,241 +1,242 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 11 # - Start as a regular process on localhost.
12 12 # - Start using mpiexec.
13 13 # - Start using the Windows HPC Server 2008 scheduler
14 14 # - Start using PBS/SGE
15 15 # - Start using SSH
16 16
17 17
18 18 # The selected launchers can be configured below.
19 19
20 20 # Options are:
21 21 # - LocalControllerLauncher
22 22 # - MPIExecControllerLauncher
23 23 # - PBSControllerLauncher
24 24 # - SGEControllerLauncher
25 25 # - WindowsHPCControllerLauncher
26 26 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher'
27 27 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
28 28
29 29 # Options are:
30 30 # - LocalEngineSetLauncher
31 31 # - MPIExecEngineSetLauncher
32 32 # - PBSEngineSetLauncher
33 33 # - SGEEngineSetLauncher
34 34 # - WindowsHPCEngineSetLauncher
35 35 # c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Application configuration
39 39 #-----------------------------------------------------------------------------
40 40
41 41 # The default number of engines that will be started. This is overridden by
42 42 # the -n command line option: "ipcluster start -n 4"
43 43 # c.IPClusterEnginesApp.n = 2
44 44
45 45 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
46 46 # c.BaseParallelApp.log_to_file = False
47 47
48 48 # Remove old logs from cluster_dir/log before starting.
49 49 # c.BaseParallelApp.clean_logs = True
50 50
51 51 # The working directory for the process. The application will use os.chdir
52 52 # to change to this directory before starting.
53 53 # c.BaseParallelApp.work_dir = os.getcwd()
54 54
55 55
56 56 #-----------------------------------------------------------------------------
57 57 # Local process launchers
58 58 #-----------------------------------------------------------------------------
59 59
60 60 # The command line arguments to call the controller with.
61 61 # c.LocalControllerLauncher.controller_args = \
62 62 # ['--log-to-file','--log-level', '40']
63 63
64 64 # The working directory for the controller
65 65 # c.LocalEngineSetLauncher.work_dir = u''
66 66
67 67 # Command line argument passed to the engines.
68 68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # MPIExec launchers
72 72 #-----------------------------------------------------------------------------
73 73
74 74 # The mpiexec/mpirun command to use in both the controller and engines.
75 75 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
76 76
77 77 # Additional arguments to pass to the actual mpiexec command.
78 78 # c.MPIExecLauncher.mpi_args = []
79 79
80 80 # The mpiexec/mpirun command and args can be overridden if they should be different
81 81 # for controller and engines.
82 82 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
83 83 # c.MPIExecControllerLauncher.mpi_args = []
84 84 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
85 85 # c.MPIExecEngineSetLauncher.mpi_args = []
86 86
87 87 # The command line argument to call the controller with.
88 88 # c.MPIExecControllerLauncher.controller_args = \
89 89 # ['--log-to-file','--log-level', '40']
90 90
91 91 # Command line argument passed to the engines.
92 92 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
93 93
94 94 # The default number of engines to start if not given elsewhere.
95 95 # c.MPIExecEngineSetLauncher.n = 1
96 96
97 97 #-----------------------------------------------------------------------------
98 98 # SSH launchers
99 99 #-----------------------------------------------------------------------------
100 100
101 101 # ipclusterz can be used to launch controller and engines remotely via ssh.
102 102 # Note that currently ipclusterz does not do any file distribution, so if
103 103 # machines are not on a shared filesystem, config and json files must be
104 104 # distributed. For this reason, the reuse_files defaults to True on an
105 105 # ssh-launched Controller. This flag can be overridded by the program_args
106 106 # attribute of c.SSHControllerLauncher.
107 107
108 108 # set the ssh cmd for launching remote commands. The default is ['ssh']
109 109 # c.SSHLauncher.ssh_cmd = ['ssh']
110 110
111 111 # set the ssh cmd for launching remote commands. The default is ['ssh']
112 112 # c.SSHLauncher.ssh_args = ['tt']
113 113
114 114 # Set the user and hostname for the controller
115 115 # c.SSHControllerLauncher.hostname = 'controller.example.com'
116 116 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
117 117
118 118 # Set the arguments to be passed to ipcontrollerz
119 119 # note that remotely launched ipcontrollerz will not get the contents of
120 120 # the local ipcontrollerz_config.py unless it resides on the *remote host*
121 121 # in the location specified by the --cluster_dir argument.
122 122 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
123 123
124 # Set the default args passed to ipenginez for SSH launched engines
124 # Set the default args passed to ipengine for SSH launched engines
125 125 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
126 126
127 127 # SSH engines are launched as a dict of locations/n-engines.
128 128 # if a value is a tuple instead of an int, it is assumed to be of the form
129 # (n, [args]), setting the arguments to passed to ipenginez on `host`.
129 # (n, [args]), setting the arguments to passed to ipengine on `host`.
130 130 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
131 131
132 132 # In this case, there will be 3 engines at my.example.com, and
133 133 # 2 at you@ipython.scipy.org with a special json connector location.
134 134 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
135 135 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json']}
136 136 # }
137 137
138 138 #-----------------------------------------------------------------------------
139 139 # Unix batch (PBS) schedulers launchers
140 140 #-----------------------------------------------------------------------------
141 141
142 142 # SGE and PBS are very similar. All configurables in this section called 'PBS*'
143 143 # also exist as 'SGE*'.
144 144
145 145 # The command line program to use to submit a PBS job.
146 146 # c.PBSLauncher.submit_command = ['qsub']
147 147
148 148 # The command line program to use to delete a PBS job.
149 149 # c.PBSLauncher.delete_command = ['qdel']
150 150
151 151 # The PBS queue in which the job should run
152 152 # c.PBSLauncher.queue = 'myqueue'
153 153
154 154 # A regular expression that takes the output of qsub and find the job id.
155 155 # c.PBSLauncher.job_id_regexp = r'\d+'
156 156
157 157 # If for some reason the Controller and Engines have different options above, they
158 158 # can be set as c.PBSControllerLauncher.<option> etc.
159 159
160 160 # PBS and SGE have default templates, but you can specify your own, either as strings
161 161 # or from files, as described here:
162 162
163 163 # The batch submission script used to start the controller. This is where
164 164 # environment variables would be setup, etc. This string is interpreted using
165 # the Itpl module in IPython.external. Basically, you can use ${n} for the
166 # number of engine and ${cluster_dir} for the cluster_dir.
165 # Python's string formatting. Basically, you can use {queue} for the name
166 # of the PBS queue, and {profile_dir} for the profile_dir.
167 167 # c.PBSControllerLauncher.batch_template = """
168 168 # #PBS -N ipcontroller
169 # #PBS -q $queue
169 # #PBS -q {queue}
170 170 #
171 # ipcontrollerz --cluster-dir $cluster_dir
171 # ipcontroller profile_dir={profile_dir}
172 172 # """
173 173
174 174 # You can also load this template from a file
175 175 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
176 176
177 177 # The name of the instantiated batch script that will actually be used to
178 178 # submit the job. This will be written to the cluster directory.
179 179 # c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
180 180
181 181 # The batch submission script used to start the engines. This is where
182 182 # environment variables would be setup, etc. This string is interpreted using
183 # the Itpl module in IPython.external. Basically, you can use ${n} for the
184 # number of engine and ${cluster_dir} for the cluster_dir.
183 # Python's string formatting. Basically, you can use {queue} for the name
184 # of the PBS queue, and {profile_dir} for the profile_dir, and {n}
185 # for the number of engines.
185 186 # c.PBSEngineSetLauncher.batch_template = """
186 # #PBS -N ipcontroller
187 # #PBS -l nprocs=$n
187 # #PBS -N ipengine
188 # #PBS -l nprocs={n}
188 189 #
189 # ipenginez --cluster-dir $cluster_dir$s
190 # ipengine profile_dir={profile_dir}
190 191 # """
191 192
192 193 # You can also load this template from a file
193 194 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
194 195
195 196 # The name of the instantiated batch script that will actually be used to
196 197 # submit the job. This will be written to the cluster directory.
197 198 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
198 199
199 200
200 201
201 202 #-----------------------------------------------------------------------------
202 203 # Windows HPC Server 2008 launcher configuration
203 204 #-----------------------------------------------------------------------------
204 205
205 206 # c.IPControllerJob.job_name = 'IPController'
206 207 # c.IPControllerJob.is_exclusive = False
207 208 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
208 209 # c.IPControllerJob.priority = 'Highest'
209 210 # c.IPControllerJob.requested_nodes = ''
210 211 # c.IPControllerJob.project = 'MyProject'
211 212
212 213 # c.IPControllerTask.task_name = 'IPController'
213 214 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
214 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
215 # c.IPControllerTask.controller_args = ['--log-to-file', 'log_level=40']
215 216 # c.IPControllerTask.environment_variables = {}
216 217
217 218 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
218 219 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
219 220
220 221
221 222 # c.IPEngineSetJob.job_name = 'IPEngineSet'
222 223 # c.IPEngineSetJob.is_exclusive = False
223 224 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
224 225 # c.IPEngineSetJob.priority = 'Highest'
225 226 # c.IPEngineSetJob.requested_nodes = ''
226 227 # c.IPEngineSetJob.project = 'MyProject'
227 228
228 229 # c.IPEngineTask.task_name = 'IPEngine'
229 230 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
230 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
231 # c.IPEngineTask.engine_args = ['--log-to-file', 'log_level=40']
231 232 # c.IPEngineTask.environment_variables = {}
232 233
233 234 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
234 235 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
235 236
236 237
237 238
238 239
239 240
240 241
241 242
@@ -1,1070 +1,1067 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 # signal imports, handling various platforms, versions
25 25
26 26 from signal import SIGINT, SIGTERM
27 27 try:
28 28 from signal import SIGKILL
29 29 except ImportError:
30 30 # Windows
31 31 SIGKILL=SIGTERM
32 32
33 33 try:
34 34 # Windows >= 2.7, 3.2
35 35 from signal import CTRL_C_EVENT as SIGINT
36 36 except ImportError:
37 37 pass
38 38
39 39 from subprocess import Popen, PIPE, STDOUT
40 40 try:
41 41 from subprocess import check_output
42 42 except ImportError:
43 43 # pre-2.7, define check_output with Popen
44 44 def check_output(*args, **kwargs):
45 45 kwargs.update(dict(stdout=PIPE))
46 46 p = Popen(*args, **kwargs)
47 47 out,err = p.communicate()
48 48 return out
49 49
50 50 from zmq.eventloop import ioloop
51 51
52 from IPython.external import Itpl
53 52 # from IPython.config.configurable import Configurable
54 53 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 54 from IPython.utils.path import get_ipython_module_path
56 55 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57 56
58 57 from IPython.parallel.factory import LoggingFactory
59 58
60 59 from .win32support import forward_read_events
61 60
62 61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
63 62
64 63 WINDOWS = os.name == 'nt'
65 64
66 65 #-----------------------------------------------------------------------------
67 66 # Paths to the kernel apps
68 67 #-----------------------------------------------------------------------------
69 68
70 69
71 70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
72 71 'IPython.parallel.apps.ipclusterapp'
73 72 ))
74 73
75 74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
76 75 'IPython.parallel.apps.ipengineapp'
77 76 ))
78 77
79 78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
80 79 'IPython.parallel.apps.ipcontrollerapp'
81 80 ))
82 81
83 82 #-----------------------------------------------------------------------------
84 83 # Base launchers and errors
85 84 #-----------------------------------------------------------------------------
86 85
87 86
88 87 class LauncherError(Exception):
89 88 pass
90 89
91 90
92 91 class ProcessStateError(LauncherError):
93 92 pass
94 93
95 94
96 95 class UnknownStatus(LauncherError):
97 96 pass
98 97
99 98
100 99 class BaseLauncher(LoggingFactory):
101 100 """An asbtraction for starting, stopping and signaling a process."""
102 101
103 102 # In all of the launchers, the work_dir is where child processes will be
104 103 # run. This will usually be the profile_dir, but may not be. any work_dir
105 104 # passed into the __init__ method will override the config value.
106 105 # This should not be used to set the work_dir for the actual engine
107 106 # and controller. Instead, use their own config files or the
108 107 # controller_args, engine_args attributes of the launchers to add
109 108 # the work_dir option.
110 109 work_dir = Unicode(u'.')
111 110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112 111
113 112 start_data = Any()
114 113 stop_data = Any()
115 114
116 115 def _loop_default(self):
117 116 return ioloop.IOLoop.instance()
118 117
119 118 def __init__(self, work_dir=u'.', config=None, **kwargs):
120 119 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
121 120 self.state = 'before' # can be before, running, after
122 121 self.stop_callbacks = []
123 122 self.start_data = None
124 123 self.stop_data = None
125 124
126 125 @property
127 126 def args(self):
128 127 """A list of cmd and args that will be used to start the process.
129 128
130 129 This is what is passed to :func:`spawnProcess` and the first element
131 130 will be the process name.
132 131 """
133 132 return self.find_args()
134 133
135 134 def find_args(self):
136 135 """The ``.args`` property calls this to find the args list.
137 136
138 137 Subcommand should implement this to construct the cmd and args.
139 138 """
140 139 raise NotImplementedError('find_args must be implemented in a subclass')
141 140
142 141 @property
143 142 def arg_str(self):
144 143 """The string form of the program arguments."""
145 144 return ' '.join(self.args)
146 145
147 146 @property
148 147 def running(self):
149 148 """Am I running."""
150 149 if self.state == 'running':
151 150 return True
152 151 else:
153 152 return False
154 153
155 154 def start(self):
156 155 """Start the process.
157 156
158 157 This must return a deferred that fires with information about the
159 158 process starting (like a pid, job id, etc.).
160 159 """
161 160 raise NotImplementedError('start must be implemented in a subclass')
162 161
163 162 def stop(self):
164 163 """Stop the process and notify observers of stopping.
165 164
166 165 This must return a deferred that fires with information about the
167 166 processing stopping, like errors that occur while the process is
168 167 attempting to be shut down. This deferred won't fire when the process
169 168 actually stops. To observe the actual process stopping, see
170 169 :func:`observe_stop`.
171 170 """
172 171 raise NotImplementedError('stop must be implemented in a subclass')
173 172
174 173 def on_stop(self, f):
175 174 """Get a deferred that will fire when the process stops.
176 175
177 176 The deferred will fire with data that contains information about
178 177 the exit status of the process.
179 178 """
180 179 if self.state=='after':
181 180 return f(self.stop_data)
182 181 else:
183 182 self.stop_callbacks.append(f)
184 183
185 184 def notify_start(self, data):
186 185 """Call this to trigger startup actions.
187 186
188 187 This logs the process startup and sets the state to 'running'. It is
189 188 a pass-through so it can be used as a callback.
190 189 """
191 190
192 191 self.log.info('Process %r started: %r' % (self.args[0], data))
193 192 self.start_data = data
194 193 self.state = 'running'
195 194 return data
196 195
197 196 def notify_stop(self, data):
198 197 """Call this to trigger process stop actions.
199 198
200 199 This logs the process stopping and sets the state to 'after'. Call
201 200 this to trigger all the deferreds from :func:`observe_stop`."""
202 201
203 202 self.log.info('Process %r stopped: %r' % (self.args[0], data))
204 203 self.stop_data = data
205 204 self.state = 'after'
206 205 for i in range(len(self.stop_callbacks)):
207 206 d = self.stop_callbacks.pop()
208 207 d(data)
209 208 return data
210 209
211 210 def signal(self, sig):
212 211 """Signal the process.
213 212
214 213 Return a semi-meaningless deferred after signaling the process.
215 214
216 215 Parameters
217 216 ----------
218 217 sig : str or int
219 218 'KILL', 'INT', etc., or any signal number
220 219 """
221 220 raise NotImplementedError('signal must be implemented in a subclass')
222 221
223 222
224 223 #-----------------------------------------------------------------------------
225 224 # Local process launchers
226 225 #-----------------------------------------------------------------------------
227 226
228 227
229 228 class LocalProcessLauncher(BaseLauncher):
230 229 """Start and stop an external process in an asynchronous manner.
231 230
232 231 This will launch the external process with a working directory of
233 232 ``self.work_dir``.
234 233 """
235 234
236 235 # This is used to to construct self.args, which is passed to
237 236 # spawnProcess.
238 237 cmd_and_args = List([])
239 238 poll_frequency = Int(100) # in ms
240 239
241 240 def __init__(self, work_dir=u'.', config=None, **kwargs):
242 241 super(LocalProcessLauncher, self).__init__(
243 242 work_dir=work_dir, config=config, **kwargs
244 243 )
245 244 self.process = None
246 245 self.start_deferred = None
247 246 self.poller = None
248 247
249 248 def find_args(self):
250 249 return self.cmd_and_args
251 250
252 251 def start(self):
253 252 if self.state == 'before':
254 253 self.process = Popen(self.args,
255 254 stdout=PIPE,stderr=PIPE,stdin=PIPE,
256 255 env=os.environ,
257 256 cwd=self.work_dir
258 257 )
259 258 if WINDOWS:
260 259 self.stdout = forward_read_events(self.process.stdout)
261 260 self.stderr = forward_read_events(self.process.stderr)
262 261 else:
263 262 self.stdout = self.process.stdout.fileno()
264 263 self.stderr = self.process.stderr.fileno()
265 264 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
266 265 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
267 266 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
268 267 self.poller.start()
269 268 self.notify_start(self.process.pid)
270 269 else:
271 270 s = 'The process was already started and has state: %r' % self.state
272 271 raise ProcessStateError(s)
273 272
274 273 def stop(self):
275 274 return self.interrupt_then_kill()
276 275
277 276 def signal(self, sig):
278 277 if self.state == 'running':
279 278 if WINDOWS and sig != SIGINT:
280 279 # use Windows tree-kill for better child cleanup
281 280 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 281 else:
283 282 self.process.send_signal(sig)
284 283
285 284 def interrupt_then_kill(self, delay=2.0):
286 285 """Send INT, wait a delay and then send KILL."""
287 286 try:
288 287 self.signal(SIGINT)
289 288 except Exception:
290 289 self.log.debug("interrupt failed")
291 290 pass
292 291 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
293 292 self.killer.start()
294 293
295 294 # callbacks, etc:
296 295
297 296 def handle_stdout(self, fd, events):
298 297 if WINDOWS:
299 298 line = self.stdout.recv()
300 299 else:
301 300 line = self.process.stdout.readline()
302 301 # a stopped process will be readable but return empty strings
303 302 if line:
304 303 self.log.info(line[:-1])
305 304 else:
306 305 self.poll()
307 306
308 307 def handle_stderr(self, fd, events):
309 308 if WINDOWS:
310 309 line = self.stderr.recv()
311 310 else:
312 311 line = self.process.stderr.readline()
313 312 # a stopped process will be readable but return empty strings
314 313 if line:
315 314 self.log.error(line[:-1])
316 315 else:
317 316 self.poll()
318 317
319 318 def poll(self):
320 319 status = self.process.poll()
321 320 if status is not None:
322 321 self.poller.stop()
323 322 self.loop.remove_handler(self.stdout)
324 323 self.loop.remove_handler(self.stderr)
325 324 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
326 325 return status
327 326
328 327 class LocalControllerLauncher(LocalProcessLauncher):
329 328 """Launch a controller as a regular external process."""
330 329
331 330 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 331 help="""Popen command to launch ipcontroller.""")
333 332 # Command line arguments to ipcontroller.
334 333 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 334 help="""command-line args to pass to ipcontroller""")
336 335
337 336 def find_args(self):
338 337 return self.controller_cmd + self.controller_args
339 338
340 339 def start(self, profile_dir):
341 340 """Start the controller by profile_dir."""
342 341 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 342 self.profile_dir = unicode(profile_dir)
344 343 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 344 return super(LocalControllerLauncher, self).start()
346 345
347 346
348 347 class LocalEngineLauncher(LocalProcessLauncher):
349 348 """Launch a single engine as a regular externall process."""
350 349
351 350 engine_cmd = List(ipengine_cmd_argv, config=True,
352 351 help="""command to launch the Engine.""")
353 352 # Command line arguments for ipengine.
354 353 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 354 help="command-line arguments to pass to ipengine"
356 355 )
357 356
358 357 def find_args(self):
359 358 return self.engine_cmd + self.engine_args
360 359
361 360 def start(self, profile_dir):
362 361 """Start the engine by profile_dir."""
363 362 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 363 self.profile_dir = unicode(profile_dir)
365 364 return super(LocalEngineLauncher, self).start()
366 365
367 366
368 367 class LocalEngineSetLauncher(BaseLauncher):
369 368 """Launch a set of engines as regular external processes."""
370 369
371 370 # Command line arguments for ipengine.
372 371 engine_args = List(
373 372 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 373 help="command-line arguments to pass to ipengine"
375 374 )
376 375 # launcher class
377 376 launcher_class = LocalEngineLauncher
378 377
379 378 launchers = Dict()
380 379 stop_data = Dict()
381 380
382 381 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 382 super(LocalEngineSetLauncher, self).__init__(
384 383 work_dir=work_dir, config=config, **kwargs
385 384 )
386 385 self.stop_data = {}
387 386
388 387 def start(self, n, profile_dir):
389 388 """Start n engines by profile or profile_dir."""
390 389 self.profile_dir = unicode(profile_dir)
391 390 dlist = []
392 391 for i in range(n):
393 392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 393 # Copy the engine args over to each engine launcher.
395 394 el.engine_args = copy.deepcopy(self.engine_args)
396 395 el.on_stop(self._notice_engine_stopped)
397 396 d = el.start(profile_dir)
398 397 if i==0:
399 398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 399 self.launchers[i] = el
401 400 dlist.append(d)
402 401 self.notify_start(dlist)
403 402 # The consumeErrors here could be dangerous
404 403 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 404 # dfinal.addCallback(self.notify_start)
406 405 return dlist
407 406
408 407 def find_args(self):
409 408 return ['engine set']
410 409
411 410 def signal(self, sig):
412 411 dlist = []
413 412 for el in self.launchers.itervalues():
414 413 d = el.signal(sig)
415 414 dlist.append(d)
416 415 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 416 return dlist
418 417
419 418 def interrupt_then_kill(self, delay=1.0):
420 419 dlist = []
421 420 for el in self.launchers.itervalues():
422 421 d = el.interrupt_then_kill(delay)
423 422 dlist.append(d)
424 423 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 424 return dlist
426 425
427 426 def stop(self):
428 427 return self.interrupt_then_kill()
429 428
430 429 def _notice_engine_stopped(self, data):
431 430 pid = data['pid']
432 431 for idx,el in self.launchers.iteritems():
433 432 if el.process.pid == pid:
434 433 break
435 434 self.launchers.pop(idx)
436 435 self.stop_data[idx] = data
437 436 if not self.launchers:
438 437 self.notify_stop(self.stop_data)
439 438
440 439
441 440 #-----------------------------------------------------------------------------
442 441 # MPIExec launchers
443 442 #-----------------------------------------------------------------------------
444 443
445 444
446 445 class MPIExecLauncher(LocalProcessLauncher):
447 446 """Launch an external process using mpiexec."""
448 447
449 448 mpi_cmd = List(['mpiexec'], config=True,
450 449 help="The mpiexec command to use in starting the process."
451 450 )
452 451 mpi_args = List([], config=True,
453 452 help="The command line arguments to pass to mpiexec."
454 453 )
455 454 program = List(['date'], config=True,
456 455 help="The program to start via mpiexec.")
457 456 program_args = List([], config=True,
458 457 help="The command line argument to the program."
459 458 )
460 459 n = Int(1)
461 460
462 461 def find_args(self):
463 462 """Build self.args using all the fields."""
464 463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 464 self.program + self.program_args
466 465
467 466 def start(self, n):
468 467 """Start n instances of the program using mpiexec."""
469 468 self.n = n
470 469 return super(MPIExecLauncher, self).start()
471 470
472 471
473 472 class MPIExecControllerLauncher(MPIExecLauncher):
474 473 """Launch a controller using mpiexec."""
475 474
476 475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 476 help="Popen command to launch the Contropper"
478 477 )
479 478 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 479 help="Command line arguments to pass to ipcontroller."
481 480 )
482 481 n = Int(1)
483 482
484 483 def start(self, profile_dir):
485 484 """Start the controller by profile_dir."""
486 485 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 486 self.profile_dir = unicode(profile_dir)
488 487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 488 return super(MPIExecControllerLauncher, self).start(1)
490 489
491 490 def find_args(self):
492 491 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
493 492 self.controller_cmd + self.controller_args
494 493
495 494
496 495 class MPIExecEngineSetLauncher(MPIExecLauncher):
497 496
498 497 program = List(ipengine_cmd_argv, config=True,
499 498 help="Popen command for ipengine"
500 499 )
501 500 program_args = List(
502 501 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 502 help="Command line arguments for ipengine."
504 503 )
505 504 n = Int(1)
506 505
507 506 def start(self, n, profile_dir):
508 507 """Start n engines by profile or profile_dir."""
509 508 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 509 self.profile_dir = unicode(profile_dir)
511 510 self.n = n
512 511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 512 return super(MPIExecEngineSetLauncher, self).start(n)
514 513
515 514 #-----------------------------------------------------------------------------
516 515 # SSH launchers
517 516 #-----------------------------------------------------------------------------
518 517
519 518 # TODO: Get SSH Launcher working again.
520 519
521 520 class SSHLauncher(LocalProcessLauncher):
522 521 """A minimal launcher for ssh.
523 522
524 523 To be useful this will probably have to be extended to use the ``sshx``
525 524 idea for environment variables. There could be other things this needs
526 525 as well.
527 526 """
528 527
529 528 ssh_cmd = List(['ssh'], config=True,
530 529 help="command for starting ssh")
531 530 ssh_args = List(['-tt'], config=True,
532 531 help="args to pass to ssh")
533 532 program = List(['date'], config=True,
534 533 help="Program to launch via ssh")
535 534 program_args = List([], config=True,
536 535 help="args to pass to remote program")
537 536 hostname = Unicode('', config=True,
538 537 help="hostname on which to launch the program")
539 538 user = Unicode('', config=True,
540 539 help="username for ssh")
541 540 location = Unicode('', config=True,
542 541 help="user@hostname location for ssh in one setting")
543 542
544 543 def _hostname_changed(self, name, old, new):
545 544 if self.user:
546 545 self.location = u'%s@%s' % (self.user, new)
547 546 else:
548 547 self.location = new
549 548
550 549 def _user_changed(self, name, old, new):
551 550 self.location = u'%s@%s' % (new, self.hostname)
552 551
553 552 def find_args(self):
554 553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 554 self.program + self.program_args
556 555
557 556 def start(self, profile_dir, hostname=None, user=None):
558 557 self.profile_dir = unicode(profile_dir)
559 558 if hostname is not None:
560 559 self.hostname = hostname
561 560 if user is not None:
562 561 self.user = user
563 562
564 563 return super(SSHLauncher, self).start()
565 564
566 565 def signal(self, sig):
567 566 if self.state == 'running':
568 567 # send escaped ssh connection-closer
569 568 self.process.stdin.write('~.')
570 569 self.process.stdin.flush()
571 570
572 571
573 572
574 573 class SSHControllerLauncher(SSHLauncher):
575 574
576 575 program = List(ipcontroller_cmd_argv, config=True,
577 576 help="remote ipcontroller command.")
578 577 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 578 help="Command line arguments to ipcontroller.")
580 579
581 580
582 581 class SSHEngineLauncher(SSHLauncher):
583 582 program = List(ipengine_cmd_argv, config=True,
584 583 help="remote ipengine command.")
585 584 # Command line arguments for ipengine.
586 585 program_args = List(
587 586 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 587 help="Command line arguments to ipengine."
589 588 )
590 589
591 590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 591 launcher_class = SSHEngineLauncher
593 592 engines = Dict(config=True,
594 593 help="""dict of engines to launch. This is a dict by hostname of ints,
595 594 corresponding to the number of engines to start on that host.""")
596 595
597 596 def start(self, n, profile_dir):
598 597 """Start engines by profile or profile_dir.
599 598 `n` is ignored, and the `engines` config property is used instead.
600 599 """
601 600
602 601 self.profile_dir = unicode(profile_dir)
603 602 dlist = []
604 603 for host, n in self.engines.iteritems():
605 604 if isinstance(n, (tuple, list)):
606 605 n, args = n
607 606 else:
608 607 args = copy.deepcopy(self.engine_args)
609 608
610 609 if '@' in host:
611 610 user,host = host.split('@',1)
612 611 else:
613 612 user=None
614 613 for i in range(n):
615 614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
616 615
617 616 # Copy the engine args over to each engine launcher.
618 617 i
619 618 el.program_args = args
620 619 el.on_stop(self._notice_engine_stopped)
621 620 d = el.start(profile_dir, user=user, hostname=host)
622 621 if i==0:
623 622 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 623 self.launchers[host+str(i)] = el
625 624 dlist.append(d)
626 625 self.notify_start(dlist)
627 626 return dlist
628 627
629 628
630 629
631 630 #-----------------------------------------------------------------------------
632 631 # Windows HPC Server 2008 scheduler launchers
633 632 #-----------------------------------------------------------------------------
634 633
635 634
636 635 # This is only used on Windows.
637 636 def find_job_cmd():
638 637 if WINDOWS:
639 638 try:
640 639 return find_cmd('job')
641 640 except (FindCmdError, ImportError):
642 641 # ImportError will be raised if win32api is not installed
643 642 return 'job'
644 643 else:
645 644 return 'job'
646 645
647 646
648 647 class WindowsHPCLauncher(BaseLauncher):
649 648
650 649 job_id_regexp = Unicode(r'\d+', config=True,
651 650 help="""A regular expression used to get the job id from the output of the
652 651 submit_command. """
653 652 )
654 653 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 654 help="The filename of the instantiated job script.")
656 655 # The full path to the instantiated job script. This gets made dynamically
657 656 # by combining the work_dir with the job_file_name.
658 657 job_file = Unicode(u'')
659 658 scheduler = Unicode('', config=True,
660 659 help="The hostname of the scheduler to submit the job to.")
661 660 job_cmd = Unicode(find_job_cmd(), config=True,
662 661 help="The command for submitting jobs.")
663 662
664 663 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 664 super(WindowsHPCLauncher, self).__init__(
666 665 work_dir=work_dir, config=config, **kwargs
667 666 )
668 667
669 668 @property
670 669 def job_file(self):
671 670 return os.path.join(self.work_dir, self.job_file_name)
672 671
673 672 def write_job_file(self, n):
674 673 raise NotImplementedError("Implement write_job_file in a subclass.")
675 674
676 675 def find_args(self):
677 676 return [u'job.exe']
678 677
679 678 def parse_job_id(self, output):
680 679 """Take the output of the submit command and return the job id."""
681 680 m = re.search(self.job_id_regexp, output)
682 681 if m is not None:
683 682 job_id = m.group()
684 683 else:
685 684 raise LauncherError("Job id couldn't be determined: %s" % output)
686 685 self.job_id = job_id
687 686 self.log.info('Job started with job id: %r' % job_id)
688 687 return job_id
689 688
690 689 def start(self, n):
691 690 """Start n copies of the process using the Win HPC job scheduler."""
692 691 self.write_job_file(n)
693 692 args = [
694 693 'submit',
695 694 '/jobfile:%s' % self.job_file,
696 695 '/scheduler:%s' % self.scheduler
697 696 ]
698 697 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 698 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
700 699 output = check_output([self.job_cmd]+args,
701 700 env=os.environ,
702 701 cwd=self.work_dir,
703 702 stderr=STDOUT
704 703 )
705 704 job_id = self.parse_job_id(output)
706 705 self.notify_start(job_id)
707 706 return job_id
708 707
709 708 def stop(self):
710 709 args = [
711 710 'cancel',
712 711 self.job_id,
713 712 '/scheduler:%s' % self.scheduler
714 713 ]
715 714 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 715 try:
717 716 output = check_output([self.job_cmd]+args,
718 717 env=os.environ,
719 718 cwd=self.work_dir,
720 719 stderr=STDOUT
721 720 )
722 721 except:
723 722 output = 'The job already appears to be stoppped: %r' % self.job_id
724 723 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 724 return output
726 725
727 726
728 727 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729 728
730 729 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 730 help="WinHPC xml job file.")
732 731 extra_args = List([], config=False,
733 732 help="extra args to pass to ipcontroller")
734 733
735 734 def write_job_file(self, n):
736 735 job = IPControllerJob(config=self.config)
737 736
738 737 t = IPControllerTask(config=self.config)
739 738 # The tasks work directory is *not* the actual work directory of
740 739 # the controller. It is used as the base path for the stdout/stderr
741 740 # files that the scheduler redirects to.
742 741 t.work_directory = self.profile_dir
743 742 # Add the profile_dir and from self.start().
744 743 t.controller_args.extend(self.extra_args)
745 744 job.add_task(t)
746 745
747 746 self.log.info("Writing job description file: %s" % self.job_file)
748 747 job.write(self.job_file)
749 748
750 749 @property
751 750 def job_file(self):
752 751 return os.path.join(self.profile_dir, self.job_file_name)
753 752
754 753 def start(self, profile_dir):
755 754 """Start the controller by profile_dir."""
756 755 self.extra_args = ['profile_dir=%s'%profile_dir]
757 756 self.profile_dir = unicode(profile_dir)
758 757 return super(WindowsHPCControllerLauncher, self).start(1)
759 758
760 759
761 760 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762 761
763 762 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 763 help="jobfile for ipengines job")
765 764 extra_args = List([], config=False,
766 765 help="extra args to pas to ipengine")
767 766
768 767 def write_job_file(self, n):
769 768 job = IPEngineSetJob(config=self.config)
770 769
771 770 for i in range(n):
772 771 t = IPEngineTask(config=self.config)
773 772 # The tasks work directory is *not* the actual work directory of
774 773 # the engine. It is used as the base path for the stdout/stderr
775 774 # files that the scheduler redirects to.
776 775 t.work_directory = self.profile_dir
777 776 # Add the profile_dir and from self.start().
778 777 t.engine_args.extend(self.extra_args)
779 778 job.add_task(t)
780 779
781 780 self.log.info("Writing job description file: %s" % self.job_file)
782 781 job.write(self.job_file)
783 782
784 783 @property
785 784 def job_file(self):
786 785 return os.path.join(self.profile_dir, self.job_file_name)
787 786
788 787 def start(self, n, profile_dir):
789 788 """Start the controller by profile_dir."""
790 789 self.extra_args = ['profile_dir=%s'%profile_dir]
791 790 self.profile_dir = unicode(profile_dir)
792 791 return super(WindowsHPCEngineSetLauncher, self).start(n)
793 792
794 793
795 794 #-----------------------------------------------------------------------------
796 795 # Batch (PBS) system launchers
797 796 #-----------------------------------------------------------------------------
798 797
799 798 class BatchSystemLauncher(BaseLauncher):
800 799 """Launch an external process using a batch system.
801 800
802 801 This class is designed to work with UNIX batch systems like PBS, LSF,
803 802 GridEngine, etc. The overall model is that there are different commands
804 803 like qsub, qdel, etc. that handle the starting and stopping of the process.
805 804
806 805 This class also has the notion of a batch script. The ``batch_template``
807 806 attribute can be set to a string that is a template for the batch script.
808 This template is instantiated using Itpl. Thus the template can use
809 ${n} fot the number of instances. Subclasses can add additional variables
807 This template is instantiated using string formatting. Thus the template can
808 use {n} fot the number of instances. Subclasses can add additional variables
810 809 to the template dict.
811 810 """
812 811
813 812 # Subclasses must fill these in. See PBSEngineSet
814 813 submit_command = List([''], config=True,
815 814 help="The name of the command line program used to submit jobs.")
816 815 delete_command = List([''], config=True,
817 816 help="The name of the command line program used to delete jobs.")
818 817 job_id_regexp = Unicode('', config=True,
819 818 help="""A regular expression used to get the job id from the output of the
820 819 submit_command.""")
821 820 batch_template = Unicode('', config=True,
822 821 help="The string that is the batch script template itself.")
823 822 batch_template_file = Unicode(u'', config=True,
824 823 help="The file that contains the batch template.")
825 824 batch_file_name = Unicode(u'batch_script', config=True,
826 825 help="The filename of the instantiated batch script.")
827 826 queue = Unicode(u'', config=True,
828 827 help="The PBS Queue.")
829 828
830 829 # not configurable, override in subclasses
831 830 # PBS Job Array regex
832 831 job_array_regexp = Unicode('')
833 832 job_array_template = Unicode('')
834 833 # PBS Queue regex
835 834 queue_regexp = Unicode('')
836 835 queue_template = Unicode('')
837 836 # The default batch template, override in subclasses
838 837 default_template = Unicode('')
839 838 # The full path to the instantiated batch script.
840 839 batch_file = Unicode(u'')
841 840 # the format dict used with batch_template:
842 841 context = Dict()
843 842
844 843
845 844 def find_args(self):
846 845 return self.submit_command + [self.batch_file]
847 846
848 847 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 848 super(BatchSystemLauncher, self).__init__(
850 849 work_dir=work_dir, config=config, **kwargs
851 850 )
852 851 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853 852
854 853 def parse_job_id(self, output):
855 854 """Take the output of the submit command and return the job id."""
856 855 m = re.search(self.job_id_regexp, output)
857 856 if m is not None:
858 857 job_id = m.group()
859 858 else:
860 859 raise LauncherError("Job id couldn't be determined: %s" % output)
861 860 self.job_id = job_id
862 861 self.log.info('Job submitted with job id: %r' % job_id)
863 862 return job_id
864 863
865 864 def write_batch_script(self, n):
866 865 """Instantiate and write the batch script to the work_dir."""
867 866 self.context['n'] = n
868 867 self.context['queue'] = self.queue
869 print self.context
870 868 # first priority is batch_template if set
871 869 if self.batch_template_file and not self.batch_template:
872 870 # second priority is batch_template_file
873 871 with open(self.batch_template_file) as f:
874 872 self.batch_template = f.read()
875 873 if not self.batch_template:
876 874 # third (last) priority is default_template
877 875 self.batch_template = self.default_template
878 876
879 877 regex = re.compile(self.job_array_regexp)
880 878 # print regex.search(self.batch_template)
881 879 if not regex.search(self.batch_template):
882 880 self.log.info("adding job array settings to batch script")
883 881 firstline, rest = self.batch_template.split('\n',1)
884 882 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885 883
886 884 regex = re.compile(self.queue_regexp)
887 885 # print regex.search(self.batch_template)
888 886 if self.queue and not regex.search(self.batch_template):
889 887 self.log.info("adding PBS queue settings to batch script")
890 888 firstline, rest = self.batch_template.split('\n',1)
891 889 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892 890
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
891 script_as_string = self.batch_template.format(**self.context)
894 892 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895 893
896 894 with open(self.batch_file, 'w') as f:
897 895 f.write(script_as_string)
898 896 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899 897
900 898 def start(self, n, profile_dir):
901 899 """Start n copies of the process using a batch system."""
902 # Here we save profile and profile_dir in the context so they
903 # can be used in the batch script template as ${profile} and
904 # ${profile_dir}
900 # Here we save profile_dir in the context so they
901 # can be used in the batch script template as {profile_dir}
905 902 self.context['profile_dir'] = profile_dir
906 903 self.profile_dir = unicode(profile_dir)
907 904 self.write_batch_script(n)
908 905 output = check_output(self.args, env=os.environ)
909 906
910 907 job_id = self.parse_job_id(output)
911 908 self.notify_start(job_id)
912 909 return job_id
913 910
914 911 def stop(self):
915 912 output = check_output(self.delete_command+[self.job_id], env=os.environ)
916 913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
917 914 return output
918 915
919 916
920 917 class PBSLauncher(BatchSystemLauncher):
921 918 """A BatchSystemLauncher subclass for PBS."""
922 919
923 920 submit_command = List(['qsub'], config=True,
924 921 help="The PBS submit command ['qsub']")
925 922 delete_command = List(['qdel'], config=True,
926 923 help="The PBS delete command ['qsub']")
927 924 job_id_regexp = Unicode(r'\d+', config=True,
928 925 help="Regular expresion for identifying the job ID [r'\d+']")
929 926
930 927 batch_file = Unicode(u'')
931 928 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
932 job_array_template = Unicode('#PBS -t 1-$n')
929 job_array_template = Unicode('#PBS -t 1-{n}')
933 930 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
934 queue_template = Unicode('#PBS -q $queue')
931 queue_template = Unicode('#PBS -q {queue}')
935 932
936 933
937 934 class PBSControllerLauncher(PBSLauncher):
938 935 """Launch a controller using PBS."""
939 936
940 937 batch_file_name = Unicode(u'pbs_controller', config=True,
941 938 help="batch file name for the controller job.")
942 939 default_template= Unicode("""#!/bin/sh
943 940 #PBS -V
944 941 #PBS -N ipcontroller
945 %s --log-to-file profile_dir $profile_dir
942 %s --log-to-file profile_dir={profile_dir}
946 943 """%(' '.join(ipcontroller_cmd_argv)))
947 944
948 945 def start(self, profile_dir):
949 946 """Start the controller by profile or profile_dir."""
950 947 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 948 return super(PBSControllerLauncher, self).start(1, profile_dir)
952 949
953 950
954 951 class PBSEngineSetLauncher(PBSLauncher):
955 952 """Launch Engines using PBS"""
956 953 batch_file_name = Unicode(u'pbs_engines', config=True,
957 954 help="batch file name for the engine(s) job.")
958 955 default_template= Unicode(u"""#!/bin/sh
959 956 #PBS -V
960 957 #PBS -N ipengine
961 %s profile_dir $profile_dir
958 %s profile_dir={profile_dir}
962 959 """%(' '.join(ipengine_cmd_argv)))
963 960
964 961 def start(self, n, profile_dir):
965 962 """Start n engines by profile or profile_dir."""
966 963 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 964 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968 965
969 966 #SGE is very similar to PBS
970 967
971 968 class SGELauncher(PBSLauncher):
972 969 """Sun GridEngine is a PBS clone with slightly different syntax"""
973 job_array_regexp = Unicode('#\$\$\W+\-t')
974 job_array_template = Unicode('#$$ -t 1-$n')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
976 queue_template = Unicode('#$$ -q $queue')
970 job_array_regexp = Unicode('#\$\W+\-t')
971 job_array_template = Unicode('#$ -t 1-{n}')
972 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
973 queue_template = Unicode('#$ -q $queue')
977 974
978 975 class SGEControllerLauncher(SGELauncher):
979 976 """Launch a controller using SGE."""
980 977
981 978 batch_file_name = Unicode(u'sge_controller', config=True,
982 979 help="batch file name for the ipontroller job.")
983 default_template= Unicode(u"""#$$ -V
984 #$$ -S /bin/sh
985 #$$ -N ipcontroller
986 %s --log-to-file profile_dir=$profile_dir
980 default_template= Unicode(u"""#$ -V
981 #$ -S /bin/sh
982 #$ -N ipcontroller
983 %s --log-to-file profile_dir={profile_dir}
987 984 """%(' '.join(ipcontroller_cmd_argv)))
988 985
989 986 def start(self, profile_dir):
990 987 """Start the controller by profile or profile_dir."""
991 988 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 989 return super(SGEControllerLauncher, self).start(1, profile_dir)
993 990
994 991 class SGEEngineSetLauncher(SGELauncher):
995 992 """Launch Engines with SGE"""
996 993 batch_file_name = Unicode(u'sge_engines', config=True,
997 994 help="batch file name for the engine(s) job.")
998 default_template = Unicode("""#$$ -V
999 #$$ -S /bin/sh
1000 #$$ -N ipengine
1001 %s profile_dir=$profile_dir
995 default_template = Unicode("""#$ -V
996 #$ -S /bin/sh
997 #$ -N ipengine
998 %s profile_dir={profile_dir}
1002 999 """%(' '.join(ipengine_cmd_argv)))
1003 1000
1004 1001 def start(self, n, profile_dir):
1005 1002 """Start n engines by profile or profile_dir."""
1006 1003 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 1004 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008 1005
1009 1006
1010 1007 #-----------------------------------------------------------------------------
1011 1008 # A launcher for ipcluster itself!
1012 1009 #-----------------------------------------------------------------------------
1013 1010
1014 1011
1015 1012 class IPClusterLauncher(LocalProcessLauncher):
1016 1013 """Launch the ipcluster program in an external process."""
1017 1014
1018 1015 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 1016 help="Popen command for ipcluster")
1020 1017 ipcluster_args = List(
1021 1018 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 1019 help="Command line arguments to pass to ipcluster.")
1023 1020 ipcluster_subcommand = Unicode('start')
1024 1021 ipcluster_n = Int(2)
1025 1022
1026 1023 def find_args(self):
1027 1024 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 1025 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1029 1026
1030 1027 def start(self):
1031 1028 self.log.info("Starting ipcluster: %r" % self.args)
1032 1029 return super(IPClusterLauncher, self).start()
1033 1030
1034 1031 #-----------------------------------------------------------------------------
1035 1032 # Collections of launchers
1036 1033 #-----------------------------------------------------------------------------
1037 1034
1038 1035 local_launchers = [
1039 1036 LocalControllerLauncher,
1040 1037 LocalEngineLauncher,
1041 1038 LocalEngineSetLauncher,
1042 1039 ]
1043 1040 mpi_launchers = [
1044 1041 MPIExecLauncher,
1045 1042 MPIExecControllerLauncher,
1046 1043 MPIExecEngineSetLauncher,
1047 1044 ]
1048 1045 ssh_launchers = [
1049 1046 SSHLauncher,
1050 1047 SSHControllerLauncher,
1051 1048 SSHEngineLauncher,
1052 1049 SSHEngineSetLauncher,
1053 1050 ]
1054 1051 winhpc_launchers = [
1055 1052 WindowsHPCLauncher,
1056 1053 WindowsHPCControllerLauncher,
1057 1054 WindowsHPCEngineSetLauncher,
1058 1055 ]
1059 1056 pbs_launchers = [
1060 1057 PBSLauncher,
1061 1058 PBSControllerLauncher,
1062 1059 PBSEngineSetLauncher,
1063 1060 ]
1064 1061 sge_launchers = [
1065 1062 SGELauncher,
1066 1063 SGEControllerLauncher,
1067 1064 SGEEngineSetLauncher,
1068 1065 ]
1069 1066 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 1067 + pbs_launchers + sge_launchers
General Comments 0
You need to be logged in to leave comments. Login now