##// END OF EJS Templates
don't use `python -m package` on Windows Python 2...
MinRK -
Show More
@@ -1,1449 +1,1455 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # pre-2.7, define check_output with Popen
    def check_output(*args, **kwargs):
        """Run a command and return what it wrote to stdout.

        Fallback for Pythons whose :mod:`subprocess` has no ``check_output``.
        All arguments are forwarded to :class:`subprocess.Popen`, except that
        ``stdout`` is always replaced by a pipe so the output can be captured.

        Raises
        ------
        subprocess.CalledProcessError
            If the command exits with a non-zero status, mirroring the
            standard-library ``check_output``.
        """
        from subprocess import CalledProcessError

        kwargs['stdout'] = PIPE
        p = Popen(*args, **kwargs)
        out, _ = p.communicate()
        retcode = p.returncode
        if retcode:
            # Popen takes the command either as ``args=`` or as the first
            # positional argument.
            cmd = kwargs.get('args')
            if cmd is None and args:
                cmd = args[0]
            raise CalledProcessError(retcode, cmd)
        return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67 from IPython.utils.py3compat import iteritems, itervalues
67 from IPython.utils.py3compat import iteritems, itervalues
68
68
69 from .win32support import forward_read_events
69 from .win32support import forward_read_events
70
70
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72
72
# True when running on Windows (os.name reports 'nt' there).
WINDOWS = os.name == 'nt'

#-----------------------------------------------------------------------------
# Paths to the kernel apps
#-----------------------------------------------------------------------------

# Each command is an argv list that runs the app with the current interpreter
# via ``python -m <package>``.
ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]
ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]
ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]

if sys.version_info[0] < 3 and WINDOWS:
    # `python -m package` doesn't work on Windows Python 2,
    # but `python -m module` does, so point at the app modules directly.
    # NOTE(review): ipcluster_cmd_argv is not overridden here -- confirm
    # whether it needs the same treatment.
    ipengine_cmd_argv = [
        sys.executable, "-m", "IPython.parallel.apps.ipengineapp",
    ]
    ipcontroller_cmd_argv = [
        sys.executable, "-m", "IPython.parallel.apps.ipcontrollerapp",
    ]
90
85 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
86 # Base launchers and errors
92 # Base launchers and errors
87 #-----------------------------------------------------------------------------
93 #-----------------------------------------------------------------------------
88
94
class LauncherError(Exception):
    """Base class for the errors raised by the launchers in this module."""
91
97
92
98
class ProcessStateError(LauncherError):
    """Raised when a launcher is asked to start while in the wrong state."""
95
101
96
102
class UnknownStatus(LauncherError):
    """LauncherError subclass for a status that cannot be determined.

    NOTE(review): nothing in the visible part of this module raises it;
    confirm the intended use against its callers.
    """
