Update PBS/SGE launchers with 0.10.1 options and defaults
MinRK
@@ -1,227 +1,238 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 11 # - Start as a regular process on localhost.
12 12 # - Start using mpiexec.
13 13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS
14 # - Start using PBS/SGE
15 15 # - Start using SSH
16 16
17 17
18 18 # The selected launchers can be configured below.
19 19
20 20 # Options are:
21 21 # - LocalControllerLauncher
22 22 # - MPIExecControllerLauncher
23 23 # - PBSControllerLauncher
24 # - SGEControllerLauncher
24 25 # - WindowsHPCControllerLauncher
25 26 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
27 c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.PBSControllerLauncher'
26 28
27 29 # Options are:
28 30 # - LocalEngineSetLauncher
29 31 # - MPIExecEngineSetLauncher
30 32 # - PBSEngineSetLauncher
33 # - SGEEngineSetLauncher
31 34 # - WindowsHPCEngineSetLauncher
32 35 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
33 36
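For example, to drive both the controller and the engines through Sun Grid Engine with the launchers added in this commit, a minimal sketch would be (the engine count is only an illustration):

c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.SGEControllerLauncher'
c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.SGEEngineSetLauncher'
c.Global.n = 8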
34 37 #-----------------------------------------------------------------------------
35 38 # Global configuration
36 39 #-----------------------------------------------------------------------------
37 40
38 41 # The default number of engines that will be started. This is overridden by
39 42 # the -n command line option: "ipcluster start -n 4"
40 43 # c.Global.n = 2
41 44
42 45 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 46 # c.Global.log_to_file = False
44 47
45 48 # Remove old logs from cluster_dir/log before starting.
46 49 # c.Global.clean_logs = True
47 50
48 51 # The working directory for the process. The application will use os.chdir
49 52 # to change to this directory before starting.
50 53 # c.Global.work_dir = os.getcwd()
51 54
52 55
53 56 #-----------------------------------------------------------------------------
54 57 # Local process launchers
55 58 #-----------------------------------------------------------------------------
56 59
57 60 # The command line arguments to call the controller with.
58 61 # c.LocalControllerLauncher.controller_args = \
59 62 # ['--log-to-file','--log-level', '40']
60 63
61 64 # The working directory for the local engines
62 65 # c.LocalEngineSetLauncher.work_dir = u''
63 66
64 67 # Command line argument passed to the engines.
65 68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66 69
67 70 #-----------------------------------------------------------------------------
68 71 # MPIExec launchers
69 72 #-----------------------------------------------------------------------------
70 73
71 74 # The mpiexec/mpirun command to use in both the controller and engines.
72 75 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
73 76
74 77 # Additional arguments to pass to the actual mpiexec command.
75 78 # c.MPIExecLauncher.mpi_args = []
76 79
77 80 # The mpiexec/mpirun command and args can be overridden if they should be different
78 81 # for controller and engines.
79 82 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
80 83 # c.MPIExecControllerLauncher.mpi_args = []
81 84 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
82 85 # c.MPIExecEngineSetLauncher.mpi_args = []
83 86
84 87 # The command line argument to call the controller with.
85 88 # c.MPIExecControllerLauncher.controller_args = \
86 89 # ['--log-to-file','--log-level', '40']
87 90
88 91 # Command line argument passed to the engines.
89 92 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90 93
91 94 # The default number of engines to start if not given elsewhere.
92 95 # c.MPIExecEngineSetLauncher.n = 1
93 96
94 97 #-----------------------------------------------------------------------------
95 98 # SSH launchers
96 99 #-----------------------------------------------------------------------------
97 100
98 101 # ipclusterz can be used to launch controller and engines remotely via ssh.
99 102 # Note that currently ipclusterz does not do any file distribution, so if
100 103 # machines are not on a shared filesystem, config and json files must be
101 104 # distributed. For this reason, the reuse_files defaults to True on an
102 105 # ssh-launched Controller. This flag can be overridden by the program_args
103 106 # attribute of c.SSHControllerLauncher.
104 107
105 108 # set the ssh cmd for launching remote commands. The default is ['ssh']
106 109 # c.SSHLauncher.ssh_cmd = ['ssh']
107 110
108 111 # set the args to pass to the ssh command. The default is ['-tt']
109 112 # c.SSHLauncher.ssh_args = ['-tt']
110 113
111 114 # Set the user and hostname for the controller
112 115 # c.SSHControllerLauncher.hostname = 'controller.example.com'
113 116 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
114 117
115 118 # Set the arguments to be passed to ipcontrollerz
116 119 # note that remotely launched ipcontrollerz will not get the contents of
117 120 # the local ipcontrollerz_config.py unless it resides on the *remote host*
118 121 # in the location specified by the --cluster_dir argument.
119 122 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
120 123
121 124 # Set the default args passed to ipenginez for SSH launched engines
122 125 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
123 126
124 127 # SSH engines are launched as a dict of locations/n-engines.
125 128 # if a value is a tuple instead of an int, it is assumed to be of the form
126 129 # (n, [args]), setting the arguments passed to ipenginez on `host`.
127 130 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
128 131
129 132 # In this case, there will be 3 engines at my.example.com, and
130 133 # 2 at you@ipython.scipy.org with a special json connector location.
131 134 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
132 135 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json']),
133 136 # }
134 137
135 138 #-----------------------------------------------------------------------------
136 139 # Unix batch (PBS) scheduler launchers
137 140 #-----------------------------------------------------------------------------
138 141
142 # SGE and PBS are very similar. All configurables in this section called 'PBS*'
143 # also exist as 'SGE*'.
144
139 145 # The command line program to use to submit a PBS job.
140 # c.PBSControllerLauncher.submit_command = ['qsub']
146 # c.PBSLauncher.submit_command = ['qsub']
141 147
142 148 # The command line program to use to delete a PBS job.
143 # c.PBSControllerLauncher.delete_command = ['qdel']
149 # c.PBSLauncher.delete_command = ['qdel']
150
151 # The PBS queue in which the job should run
152 # c.PBSLauncher.queue = 'myqueue'
144 153
145 154 # A regular expression that takes the output of qsub and finds the job id.
146 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
155 # c.PBSLauncher.job_id_regexp = r'\d+'
156
157 # If the Controller and Engines need different values for the options above,
158 # they can be set per class, e.g. c.PBSControllerLauncher.<option>.
147 159
148 160 # The batch submission script used to start the controller. This is where
149 161 # environment variables would be setup, etc. This string is interpreted using
150 162 # the Itpl module in IPython.external. Basically, you can use ${n} for the
151 163 # number of engines and ${cluster_dir} for the cluster_dir.
152 164 # c.PBSControllerLauncher.batch_template = """
153 165 # #PBS -N ipcontroller
166 # #PBS -q $queue
154 167 #
155 168 # ipcontrollerz --cluster-dir $cluster_dir
156 169 # """
157 170
171 # You can also load this template from a file
172 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
173
158 174 # The name of the instantiated batch script that will actually be used to
159 175 # submit the job. This will be written to the cluster directory.
160 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
161
162
163 # The command line program to use to submit a PBS job.
164 # c.PBSEngineSetLauncher.submit_command = 'qsub'
165
166 # The command line program to use to delete a PBS job.
167 # c.PBSEngineSetLauncher.delete_command = 'qdel'
168
169 # A regular expression that takes the output of qsub and find the job id.
170 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
176 # c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
171 177
172 178 # The batch submission script used to start the engines. This is where
173 179 # environment variables would be setup, etc. This string is interpreted using
174 180 # the Itpl module in IPython.external. Basically, you can use ${n} for the
175 181 # number of engines and ${cluster_dir} for the cluster_dir.
176 182 # c.PBSEngineSetLauncher.batch_template = """
177 183 # #PBS -N ipengine
178 184 # #PBS -l nprocs=$n
179 185 #
180 186 # ipenginez --cluster-dir $cluster_dir
181 187 # """
182 188
189 # You can also load this template from a file
190 # c.PBSEngineSetLauncher.batch_template_file = u"/path/to/my/template.sh"
191
183 192 # The name of the instantiated batch script that will actually be used to
184 193 # submit the job. This will be written to the cluster directory.
185 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
194 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
195
196
186 197
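Because launcher.py (the second file in this changeset) now ships usable default templates, a PBS setup can get by without writing any template at all; a minimal sketch, where 'myqueue' and the engine count are placeholders:

c.Global.n = 8
c.PBSLauncher.queue = 'myqueue'

If the controller and engines need different settings, the same options can be set per class (c.PBSControllerLauncher.* / c.PBSEngineSetLauncher.*) as noted above.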
187 198 #-----------------------------------------------------------------------------
188 199 # Windows HPC Server 2008 launcher configuration
189 200 #-----------------------------------------------------------------------------
190 201
191 202 # c.IPControllerJob.job_name = 'IPController'
192 203 # c.IPControllerJob.is_exclusive = False
193 204 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
194 205 # c.IPControllerJob.priority = 'Highest'
195 206 # c.IPControllerJob.requested_nodes = ''
196 207 # c.IPControllerJob.project = 'MyProject'
197 208
198 209 # c.IPControllerTask.task_name = 'IPController'
199 210 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
200 211 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
201 212 # c.IPControllerTask.environment_variables = {}
202 213
203 214 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
204 215 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
205 216
206 217
207 218 # c.IPEngineSetJob.job_name = 'IPEngineSet'
208 219 # c.IPEngineSetJob.is_exclusive = False
209 220 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
210 221 # c.IPEngineSetJob.priority = 'Highest'
211 222 # c.IPEngineSetJob.requested_nodes = ''
212 223 # c.IPEngineSetJob.project = 'MyProject'
213 224
214 225 # c.IPEngineTask.task_name = 'IPEngine'
215 226 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
216 227 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
217 228 # c.IPEngineTask.environment_variables = {}
218 229
219 230 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
220 231 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
221 232
222 233
223 234
224 235
225 236
226 237
227 238
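As a companion to the batch_template_file option introduced above, here is a rough sketch of an external engine template (the resource line, job name and paths are placeholders); it uses the same Itpl substitutions as the inline templates and would be selected with c.PBSEngineSetLauncher.batch_template_file = u'/path/to/my/template.sh':

#!/bin/sh
#PBS -V
#PBS -N ipengine
#PBS -q $queue
#PBS -l nprocs=$n
ipenginez --cluster-dir $cluster_dir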
@@ -1,879 +1,971 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 import stat
22 23
23 24 from signal import SIGINT, SIGTERM
24 25 try:
25 26 from signal import SIGKILL
26 27 except ImportError:
27 28 SIGKILL=SIGTERM
28 29
29 30 from subprocess import Popen, PIPE, STDOUT
30 31 try:
31 32 from subprocess import check_output
32 33 except ImportError:
33 34 # pre-2.7, define check_output with Popen
34 35 def check_output(*args, **kwargs):
35 36 kwargs.update(dict(stdout=PIPE))
36 37 p = Popen(*args, **kwargs)
37 38 out,err = p.communicate()
38 39 return out
39 40
40 41 from zmq.eventloop import ioloop
41 42
42 43 from IPython.external import Itpl
43 44 # from IPython.config.configurable import Configurable
44 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
45 46 from IPython.utils.path import get_ipython_module_path
46 47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47 48
48 49 from .factory import LoggingFactory
49 50
50 51 # load winhpcjob from IPython.kernel
51 52 try:
52 53 from IPython.kernel.winhpcjob import (
53 54 IPControllerTask, IPEngineTask,
54 55 IPControllerJob, IPEngineSetJob
55 56 )
56 57 except ImportError:
57 58 pass
58 59
59 60
60 61 #-----------------------------------------------------------------------------
61 62 # Paths to the kernel apps
62 63 #-----------------------------------------------------------------------------
63 64
64 65
65 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
66 ipclusterz_cmd_argv = pycmd2argv(get_ipython_module_path(
66 67 'IPython.zmq.parallel.ipclusterapp'
67 68 ))
68 69
69 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipenginez_cmd_argv = pycmd2argv(get_ipython_module_path(
70 71 'IPython.zmq.parallel.ipengineapp'
71 72 ))
72 73
73 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipcontrollerz_cmd_argv = pycmd2argv(get_ipython_module_path(
74 75 'IPython.zmq.parallel.ipcontrollerapp'
75 76 ))
76 77
77 78 #-----------------------------------------------------------------------------
78 79 # Base launchers and errors
79 80 #-----------------------------------------------------------------------------
80 81
81 82
82 83 class LauncherError(Exception):
83 84 pass
84 85
85 86
86 87 class ProcessStateError(LauncherError):
87 88 pass
88 89
89 90
90 91 class UnknownStatus(LauncherError):
91 92 pass
92 93
93 94
94 95 class BaseLauncher(LoggingFactory):
95 96 """An asbtraction for starting, stopping and signaling a process."""
96 97
97 98 # In all of the launchers, the work_dir is where child processes will be
98 99 # run. This will usually be the cluster_dir, but may not be. any work_dir
99 100 # passed into the __init__ method will override the config value.
100 101 # This should not be used to set the work_dir for the actual engine
101 102 # and controller. Instead, use their own config files or the
102 103 # controller_args, engine_args attributes of the launchers to add
103 104 # the --work-dir option.
104 105 work_dir = Unicode(u'.')
105 106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106 107
107 108 start_data = Any()
108 109 stop_data = Any()
109 110
110 111 def _loop_default(self):
111 112 return ioloop.IOLoop.instance()
112 113
113 114 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 116 self.state = 'before' # can be before, running, after
116 117 self.stop_callbacks = []
117 118 self.start_data = None
118 119 self.stop_data = None
119 120
120 121 @property
121 122 def args(self):
122 123 """A list of cmd and args that will be used to start the process.
123 124
124 125 This is what is passed to :func:`spawnProcess` and the first element
125 126 will be the process name.
126 127 """
127 128 return self.find_args()
128 129
129 130 def find_args(self):
130 131 """The ``.args`` property calls this to find the args list.
131 132
132 133 Subclasses should implement this to construct the cmd and args.
133 134 """
134 135 raise NotImplementedError('find_args must be implemented in a subclass')
135 136
136 137 @property
137 138 def arg_str(self):
138 139 """The string form of the program arguments."""
139 140 return ' '.join(self.args)
140 141
141 142 @property
142 143 def running(self):
143 144 """Am I running."""
144 145 if self.state == 'running':
145 146 return True
146 147 else:
147 148 return False
148 149
149 150 def start(self):
150 151 """Start the process.
151 152
152 153 This must return a deferred that fires with information about the
153 154 process starting (like a pid, job id, etc.).
154 155 """
155 156 raise NotImplementedError('start must be implemented in a subclass')
156 157
157 158 def stop(self):
158 159 """Stop the process and notify observers of stopping.
159 160
160 161 This must return a deferred that fires with information about the
161 162 processing stopping, like errors that occur while the process is
162 163 attempting to be shut down. This deferred won't fire when the process
163 164 actually stops. To observe the actual process stopping, see
164 165 :func:`observe_stop`.
165 166 """
166 167 raise NotImplementedError('stop must be implemented in a subclass')
167 168
168 169 def on_stop(self, f):
169 170 """Get a deferred that will fire when the process stops.
170 171
171 172 The deferred will fire with data that contains information about
172 173 the exit status of the process.
173 174 """
174 175 if self.state=='after':
175 176 return f(self.stop_data)
176 177 else:
177 178 self.stop_callbacks.append(f)
178 179
179 180 def notify_start(self, data):
180 181 """Call this to trigger startup actions.
181 182
182 183 This logs the process startup and sets the state to 'running'. It is
183 184 a pass-through so it can be used as a callback.
184 185 """
185 186
186 187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 188 self.start_data = data
188 189 self.state = 'running'
189 190 return data
190 191
191 192 def notify_stop(self, data):
192 193 """Call this to trigger process stop actions.
193 194
194 195 This logs the process stopping and sets the state to 'after'. Call
195 196 this to trigger all the deferreds from :func:`observe_stop`."""
196 197
197 198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 199 self.stop_data = data
199 200 self.state = 'after'
200 201 for i in range(len(self.stop_callbacks)):
201 202 d = self.stop_callbacks.pop()
202 203 d(data)
203 204 return data
204 205
205 206 def signal(self, sig):
206 207 """Signal the process.
207 208
208 209 Return a semi-meaningless deferred after signaling the process.
209 210
210 211 Parameters
211 212 ----------
212 213 sig : str or int
213 214 'KILL', 'INT', etc., or any signal number
214 215 """
215 216 raise NotImplementedError('signal must be implemented in a subclass')
216 217
217 218
218 219 #-----------------------------------------------------------------------------
219 220 # Local process launchers
220 221 #-----------------------------------------------------------------------------
221 222
222 223
223 224 class LocalProcessLauncher(BaseLauncher):
224 225 """Start and stop an external process in an asynchronous manner.
225 226
226 227 This will launch the external process with a working directory of
227 228 ``self.work_dir``.
228 229 """
229 230
230 231 # This is used to construct self.args, which is passed to
231 232 # spawnProcess.
232 233 cmd_and_args = List([])
233 234 poll_frequency = Int(100) # in ms
234 235
235 236 def __init__(self, work_dir=u'.', config=None, **kwargs):
236 237 super(LocalProcessLauncher, self).__init__(
237 238 work_dir=work_dir, config=config, **kwargs
238 239 )
239 240 self.process = None
240 241 self.start_deferred = None
241 242 self.poller = None
242 243
243 244 def find_args(self):
244 245 return self.cmd_and_args
245 246
246 247 def start(self):
247 248 if self.state == 'before':
248 249 self.process = Popen(self.args,
249 250 stdout=PIPE,stderr=PIPE,stdin=PIPE,
250 251 env=os.environ,
251 252 cwd=self.work_dir
252 253 )
253 254
254 255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
255 256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
256 257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
257 258 self.poller.start()
258 259 self.notify_start(self.process.pid)
259 260 else:
260 261 s = 'The process was already started and has state: %r' % self.state
261 262 raise ProcessStateError(s)
262 263
263 264 def stop(self):
264 265 return self.interrupt_then_kill()
265 266
266 267 def signal(self, sig):
267 268 if self.state == 'running':
268 269 self.process.send_signal(sig)
269 270
270 271 def interrupt_then_kill(self, delay=2.0):
271 272 """Send INT, wait a delay and then send KILL."""
272 273 self.signal(SIGINT)
273 274 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
274 275 self.killer.start()
275 276
276 277 # callbacks, etc:
277 278
278 279 def handle_stdout(self, fd, events):
279 280 line = self.process.stdout.readline()
280 281 # a stopped process will be readable but return empty strings
281 282 if line:
282 283 self.log.info(line[:-1])
283 284 else:
284 285 self.poll()
285 286
286 287 def handle_stderr(self, fd, events):
287 288 line = self.process.stderr.readline()
288 289 # a stopped process will be readable but return empty strings
289 290 if line:
290 291 self.log.error(line[:-1])
291 292 else:
292 293 self.poll()
293 294
294 295 def poll(self):
295 296 status = self.process.poll()
296 297 if status is not None:
297 298 self.poller.stop()
298 299 self.loop.remove_handler(self.process.stdout.fileno())
299 300 self.loop.remove_handler(self.process.stderr.fileno())
300 301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
301 302 return status
302 303
303 304 class LocalControllerLauncher(LocalProcessLauncher):
304 305 """Launch a controller as a regular external process."""
305 306
306 controller_cmd = List(ipcontroller_cmd_argv, config=True)
307 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
307 308 # Command line arguments to ipcontroller.
308 309 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
309 310
310 311 def find_args(self):
311 312 return self.controller_cmd + self.controller_args
312 313
313 314 def start(self, cluster_dir):
314 315 """Start the controller by cluster_dir."""
315 316 self.controller_args.extend(['--cluster-dir', cluster_dir])
316 317 self.cluster_dir = unicode(cluster_dir)
317 318 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
318 319 return super(LocalControllerLauncher, self).start()
319 320
320 321
321 322 class LocalEngineLauncher(LocalProcessLauncher):
322 323 """Launch a single engine as a regular externall process."""
323 324
324 engine_cmd = List(ipengine_cmd_argv, config=True)
325 engine_cmd = List(ipenginez_cmd_argv, config=True)
325 326 # Command line arguments for ipengine.
326 327 engine_args = List(
327 328 ['--log-to-file','--log-level', str(logging.INFO)], config=True
328 329 )
329 330
330 331 def find_args(self):
331 332 return self.engine_cmd + self.engine_args
332 333
333 334 def start(self, cluster_dir):
334 335 """Start the engine by cluster_dir."""
335 336 self.engine_args.extend(['--cluster-dir', cluster_dir])
336 337 self.cluster_dir = unicode(cluster_dir)
337 338 return super(LocalEngineLauncher, self).start()
338 339
339 340
340 341 class LocalEngineSetLauncher(BaseLauncher):
341 342 """Launch a set of engines as regular external processes."""
342 343
343 344 # Command line arguments for ipengine.
344 345 engine_args = List(
345 346 ['--log-to-file','--log-level', str(logging.INFO)], config=True
346 347 )
347 348 # launcher class
348 349 launcher_class = LocalEngineLauncher
349 350
350 351 launchers = Dict()
351 352 stop_data = Dict()
352 353
353 354 def __init__(self, work_dir=u'.', config=None, **kwargs):
354 355 super(LocalEngineSetLauncher, self).__init__(
355 356 work_dir=work_dir, config=config, **kwargs
356 357 )
357 358 self.stop_data = {}
358 359
359 360 def start(self, n, cluster_dir):
360 361 """Start n engines by profile or cluster_dir."""
361 362 self.cluster_dir = unicode(cluster_dir)
362 363 dlist = []
363 364 for i in range(n):
364 365 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
365 366 # Copy the engine args over to each engine launcher.
366 367 el.engine_args = copy.deepcopy(self.engine_args)
367 368 el.on_stop(self._notice_engine_stopped)
368 369 d = el.start(cluster_dir)
369 370 if i==0:
370 371 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
371 372 self.launchers[i] = el
372 373 dlist.append(d)
373 374 self.notify_start(dlist)
374 375 # The consumeErrors here could be dangerous
375 376 # dfinal = gatherBoth(dlist, consumeErrors=True)
376 377 # dfinal.addCallback(self.notify_start)
377 378 return dlist
378 379
379 380 def find_args(self):
380 381 return ['engine set']
381 382
382 383 def signal(self, sig):
383 384 dlist = []
384 385 for el in self.launchers.itervalues():
385 386 d = el.signal(sig)
386 387 dlist.append(d)
387 388 # dfinal = gatherBoth(dlist, consumeErrors=True)
388 389 return dlist
389 390
390 391 def interrupt_then_kill(self, delay=1.0):
391 392 dlist = []
392 393 for el in self.launchers.itervalues():
393 394 d = el.interrupt_then_kill(delay)
394 395 dlist.append(d)
395 396 # dfinal = gatherBoth(dlist, consumeErrors=True)
396 397 return dlist
397 398
398 399 def stop(self):
399 400 return self.interrupt_then_kill()
400 401
401 402 def _notice_engine_stopped(self, data):
402 403 pid = data['pid']
403 404 for idx,el in self.launchers.iteritems():
404 405 if el.process.pid == pid:
405 406 break
406 407 self.launchers.pop(idx)
407 408 self.stop_data[idx] = data
408 409 if not self.launchers:
409 410 self.notify_stop(self.stop_data)
410 411
411 412
412 413 #-----------------------------------------------------------------------------
413 414 # MPIExec launchers
414 415 #-----------------------------------------------------------------------------
415 416
416 417
417 418 class MPIExecLauncher(LocalProcessLauncher):
418 419 """Launch an external process using mpiexec."""
419 420
420 421 # The mpiexec command to use in starting the process.
421 422 mpi_cmd = List(['mpiexec'], config=True)
422 423 # The command line arguments to pass to mpiexec.
423 424 mpi_args = List([], config=True)
424 425 # The program to start using mpiexec.
425 426 program = List(['date'], config=True)
426 427 # The command line argument to the program.
427 428 program_args = List([], config=True)
428 429 # The number of instances of the program to start.
429 430 n = Int(1, config=True)
430 431
431 432 def find_args(self):
432 433 """Build self.args using all the fields."""
433 434 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
434 435 self.program + self.program_args
435 436
436 437 def start(self, n):
437 438 """Start n instances of the program using mpiexec."""
438 439 self.n = n
439 440 return super(MPIExecLauncher, self).start()
440 441
441 442
442 443 class MPIExecControllerLauncher(MPIExecLauncher):
443 444 """Launch a controller using mpiexec."""
444 445
445 controller_cmd = List(ipcontroller_cmd_argv, config=True)
446 controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
446 447 # Command line arguments to ipcontroller.
447 448 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
448 449 n = Int(1, config=False)
449 450
450 451 def start(self, cluster_dir):
451 452 """Start the controller by cluster_dir."""
452 453 self.controller_args.extend(['--cluster-dir', cluster_dir])
453 454 self.cluster_dir = unicode(cluster_dir)
454 455 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
455 456 return super(MPIExecControllerLauncher, self).start(1)
456 457
457 458 def find_args(self):
458 459 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
459 460 self.controller_cmd + self.controller_args
460 461
461 462
462 463 class MPIExecEngineSetLauncher(MPIExecLauncher):
463 464
464 program = List(ipengine_cmd_argv, config=True)
465 program = List(ipenginez_cmd_argv, config=True)
465 466 # Command line arguments for ipengine.
466 467 program_args = List(
467 468 ['--log-to-file','--log-level', str(logging.INFO)], config=True
468 469 )
469 470 n = Int(1, config=True)
470 471
471 472 def start(self, n, cluster_dir):
472 473 """Start n engines by profile or cluster_dir."""
473 474 self.program_args.extend(['--cluster-dir', cluster_dir])
474 475 self.cluster_dir = unicode(cluster_dir)
475 476 self.n = n
476 477 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
477 478 return super(MPIExecEngineSetLauncher, self).start(n)
478 479
479 480 #-----------------------------------------------------------------------------
480 481 # SSH launchers
481 482 #-----------------------------------------------------------------------------
482 483
483 484 # TODO: Get SSH Launcher working again.
484 485
485 486 class SSHLauncher(LocalProcessLauncher):
486 487 """A minimal launcher for ssh.
487 488
488 489 To be useful this will probably have to be extended to use the ``sshx``
489 490 idea for environment variables. There could be other things this needs
490 491 as well.
491 492 """
492 493
493 494 ssh_cmd = List(['ssh'], config=True)
494 495 ssh_args = List(['-tt'], config=True)
495 496 program = List(['date'], config=True)
496 497 program_args = List([], config=True)
497 hostname = Str('', config=True)
498 user = Str('', config=True)
499 location = Str('')
498 hostname = CUnicode('', config=True)
499 user = CUnicode('', config=True)
500 location = CUnicode('')
500 501
501 502 def _hostname_changed(self, name, old, new):
502 503 if self.user:
503 self.location = '%s@%s' % (self.user, new)
504 self.location = u'%s@%s' % (self.user, new)
504 505 else:
505 506 self.location = new
506 507
507 508 def _user_changed(self, name, old, new):
508 self.location = '%s@%s' % (new, self.hostname)
509 self.location = u'%s@%s' % (new, self.hostname)
509 510
510 511 def find_args(self):
511 512 return self.ssh_cmd + self.ssh_args + [self.location] + \
512 513 self.program + self.program_args
513 514
514 515 def start(self, cluster_dir, hostname=None, user=None):
515 516 self.cluster_dir = unicode(cluster_dir)
516 517 if hostname is not None:
517 518 self.hostname = hostname
518 519 if user is not None:
519 520 self.user = user
520 521
521 522 return super(SSHLauncher, self).start()
522 523
523 524 def signal(self, sig):
524 525 if self.state == 'running':
525 526 # send escaped ssh connection-closer
526 527 self.process.stdin.write('~.')
527 528 self.process.stdin.flush()
528 529
529 530
530 531
531 532 class SSHControllerLauncher(SSHLauncher):
532 533
533 program = List(ipcontroller_cmd_argv, config=True)
534 program = List(ipcontrollerz_cmd_argv, config=True)
534 535 # Command line arguments to ipcontroller.
535 536 program_args = List(['-r', '--log-to-file','--log-level', str(logging.INFO)], config=True)
536 537
537 538
538 539 class SSHEngineLauncher(SSHLauncher):
539 program = List(ipengine_cmd_argv, config=True)
540 program = List(ipenginez_cmd_argv, config=True)
540 541 # Command line arguments for ipengine.
541 542 program_args = List(
542 543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
543 544 )
544 545
545 546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
546 547 launcher_class = SSHEngineLauncher
547 548 engines = Dict(config=True)
548 549
549 550 def start(self, n, cluster_dir):
550 551 """Start engines by profile or cluster_dir.
551 552 `n` is ignored, and the `engines` config property is used instead.
552 553 """
553 554
554 555 self.cluster_dir = unicode(cluster_dir)
555 556 dlist = []
556 557 for host, n in self.engines.iteritems():
557 558 if isinstance(n, (tuple, list)):
558 559 n, args = n
559 560 else:
560 561 args = copy.deepcopy(self.engine_args)
561 562
562 563 if '@' in host:
563 564 user,host = host.split('@',1)
564 565 else:
565 566 user=None
566 567 for i in range(n):
567 568 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
568 569
569 570 # Copy the engine args over to each engine launcher.
570 571
571 572 el.program_args = args
572 573 el.on_stop(self._notice_engine_stopped)
573 574 d = el.start(cluster_dir, user=user, hostname=host)
574 575 if i==0:
575 576 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
576 577 self.launchers[host+str(i)] = el
577 578 dlist.append(d)
578 579 self.notify_start(dlist)
579 580 return dlist
580 581
581 582
582 583
583 584 #-----------------------------------------------------------------------------
584 585 # Windows HPC Server 2008 scheduler launchers
585 586 #-----------------------------------------------------------------------------
586 587
587 588
588 589 # This is only used on Windows.
589 590 def find_job_cmd():
590 591 if os.name=='nt':
591 592 try:
592 593 return find_cmd('job')
593 594 except FindCmdError:
594 595 return 'job'
595 596 else:
596 597 return 'job'
597 598
598 599
599 600 class WindowsHPCLauncher(BaseLauncher):
600 601
601 602 # A regular expression used to get the job id from the output of the
602 603 # submit_command.
603 604 job_id_regexp = Str(r'\d+', config=True)
604 605 # The filename of the instantiated job script.
605 job_file_name = Unicode(u'ipython_job.xml', config=True)
606 job_file_name = CUnicode(u'ipython_job.xml', config=True)
606 607 # The full path to the instantiated job script. This gets made dynamically
607 608 # by combining the work_dir with the job_file_name.
608 job_file = Unicode(u'')
609 job_file = CUnicode(u'')
609 610 # The hostname of the scheduler to submit the job to
610 scheduler = Str('', config=True)
611 job_cmd = Str(find_job_cmd(), config=True)
611 scheduler = CUnicode('', config=True)
612 job_cmd = CUnicode(find_job_cmd(), config=True)
612 613
613 614 def __init__(self, work_dir=u'.', config=None, **kwargs):
614 615 super(WindowsHPCLauncher, self).__init__(
615 616 work_dir=work_dir, config=config, **kwargs
616 617 )
617 618
618 619 @property
619 620 def job_file(self):
620 621 return os.path.join(self.work_dir, self.job_file_name)
621 622
622 623 def write_job_file(self, n):
623 624 raise NotImplementedError("Implement write_job_file in a subclass.")
624 625
625 626 def find_args(self):
626 return ['job.exe']
627 return [u'job.exe']
627 628
628 629 def parse_job_id(self, output):
629 630 """Take the output of the submit command and return the job id."""
630 631 m = re.search(self.job_id_regexp, output)
631 632 if m is not None:
632 633 job_id = m.group()
633 634 else:
634 635 raise LauncherError("Job id couldn't be determined: %s" % output)
635 636 self.job_id = job_id
636 637 self.log.info('Job started with job id: %r' % job_id)
637 638 return job_id
638 639
639 640 def start(self, n):
640 641 """Start n copies of the process using the Win HPC job scheduler."""
641 642 self.write_job_file(n)
642 643 args = [
643 644 'submit',
644 645 '/jobfile:%s' % self.job_file,
645 646 '/scheduler:%s' % self.scheduler
646 647 ]
647 648 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
648 649 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
649 650 output = check_output([self.job_cmd]+args,
650 651 env=os.environ,
651 652 cwd=self.work_dir,
652 653 stderr=STDOUT
653 654 )
654 655 job_id = self.parse_job_id(output)
655 656 self.notify_start(job_id)
656 657 return job_id
657 658
658 659 def stop(self):
659 660 args = [
660 661 'cancel',
661 662 self.job_id,
662 663 '/scheduler:%s' % self.scheduler
663 664 ]
664 665 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
665 666 try:
666 667 output = check_output([self.job_cmd]+args,
667 668 env=os.environ,
668 669 cwd=self.work_dir,
669 670 stderr=STDOUT
670 671 )
671 672 except:
672 673 output = 'The job already appears to be stopped: %r' % self.job_id
673 674 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
674 675 return output
675 676
676 677
677 678 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
678 679
679 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
680 job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
680 681 extra_args = List([], config=False)
681 682
682 683 def write_job_file(self, n):
683 684 job = IPControllerJob(config=self.config)
684 685
685 686 t = IPControllerTask(config=self.config)
687 688 # The task's work directory is *not* the actual work directory of
687 688 # the controller. It is used as the base path for the stdout/stderr
688 689 # files that the scheduler redirects to.
689 690 t.work_directory = self.cluster_dir
690 691 # Add the --cluster-dir argument from self.start().
691 692 t.controller_args.extend(self.extra_args)
692 693 job.add_task(t)
693 694
694 695 self.log.info("Writing job description file: %s" % self.job_file)
695 696 job.write(self.job_file)
696 697
697 698 @property
698 699 def job_file(self):
699 700 return os.path.join(self.cluster_dir, self.job_file_name)
700 701
701 702 def start(self, cluster_dir):
702 703 """Start the controller by cluster_dir."""
703 704 self.extra_args = ['--cluster-dir', cluster_dir]
704 705 self.cluster_dir = unicode(cluster_dir)
705 706 return super(WindowsHPCControllerLauncher, self).start(1)
706 707
707 708
708 709 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
709 710
710 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
711 job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
711 712 extra_args = List([], config=False)
712 713
713 714 def write_job_file(self, n):
714 715 job = IPEngineSetJob(config=self.config)
715 716
716 717 for i in range(n):
717 718 t = IPEngineTask(config=self.config)
718 719 # The task's work directory is *not* the actual work directory of
719 720 # the engine. It is used as the base path for the stdout/stderr
720 721 # files that the scheduler redirects to.
721 722 t.work_directory = self.cluster_dir
722 723 # Add the --cluster-dir argument from self.start().
723 724 t.engine_args.extend(self.extra_args)
724 725 job.add_task(t)
725 726
726 727 self.log.info("Writing job description file: %s" % self.job_file)
727 728 job.write(self.job_file)
728 729
729 730 @property
730 731 def job_file(self):
731 732 return os.path.join(self.cluster_dir, self.job_file_name)
732 733
733 734 def start(self, n, cluster_dir):
734 735 """Start the controller by cluster_dir."""
735 736 self.extra_args = ['--cluster-dir', cluster_dir]
736 737 self.cluster_dir = unicode(cluster_dir)
737 738 return super(WindowsHPCEngineSetLauncher, self).start(n)
738 739
739 740
740 741 #-----------------------------------------------------------------------------
741 742 # Batch (PBS) system launchers
742 743 #-----------------------------------------------------------------------------
743 744
744 745 class BatchSystemLauncher(BaseLauncher):
745 746 """Launch an external process using a batch system.
746 747
747 748 This class is designed to work with UNIX batch systems like PBS, LSF,
748 749 GridEngine, etc. The overall model is that there are different commands
749 750 like qsub, qdel, etc. that handle the starting and stopping of the process.
750 751
751 752 This class also has the notion of a batch script. The ``batch_template``
752 753 attribute can be set to a string that is a template for the batch script.
753 754 This template is instantiated using Itpl. Thus the template can use
753 754 ${n} for the number of instances. Subclasses can add additional variables
755 756 to the template dict.
756 757 """
757 758
758 759 # Subclasses must fill these in. See PBSEngineSet
759 760 # The name of the command line program used to submit jobs.
760 submit_command = Str('', config=True)
761 submit_command = List([''], config=True)
761 762 # The name of the command line program used to delete jobs.
762 delete_command = Str('', config=True)
763 delete_command = List([''], config=True)
763 764 # A regular expression used to get the job id from the output of the
764 765 # submit_command.
765 job_id_regexp = Str('', config=True)
766 job_id_regexp = CUnicode('', config=True)
766 767 # The string that is the batch script template itself.
767 batch_template = Str('', config=True)
768 batch_template = CUnicode('', config=True)
769 # The file that contains the batch template
770 batch_template_file = CUnicode(u'', config=True)
768 771 # The filename of the instantiated batch script.
769 batch_file_name = Unicode(u'batch_script', config=True)
772 batch_file_name = CUnicode(u'batch_script', config=True)
773 # The PBS Queue
774 queue = CUnicode(u'', config=True)
775
776 # not configurable, override in subclasses
777 # PBS Job Array regex
778 job_array_regexp = CUnicode('')
779 job_array_template = CUnicode('')
780 # PBS Queue regex
781 queue_regexp = CUnicode('')
782 queue_template = CUnicode('')
783 # The default batch template, override in subclasses
784 default_template = CUnicode('')
770 785 # The full path to the instantiated batch script.
771 batch_file = Unicode(u'')
786 batch_file = CUnicode(u'')
772 787 # the format dict used with batch_template:
773 788 context = Dict()
774 789
775 790
776 791 def find_args(self):
777 return [self.submit_command, self.batch_file]
792 return self.submit_command + [self.batch_file]
778 793
779 794 def __init__(self, work_dir=u'.', config=None, **kwargs):
780 795 super(BatchSystemLauncher, self).__init__(
781 796 work_dir=work_dir, config=config, **kwargs
782 797 )
783 798 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
784 799
785 800 def parse_job_id(self, output):
786 801 """Take the output of the submit command and return the job id."""
787 802 m = re.search(self.job_id_regexp, output)
788 803 if m is not None:
789 804 job_id = m.group()
790 805 else:
791 806 raise LauncherError("Job id couldn't be determined: %s" % output)
792 807 self.job_id = job_id
793 808 self.log.info('Job submitted with job id: %r' % job_id)
794 809 return job_id
795 810
796 811 def write_batch_script(self, n):
797 812 """Instantiate and write the batch script to the work_dir."""
798 813 self.context['n'] = n
814 self.context['queue'] = self.queue
815 # print self.context
816 # first priority is batch_template if set
817 if self.batch_template_file and not self.batch_template:
818 # second priority is batch_template_file
819 with open(self.batch_template_file) as f:
820 self.batch_template = f.read()
821 if not self.batch_template:
822 # third (last) priority is default_template
823 self.batch_template = self.default_template
824
825 regex = re.compile(self.job_array_regexp)
826 # print regex.search(self.batch_template)
827 if not regex.search(self.batch_template):
828 self.log.info("adding job array settings to batch script")
829 firstline, rest = self.batch_template.split('\n',1)
830 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
831
832 regex = re.compile(self.queue_regexp)
833 # print regex.search(self.batch_template)
834 if self.queue and not regex.search(self.batch_template):
835 self.log.info("adding PBS queue settings to batch script")
836 firstline, rest = self.batch_template.split('\n',1)
837 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
838
799 839 script_as_string = Itpl.itplns(self.batch_template, self.context)
800 840 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
801 f = open(self.batch_file, 'w')
802 f.write(script_as_string)
803 f.close()
841
842 with open(self.batch_file, 'w') as f:
843 f.write(script_as_string)
844 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
804 845
805 846 def start(self, n, cluster_dir):
806 847 """Start n copies of the process using a batch system."""
807 848 # Here we save profile and cluster_dir in the context so they
808 849 # can be used in the batch script template as ${profile} and
809 850 # ${cluster_dir}
810 851 self.context['cluster_dir'] = cluster_dir
811 852 self.cluster_dir = unicode(cluster_dir)
812 853 self.write_batch_script(n)
813 854 output = check_output(self.args, env=os.environ)
814 855
815 856 job_id = self.parse_job_id(output)
816 857 self.notify_start(job_id)
817 858 return job_id
818 859
819 860 def stop(self):
820 output = check_output([self.delete_command, self.job_id], env=os.environ)
861 output = check_output(self.delete_command+[self.job_id], env=os.environ)
821 862 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
822 863 return output
823 864
824 865
825 866 class PBSLauncher(BatchSystemLauncher):
826 867 """A BatchSystemLauncher subclass for PBS."""
827 868
828 submit_command = Str('qsub', config=True)
829 delete_command = Str('qdel', config=True)
830 job_id_regexp = Str(r'\d+', config=True)
831 batch_template = Str('', config=True)
832 batch_file_name = Unicode(u'pbs_batch_script', config=True)
833 batch_file = Unicode(u'')
869 submit_command = List(['qsub'], config=True)
870 delete_command = List(['qdel'], config=True)
871 job_id_regexp = CUnicode(r'\d+', config=True)
872
873 batch_file = CUnicode(u'')
874 job_array_regexp = CUnicode('#PBS\W+-t\W+[\w\d\-\$]+')
875 job_array_template = CUnicode('#PBS -t 1-$n')
876 queue_regexp = CUnicode('#PBS\W+-q\W+\$?\w+')
877 queue_template = CUnicode('#PBS -q $queue')
834 878
835 879
836 880 class PBSControllerLauncher(PBSLauncher):
837 881 """Launch a controller using PBS."""
838 882
839 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
883 batch_file_name = CUnicode(u'pbs_controller', config=True)
884 default_template= CUnicode("""#!/bin/sh
885 #PBS -V
886 #PBS -N ipcontrollerz
887 %s --log-to-file --cluster-dir $cluster_dir
888 """%(' '.join(ipcontrollerz_cmd_argv)))
840 889
841 890 def start(self, cluster_dir):
842 891 """Start the controller by profile or cluster_dir."""
843 892 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
844 893 return super(PBSControllerLauncher, self).start(1, cluster_dir)
845 894
846 895
847 896 class PBSEngineSetLauncher(PBSLauncher):
848
849 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
897 """Launch Engines using PBS"""
898 batch_file_name = CUnicode(u'pbs_engines', config=True)
899 default_template= CUnicode(u"""#!/bin/sh
900 #PBS -V
901 #PBS -N ipenginez
902 %s --cluster-dir $cluster_dir
903 """%(' '.join(ipenginez_cmd_argv)))
850 904
851 905 def start(self, n, cluster_dir):
852 906 """Start n engines by profile or cluster_dir."""
853 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
907 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
854 908 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
855 909
910 #SGE is very similar to PBS
911
912 class SGELauncher(PBSLauncher):
913 """Sun GridEngine is a PBS clone with slightly different syntax"""
914 job_array_regexp = CUnicode('#$$\W+-t\W+[\w\d\-\$]+')
915 job_array_template = CUnicode('#$$ -t 1-$n')
916 queue_regexp = CUnicode('#$$\W+-q\W+\$?\w+')
917 queue_template = CUnicode('#$$ -q $queue')
918
919 class SGEControllerLauncher(SGELauncher):
920 """Launch a controller using SGE."""
921
922 batch_file_name = CUnicode(u'sge_controller', config=True)
923 default_template= CUnicode(u"""#$$ -V
924 #$$ -S /bin/sh
925 #$$ -N ipcontrollerz
926 %s --log-to-file --cluster-dir $cluster_dir
927 """%(' '.join(ipcontrollerz_cmd_argv)))
928
929 def start(self, cluster_dir):
930 """Start the controller by profile or cluster_dir."""
931 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
932 return super(PBSControllerLauncher, self).start(1, cluster_dir)
933
934 class SGEEngineSetLauncher(SGELauncher):
935 """Launch Engines with SGE"""
936 batch_file_name = CUnicode(u'sge_engines', config=True)
937 default_template = CUnicode("""#$$ -V
938 #$$ -S /bin/sh
939 #$$ -N ipenginez
940 %s --cluster-dir $cluster_dir
941 """%(' '.join(ipenginez_cmd_argv)))
942
943 def start(self, n, cluster_dir):
944 """Start n engines by profile or cluster_dir."""
945 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947
856 948
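The SGE classes above show the extension pattern this refactor enables: a new batch system only has to override the submit/delete commands, the directive regexes, and the templates. Purely as an illustration (not part of this commit, untested, and with directives quoted from memory), a SLURM-flavoured subclass might look roughly like this; here the `queue` option maps to a SLURM partition, and a default_template or user-supplied batch_template would still be required:

class SLURMLauncher(BatchSystemLauncher):
    """Hypothetical sketch of a SLURM-style launcher."""
    submit_command = List(['sbatch'], config=True)
    delete_command = List(['scancel'], config=True)
    job_id_regexp = CUnicode(r'\d+', config=True)
    # insert a partition line after the shebang if the template lacks one
    queue_regexp = CUnicode('#SBATCH\W+-p\W+\$?\w+')
    queue_template = CUnicode('#SBATCH -p $queue')
    # insert a job-array line after the shebang if the template lacks one
    job_array_regexp = CUnicode('#SBATCH\W+--array')
    job_array_template = CUnicode('#SBATCH --array=1-$n')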
857 949 #-----------------------------------------------------------------------------
858 950 # A launcher for ipcluster itself!
859 951 #-----------------------------------------------------------------------------
860 952
861 953
862 954 class IPClusterLauncher(LocalProcessLauncher):
863 955 """Launch the ipcluster program in an external process."""
864 956
865 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
957 ipcluster_cmd = List(ipclusterz_cmd_argv, config=True)
866 958 # Command line arguments to pass to ipcluster.
867 959 ipcluster_args = List(
868 960 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
869 961 ipcluster_subcommand = Str('start')
870 962 ipcluster_n = Int(2)
871 963
872 964 def find_args(self):
873 965 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
874 966 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
875 967
876 968 def start(self):
877 969 self.log.info("Starting ipcluster: %r" % self.args)
878 970 return super(IPClusterLauncher, self).start()
879 971
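To make the new template machinery concrete: with the PBS defaults above, c.PBSLauncher.queue = 'myqueue' and n = 8, write_batch_script should emit roughly the following for the engine set (a sketch; the ipenginez command line and cluster_dir are placeholders). The -q and -t lines are injected right after the shebang because the default template contains neither directive, and the finished script is made user-executable by the new os.chmod call before qsub runs:

#!/bin/sh
#PBS -q myqueue
#PBS -t 1-8
#PBS -V
#PBS -N ipenginez
<ipenginez command> --cluster-dir <cluster_dir>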