99
105
100
106
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    A launcher moves through three states: ``'before'`` (not started yet),
    ``'running'`` and ``'after'`` (finished).  Subclasses provide
    :meth:`find_args`, :meth:`start`, :meth:`stop` and :meth:`signal`.
    """

    # work_dir is the directory child processes are run in.  It is usually
    # the profile_dir, but need not be, and a work_dir passed to __init__
    # overrides the configured value.  It is *not* the way to set the working
    # directory of the actual engine or controller: use their own config
    # files, or the controller_args / engine_args attributes of the
    # launchers, to add the work_dir option instead.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        # Default for the ``loop`` trait: the shared IOLoop instance.
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        # One of 'before', 'running', 'after'.
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The command and its arguments, as a list.

        The list is produced by :meth:`find_args`; its first element is
        treated as the process name in log messages.
        """
        return self.find_args()

    def find_args(self):
        """Build the list returned by the ``.args`` property.

        Subclasses must override this to construct the command and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into a single space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher is in the 'running' state."""
        return self.state == 'running'

    def start(self):
        """Start the process (must be implemented by subclasses)."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        Implementations return None immediately; register a callback with
        :meth:`on_stop` to observe the process actually finishing.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with this launcher's stop_data.

        If the process has already finished, ``f`` is called right away and
        its result is returned; otherwise it is queued until
        :meth:`notify_stop` runs.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Record that the process has started.

        Logs the startup, stores ``data`` as ``start_data`` and switches the
        state to 'running'.  ``data`` is returned unchanged so the method can
        be used as a pass-through callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Record that the process has stopped and fire the stop callbacks.

        Logs the stop, stores ``data`` as ``stop_data``, switches the state
        to 'after' and calls every callback registered via :meth:`on_stop`
        with ``data`` (most recently registered first).  Returns ``data``.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Only drain the callbacks that were queued when we got here.
        pending = len(self.stop_callbacks)
        for _ in range(pending):
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process (must be implemented by subclasses).

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
212
218
class ClusterAppMixin(HasTraits):
    """Mixin exposing the cluster's profile dir and id as traits."""
    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line options that identify the cluster, as a list."""
        options = ['--profile-dir', self.profile_dir]
        options.extend(['--cluster-id', self.cluster_id])
        return options
221
227
class ControllerMixin(ClusterAppMixin):
    """Mixin holding the configurable ipcontroller command and arguments."""

    # argv prefix used to launch the controller.
    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="Popen command to launch ipcontroller.",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line args to pass to ipcontroller",
    )
228
234
class EngineMixin(ClusterAppMixin):
    """Mixin holding the configurable ipengine command and arguments."""

    # argv prefix used to launch an engine.
    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="command to launch the Engine.",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
236
242
237
243
238 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
239 # Local process launchers
245 # Local process launchers
240 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
241
247
242
248
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process asynchronously on the IOLoop.

    The child process is spawned with ``self.work_dir`` as its working
    directory.  Its stdout and stderr are read through IOLoop handlers and
    forwarded to the debug log, and its exit is detected by periodic polling.
    """

    # The command and its arguments; find_args() returns this list, which is
    # what gets passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output and exit into the IOLoop.

        Raises ProcessStateError if the launcher is not in the 'before' state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(
            self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir,
        )
        if WINDOWS:
            # On Windows the pipes are wrapped by forward_read_events and the
            # wrappers are what gets registered with the loop.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state != 'running':
            return
        if WINDOWS and sig != SIGINT:
            # use Windows tree-kill for better child cleanup
            check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
        else:
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT now, then KILL after ``delay`` seconds."""
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the delayed KILL below is still scheduled.
            self.log.debug("interrupt failed")
        # DelayedCallback takes its delay in milliseconds.
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _log_line_or_poll(self, line):
        # a stopped process will be readable but return empty strings,
        # so an empty read is the cue to check whether the child has exited
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """IOLoop read handler for the child's stdout."""
        line = self.stdout.recv() if WINDOWS else self.process.stdout.readline()
        self._log_line_or_poll(line)

    def handle_stderr(self, fd, events):
        """IOLoop read handler for the child's stderr."""
        line = self.stderr.recv() if WINDOWS else self.process.stderr.readline()
        self._log_line_or_poll(line)

    def poll(self):
        """Check whether the child has exited, tearing down if it has.

        Returns the exit status, or None while the process is still alive.
        """
        status = self.process.poll()
        if status is None:
            return status
        self.poller.stop()
        self.loop.remove_handler(self.stdout)
        self.loop.remove_handler(self.stderr)
        self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
341
347
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        # controller command, then the cluster options, then extra args
        argv = list(self.controller_cmd)
        argv += self.cluster_args
        argv += self.controller_args
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
351
357
352
358
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        # engine command, then the cluster options, then extra args
        argv = list(self.engine_cmd)
        argv += self.cluster_args
        argv += self.engine_args
        return argv
358
364
359
365
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes.

    Each engine gets its own ``launcher_class`` instance, kept in
    ``launchers`` keyed by its start index.  When the last engine has
    stopped, the collected per-engine stop data is passed to
    :meth:`notify_stop`.
    """

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Returns the list of values returned by each engine launcher's
        ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                # space out the launches (see the ``delay`` trait)
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        # Not a real command: only used as the process name in log messages.
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine; returns the list of results."""
        return [el.signal(sig) for el in itervalues(self.launchers)]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt-then-kill every engine; returns the list of results."""
        return [el.interrupt_then_kill(delay) for el in itervalues(self.launchers)]

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for one engine: record its data, notify when all are done.

        ``data`` is the stop data of the engine launcher; its ``'pid'`` entry
        identifies which launcher finished.
        """
        pid = data['pid']
        for idx, el in iteritems(self.launchers):
            if el.process.pid == pid:
                break
        else:
            # No launcher owns this pid.  Without this guard the code below
            # would pop whichever launcher happened to be iterated last (or
            # fail with NameError when there are none left).
            self.log.debug("Ignoring stop notification for unknown engine pid %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
430
436
431
437
432 #-----------------------------------------------------------------------------
438 #-----------------------------------------------------------------------------
433 # MPI launchers
439 # MPI launchers
434 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
435
441
436
442
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec.

    The command line is assembled by :meth:`find_args` from ``mpi_cmd``,
    the instance count ``n``, ``mpi_args``, ``program`` and ``program_args``.
    """

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        # ``config`` may be absent or passed explicitly as None (the default
        # in the base launchers); treat both as "no config" instead of
        # failing on ``None.get``.
        config = kwargs.get('config') or {}
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                # NOTE(review): assumes ``config[newname]`` exists or is
                # created on access (Config-like object) -- confirm.
                config[newname].update(deprecated)
                self.log.warning("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
474
480
475
481
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # ``program`` and ``program_args`` are re-exposed here as read-only,
    # *non-configurable* aliases that find_args() consumes.  That keeps every
    # Controller/EngineSet launcher configured through the same
    # ``controller_*`` / ``engine_*`` names, instead of some of them using
    # ``program_args`` and others ``controller_args``.
    @property
    def program(self):
        return self.controller_cmd

    @property
    def program_args(self):
        cluster_options = self.cluster_args
        return cluster_options + self.controller_args

    def start(self):
        """Start the controller by profile_dir (always a single instance)."""
        return super(MPIControllerLauncher, self).start(1)
493
499
494
500
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # ``program`` and ``program_args`` are re-exposed here as read-only,
    # *non-configurable* aliases that find_args() consumes.  That keeps every
    # Controller/EngineSet launcher configured through the same
    # ``controller_*`` / ``engine_*`` names, instead of some of them using
    # ``program_args`` and others ``engine_args``.
    @property
    def program(self):
        return self.engine_cmd

    @property
    def program_args(self):
        cluster_options = self.cluster_args
        return cluster_options + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
513
519
514 # deprecated MPIExec names
520 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin for the old ``MPIExec*`` launcher names.

    The class it is mixed into must provide a ``log`` attribute with a
    ``warning(msg, *args)`` method (e.g. a ``logging.Logger``).
    """

    def warn(self):
        """Log that this class name is deprecated and name its replacement.

        The replacement name is the current class name with ``MPIExec``
        replaced by ``MPI``.
        """
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warn is a deprecated alias that itself raises a
        # DeprecationWarning; Logger.warning is the supported spelling.
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
520
526
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher.

    Behaves like MPILauncher but calls ``warn()`` after construction.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent initializer, then warn."""
        parent = super(MPIExecLauncher, self)
        parent.__init__(*args, **kwargs)
        # report the deprecated name only once the instance is fully built
        self.warn()
526
532
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher.

    Behaves like MPIControllerLauncher but calls ``warn()`` after
    construction.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent initializer, then warn."""
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*args, **kwargs)
        # report the deprecated name only once the instance is fully built
        self.warn()
532
538
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher.

    Behaves like MPIEngineSetLauncher but calls ``warn()`` after
    construction.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent initializer, then warn."""
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*args, **kwargs)
        # report the deprecated name only once the instance is fully built
        self.warn()
538
544
539
545
540 #-----------------------------------------------------------------------------
546 #-----------------------------------------------------------------------------
541 # SSH launchers
547 # SSH launchers
542 #-----------------------------------------------------------------------------
548 #-----------------------------------------------------------------------------
543
549
544 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545
551
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    The launched command line is ``ssh_cmd + ssh_args + [location]`` followed
    by the shell-quoted remote program and its arguments.  Files listed in
    ``to_send`` are copied with scp before the program is started, and files
    listed in ``to_fetch`` are copied back after it has been started.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Set ``location`` from the new hostname, prefixed by ``user@`` if a user is set.

        Presumably invoked by the trait machinery when ``hostname`` changes.
        """
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Set ``location`` to ``<new user>@<hostname>``.

        Presumably invoked by the trait machinery when ``user`` changes.
        """
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Return the full ssh command line as a list of strings.

        Each word of the remote program and its arguments is quoted with
        ``pipes.quote``.
        """
        return self.ssh_cmd + self.ssh_args + [self.location] + \
                list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """Send a single file to ``location:remote`` with scp.

        Polls once per second, at most ten times, for ``local`` to exist.
        The copy is attempted afterwards whether or not the file appeared.
        """
        remote = "%s:%s" % (self.location, remote)
        for _ in range(10):
            if os.path.exists(local):
                break
            # lazy %-args: the message is only built if debug is enabled
            self.log.debug("waiting for %s", local)
            time.sleep(1)
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """Send every ``(local, remote)`` pair in ``to_send`` (called before start)."""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """Fetch a single file from ``location:remote`` with scp.

        Before copying, asks the remote side up to ten times whether the
        file exists (``test -e``), sleeping one second after each 'no'.
        The copy is attempted afterwards regardless of the last answer.
        """
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for _ in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """Fetch every ``(remote, local)`` pair in ``to_fetch`` (called after start)."""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        Parameters
        ----------
        hostname : str, optional
            If given, assigned to ``self.hostname`` before starting.
        user : str, optional
            If given, assigned to ``self.user`` before starting.

        Returns
        -------
        Whatever the parent class's ``start()`` returns, so that callers
        collecting start results get the same value as from other launchers.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        result = super(SSHLauncher, self).start()
        self.fetch_files()
        return result

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        ``sig`` is ignored: whatever signal is requested, the ssh escape
        sequence ``~.`` is written to the process's stdin.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            # NOTE(review): this writes a text string; if stdin is a binary
            # pipe on Python 3 this would need bytes -- confirm against the
            # Popen call in the parent class.
            self.process.stdin.write('~.')
            self.process.stdin.flush()
644
650
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that passes ``--profile-dir <remote_profile_dir>`` as its cluster args."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        """Derive ``remote_profile_dir`` from the new profile_dir if it is still empty."""
        if self.remote_profile_dir:
            return
        # trigger remote_profile_dir_default logic again,
        # in case it was already triggered before profile_dir was set
        self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo

        Paths that do not start with the home directory are returned
        unchanged.
        """
        prefix = get_home_dir()
        if not prefix.endswith('/'):
            prefix = prefix + '/'

        if not path.startswith(prefix):
            return path
        return path[len(prefix):]

    def _remote_profile_dir_default(self):
        """Default remote profile dir: the local ``profile_dir`` minus the home prefix."""
        return self._strip_home(self.profile_dir)

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id with a ValueError."""
        if not new:
            return
        raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line arguments selecting the remote profile directory."""
        return ['--profile-dir', self.remote_profile_dir]
681
687
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller over ssh.

    ``program`` and ``program_args`` are exposed as read-only,
    *non-configurable* aliases of the controller settings for use in
    ``find_args()``.  This gives every Controller/EngineSet launcher the
    same attribute names, instead of *some* having ``program_args`` and
    others ``controller_args``.
    """

    def _controller_cmd_default(self):
        """Default value for ``controller_cmd``: the bare ``ipcontroller`` command."""
        return ['ipcontroller']

    @property
    def program(self):
        """The command to run: an alias of ``controller_cmd``."""
        return self.controller_cmd

    @property
    def program_args(self):
        """The cluster arguments followed by the controller arguments."""
        cluster_part = self.cluster_args
        return cluster_part + self.controller_args

    def _to_fetch_default(self):
        """Default ``to_fetch``: the two connection files as (remote, local) pairs."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            local_path = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote_path, local_path))
        return pairs
705
711
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine over ssh.

    ``program`` and ``program_args`` are exposed as read-only,
    *non-configurable* aliases of the engine settings for use in
    ``find_args()``.  This gives every Controller/EngineSet launcher the
    same attribute names, instead of *some* having ``program_args`` and
    others ``engine_args``.
    """

    def _engine_cmd_default(self):
        """Default value for ``engine_cmd``: the bare ``ipengine`` command."""
        return ['ipengine']

    @property
    def program(self):
        """The command to run: an alias of ``engine_cmd``."""
        return self.engine_cmd

    @property
    def program_args(self):
        """The cluster arguments followed by the engine arguments."""
        cluster_part = self.cluster_args
        return cluster_part + self.engine_args

    def _to_send_default(self):
        """Default ``to_send``: the two connection files as (local, remote) pairs."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs
729
735
730
736
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch sets of engines over ssh, one launcher per engine.

    Which hosts to use, and how many engines to start on each, comes from
    the ``engines`` dict.  A value is either an int (the engine count) or a
    two-item tuple/list of ``(count, engine_args)``.
    """

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        """Default value for ``engine_cmd``: the bare ``ipengine`` command."""
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        total = 0
        for spec in itervalues(self.engines):
            if isinstance(spec, (tuple, list)):
                # (count, args) form: only the count matters here
                spec, _unused_args = spec
            total += spec
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Returns the list of values returned by each engine launcher's
        ``start()``; the same list is passed to ``notify_start``.
        """

        dlist = []
        for host, spec in iteritems(self.engines):
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            # a key of the form user@host carries its own ssh user
            user = None
            if '@' in host:
                user, host = host.split('@', 1)

            for i in range(count):
                later_engine = i > 0
                if later_engine:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                                         profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if later_engine:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
785
791
786
792
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """The remote command: ``ipcluster_cmd`` followed by ``engines``."""
        return self.ipcluster_cmd + ['engines']

    @property
    def program_args(self):
        """Arguments giving the engine count and the remote profile directory."""
        engine_total = str(self.n)
        return ['-n', engine_total, '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        """Default ``to_send``: the two connection files as (local, remote) pairs."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs

    def start(self, n):
        """Record the engine count ``n`` and start the remote ``ipcluster engines``."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
815
821
816
822
817 #-----------------------------------------------------------------------------
823 #-----------------------------------------------------------------------------
818 # Windows HPC Server 2008 scheduler launchers
824 # Windows HPC Server 2008 scheduler launchers
819 #-----------------------------------------------------------------------------
825 #-----------------------------------------------------------------------------
820
826
821
827
822 # This is only used on Windows.
828 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to submit jobs.

    On Windows the command is looked up with ``find_cmd('job')``.  If that
    lookup fails, or on any other platform, the bare name ``'job'`` is
    returned.
    """
    fallback = 'job'
    if not WINDOWS:
        return fallback
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return fallback
832
838
833
839
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs via the Win HPC ``job`` command.

    Subclasses must implement ``write_job_file`` to produce the job
    description file that ``start`` submits.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): the ``job_file`` property defined further down rebinds
    # this name within the class body -- confirm this trait is still wanted.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Forward ``work_dir``, ``config`` and any extra keywords to the parent."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``work_dir``."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for ``n`` instances (subclass hook).

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the fixed argument list ``[u'job.exe']``."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is the first match of ``job_id_regexp`` in ``output``; it is
        also stored on ``self.job_id``.

        Raises
        ------
        LauncherError
            If the regular expression does not match ``output``.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with ``job_cmd``, parses the job id
        out of the command's decoded output and returns that id.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        # lazy %-args: the message is only built if debug is enabled
        self.log.debug("Starting Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and return the decoded output of the cancel command.

        If the cancel command fails, a message saying the job already
        appears to be stopped is returned instead.  ``notify_stop`` is called
        in either case with the job id and that output.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # Best effort: any failure of the cancel command is reported as
            # "already stopped".  KeyboardInterrupt and SystemExit are not
            # Exception subclasses, so they still propagate.
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
914
920
915
921
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a controller as a single-task Win HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file containing one controller task.

        ``n`` is not used; exactly one task is added to the job.
        """
        job = IPControllerJob(parent=self)

        task = IPControllerTask(parent=self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir
        # Cluster args go first, then the extra controller args.
        task.controller_args.extend(self.cluster_args)
        task.controller_args.extend(self.controller_args)
        job.add_task(task)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
946
952
947
953
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as one Win HPC job with one task per engine."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job file containing ``n`` engine tasks."""
        job = IPEngineSetJob(parent=self)

        for _ in range(n):
            task = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            task.work_directory = self.profile_dir
            # Cluster args go first, then the extra engine args.
            task.engine_args.extend(self.cluster_args)
            task.engine_args.extend(self.engine_args)
            job.add_task(task)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by submitting the engine-set job."""
        parent = super(WindowsHPCEngineSetLauncher, self)
        return parent.start(n)
979
985
980
986
981 #-----------------------------------------------------------------------------
987 #-----------------------------------------------------------------------------
982 # Batch (PBS) system launchers
988 # Batch (PBS) system launchers
983 #-----------------------------------------------------------------------------
989 #-----------------------------------------------------------------------------
984
990
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        """Store the new value in ``self.context`` under the changed name."""
        self.context[name] = new

    # cluster_id changes are handled exactly like profile_dir changes
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        """Record an empty ``profile_dir`` in the context and return it."""
        empty = ''
        self.context['profile_dir'] = empty
        return empty

    def _cluster_id_default(self):
        """Record an empty ``cluster_id`` in the context and return it."""
        empty = ''
        self.context['cluster_id'] = empty
        return empty
997
1003
998
1004
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        """Mirror the changed attribute into the template context."""
        self.context[name] = new

    n = Integer(1)
    # n is mirrored into the context the same way as queue
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # the instantiated script always lives in the working directory
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.  Raises ``LauncherError``
        when ``job_id_regexp`` does not match ``output``.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group(self.job_id_regexp_group)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to the default template;
            # note that this is *only* done when the user did not specify
            # a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # owner-only read/write/execute
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_after_first_line(self, line):
        """Insert ``line`` as the second line of ``self.batch_template``."""
        # partition (unlike split-and-unpack) also copes with a template that
        # consists of a single line without any newline
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.
        """
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            self._insert_after_first_line(self.queue_template)

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            self._insert_after_first_line(self.job_array_template)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command on it and returns
        the job id parsed from the command's output.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on the submitted job.

        Returns the decoded stdout + stderr of the delete command (an empty
        string if the command could not be run); the same text is passed
        to ``notify_stop``.
        """
        delete_cmd = self.delete_command + [self.job_id]
        try:
            p = Popen(delete_cmd, env=os.environ, stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            output = out + err
        except Exception:
            self.log.exception("Problem stopping cluster with command: %s",
                               delete_cmd)
            # keep this as bytes: it is decoded below, and text strings have
            # no .decode() on Python 3
            output = b""
        output = output.decode(DEFAULT_ENCODING, 'replace')
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1146
1152
1147
1153
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Raw strings keep the regex escapes (\W, \d, \$ ...) out of the reach of
    # Python's own string-escape processing; the patterns are unchanged.
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1163
1169
1164
1170
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Controller launcher that submits the controller as a PBS job."""

    batch_file_name = Unicode(
        u'pbs_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # The controller command line is shell-quoted and baked into the template
    # here; {profile_dir} and {cluster_id} remain placeholders.
    default_template = Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller job always consists of exactly one process
        count = 1
        return super(PBSControllerLauncher, self).start(count)
1179
1185
1180
1186
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Engine-set launcher that submits the engines as a PBS job."""

    batch_file_name = Unicode(
        u'pbs_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # The engine command line is shell-quoted and baked into the template
    # here; {profile_dir} and {cluster_id} remain placeholders.
    default_template = Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1190
1196
1191
1197
1192 #SGE is very similar to PBS
1198 #SGE is very similar to PBS
1193
1199
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # Raw strings: the regex escapes (\$, \W, \w) are not valid Python string
    # escapes, so non-raw literals only work by accident and trigger
    # invalid-escape warnings on newer interpreters.  Patterns are unchanged.
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1200
1206
1201
1207
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # The controller command line is shell-quoted and baked into the template
    # here; {profile_dir} and {cluster_id} remain placeholders.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller job always consists of exactly one process
        return super(SGEControllerLauncher, self).start(1)
1216
1222
1217
1223
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Engine-set launcher that submits the engines as an SGE job."""

    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # The engine command line is shell-quoted and baked into the template
    # here; {profile_dir} and {cluster_id} remain placeholders.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1227
1233
1228
1234
1229 # LSF launchers
1235 # LSF launchers
1230
1236
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Raw strings keep the regex escapes away from Python's string-escape
    # processing; the patterns match exactly what they matched before
    # (the regex engine itself understands \t inside the character class).
    job_array_regexp = CRegExp(r'#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives :
        bsub < script

        Returns the job id parsed from the output of the submit command.
        """
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        args = self.args
        # The batch script is the *last* argument; everything before it is the
        # submit command, which may consist of more than one word.
        submit_cmd, script = args[:-1], args[-1]
        piped_cmd = ' '.join(submit_cmd) + '<"' + script + '"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        # shell=True is needed for the '<' redirection
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1264
1270
1265
1271
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Controller launcher that submits the controller as an LSF job."""

    batch_file_name = Unicode(
        u'lsf_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # '%%J' survives the %-interpolation below as a literal '%J'; the
    # controller command line is shell-quoted and substituted for '%s'.
    default_template = Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller job always consists of exactly one process
        count = 1
        return super(LSFControllerLauncher, self).start(count)
1281
1287
1282
1288
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Engine-set launcher that submits the engines as an LSF job."""

    batch_file_name = Unicode(
        u'lsf_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # '%%J' survives the %-interpolation below as a literal '%J'; the
    # engine command line is shell-quoted and substituted for '%s'.
    default_template = Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1292
1298
1293
1299
1294
1300
class HTCondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for HTCondor.

    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
    than the python instance but otherwise is very similar to PBS.  This is because
    HTCondor destroys sys.executable when launching remote processes - a launched
    python process depends on sys.executable to effectively evaluate its
    module search paths. Without it, regardless of which python interpreter you launch
    you will get only the built-in module search paths.

    We use the ip{cluster, engine, controller} scripts as our executable to circumvent
    this - the mechanism of shebanged scripts means that the python binary will be
    launched with argv[0] set to the *location of the ip{cluster, engine, controller}
    scripts on the remote node*. This means you need to take care that:

    a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
       of the python environment you wish to execute code in having top precedence.
    b. This functionality is untested on Windows.

    If you need different behavior, consider making your own template.
    """

    submit_command = List(['condor_submit'], config=True,
        help="The HTCondor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The HTCondor delete command ['condor_rm']")
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help=r"Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    # raw string: \W and \$ are regex escapes, not Python string escapes
    job_array_regexp = CRegExp(r'queue\W+\$')
    job_array_template = Unicode('queue {n}')


    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #HTCondor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
        specified in the script.
        """
        pass
1344
1350
1345
1351
class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch a controller using HTCondor."""

    batch_file_name = Unicode(u'htcondor_controller', config=True,
        help="batch file name for the controller job.")
    # The whole ``arguments`` value is wrapped in double quotes so that
    # condor_submit uses its "new" argument syntax, in which single quotes
    # group one argument (e.g. a profile_dir containing spaces).  Without the
    # enclosing double quotes the single quotes are passed through literally.
    # This matches the engine template.
    default_template = Unicode(r"""
universe = vanilla
executable = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")

    def start(self):
        """Start the controller by profile or profile_dir."""
        # a controller job always consists of exactly one process
        return super(HTCondorControllerLauncher, self).start(1)
1362
1368
1363
1369
class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Engine-set launcher that submits the engines as an HTCondor job."""

    batch_file_name = Unicode(
        u'htcondor_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # Submit description used when no template is configured; {profile_dir}
    # and {cluster_id} are placeholders.
    default_template = Unicode("""
universe = vanilla
executable = ipengine
# by default we expect a shared file system
transfer_executable = False
arguments = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
1375
1381
1376
1382
1377 #-----------------------------------------------------------------------------
1383 #-----------------------------------------------------------------------------
1378 # A launcher for ipcluster itself!
1384 # A launcher for ipcluster itself!
1379 #-----------------------------------------------------------------------------
1385 #-----------------------------------------------------------------------------
1380
1386
1381
1387
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program itself as an external process."""

    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster",
    )
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.",
    )
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster command line as a list of strings."""
        # order: command, subcommand, --n/--profile, then the extra arguments
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['--n=%i' % self.n, '--profile=%s' % self.profile])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Start ipcluster by delegating to the parent class's ``start``."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1401
1407
1402 #-----------------------------------------------------------------------------
1408 #-----------------------------------------------------------------------------
1403 # Collections of launchers
1409 # Collections of launchers
1404 #-----------------------------------------------------------------------------
1410 #-----------------------------------------------------------------------------
1405
1411
# Launchers that run processes on the local machine.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]

# MPI launchers.
mpi_launchers = [
    MPILauncher,
    MPIControllerLauncher,
    MPIEngineSetLauncher,
]

# SSH launchers.
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
    SSHProxyEngineSetLauncher,
]

# Windows HPC launchers.
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]

# PBS launchers.
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]

# SGE launchers.
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]

# LSF launchers.
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]

# HTCondor launchers.
htcondor_launchers = [
    HTCondorLauncher,
    HTCondorControllerLauncher,
    HTCondorEngineSetLauncher,
]

# Every launcher class, concatenated in the group order listed above.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
    + htcondor_launchers
)
General Comments 0
You need to be logged in to leave comments. Login now