##// END OF EJS Templates
Merge pull request #2270 from minrk/sshlaunchers...
Fernando Perez -
r8161:17840b98 merge
parent child Browse files
Show More
@@ -1,1342 +1,1345 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
46 try:
46 try:
47 from subprocess import check_output
47 from subprocess import check_output
48 except ImportError:
48 except ImportError:
49 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
def check_output(*args, **kwargs):
    """Run a command and return what it wrote to stdout (pre-2.7 fallback).

    Stand-in for :func:`subprocess.check_output` on interpreters that do not
    provide it.

    Parameters
    ----------
    *args, **kwargs
        Passed straight through to :class:`subprocess.Popen`.  Any ``stdout``
        supplied by the caller is overridden, because the output has to be
        captured through a pipe.

    Returns
    -------
    The data the child process wrote to its standard output.

    Raises
    ------
    subprocess.CalledProcessError
        If the child exits with a non-zero status, which is what the real
        ``subprocess.check_output`` does as well.
    """
    # Local import keeps this fallback self-contained.
    from subprocess import CalledProcessError

    kwargs['stdout'] = PIPE
    p = Popen(*args, **kwargs)
    out, _ = p.communicate()
    retcode = p.poll()
    if retcode:
        cmd = kwargs.get('args')
        if cmd is None:
            cmd = args[0]
        # The two-argument constructor is the form available on 2.6; the
        # captured output is attached afterwards, mirroring the ``output``
        # attribute that 2.7+ sets.
        error = CalledProcessError(retcode, cmd)
        error.output = out
        raise error
    return out
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.path import get_home_dir
64 from IPython.utils.path import get_home_dir
65 from IPython.utils.process import find_cmd, FindCmdError
65 from IPython.utils.process import find_cmd, FindCmdError
66
66
67 from .win32support import forward_read_events
67 from .win32support import forward_read_events
68
68
69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
70
70
# True when running on a Windows ("nt") interpreter; selects the
# platform-specific pipe handling and process-kill code paths below.
WINDOWS = (os.name == 'nt')
72
72
73 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
74 # Paths to the kernel apps
74 # Paths to the kernel apps
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76
76
# Template for the ``python -c`` one-liner that boots one of the parallel
# apps; ``%s`` is replaced with the app module name.
cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"

# One argv list per application, all run with the current interpreter.
ipcluster_cmd_argv, ipengine_cmd_argv, ipcontroller_cmd_argv = (
    [sys.executable, "-c", cmd % app_module]
    for app_module in ("ipclusterapp", "ipengineapp", "ipcontrollerapp")
)
84
84
85 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
86 # Base launchers and errors
86 # Base launchers and errors
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88
88
89
89
class LauncherError(Exception):
    """Base class for the launcher-specific exceptions in this module."""
92
92
93
93
class ProcessStateError(LauncherError):
    """Raised when an operation is attempted in the wrong launcher state,
    e.g. starting a process that has already been started."""
96
96
97
97
class UnknownStatus(LauncherError):
    """Launcher error for a status that is not recognised.

    NOTE(review): no raise site is visible in this part of the module;
    the description above is inferred from the class name only.
    """
100
100
101
101
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    Subclasses provide :meth:`find_args`, :meth:`start`, :meth:`stop` and
    :meth:`signal`.  This base class tracks the lifecycle state
    (``'before'`` -> ``'running'`` -> ``'after'``) and the callbacks to run
    once the process has stopped.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. Any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Data recorded by notify_start() / notify_stop().
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Return the global IOLoop instance.

        Named after the ``loop`` trait -- presumably picked up by traitlets
        as that trait's default; confirm against the traitlets docs.
        """
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Set up the launcher in the ``'before'`` state with no callbacks."""
        super(BaseLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # Lifecycle state: one of 'before', 'running', 'after'.
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The command and its arguments, as a list, used to start the process.

        The first element is the process name.  The value is whatever
        :meth:`find_args` returns.
        """
        return self.find_args()

    def find_args(self):
        """Build the list behind the ``.args`` property.

        Subclasses must override this to construct the command and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """``self.args`` joined with single spaces."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the launcher is in the ``'running'`` state."""
        return self.state == 'running'

    def start(self):
        """Start the process.  Must be overridden."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This is meant to return None immediately; to observe the process
        actually stopping, register a callback with :meth:`on_stop`.
        Must be overridden.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register ``f`` to be called with ``stop_data`` once the process ends.

        If the launcher is already in the ``'after'`` state, ``f`` is called
        right away and its result returned; otherwise it is queued.
        """
        if self.state != 'after':
            self.stop_callbacks.append(f)
            return None
        return f(self.stop_data)

    def notify_start(self, data):
        """Record that the process has started.

        Logs the start, stores ``data`` as ``start_data`` and moves the state
        to ``'running'``.  ``data`` is returned unchanged so the method can
        be used as a pass-through callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Record that the process has stopped and fire the stop callbacks.

        Logs the stop, stores ``data`` as ``stop_data``, moves the state to
        ``'after'`` and calls every callback registered through
        :meth:`on_stop` with ``data``.  Returns ``data``.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Callbacks are consumed from the end of the list, i.e. the most
        # recently registered one runs first.
        for _ in range(len(self.stop_callbacks)):
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.  Must be overridden.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
213
213
class ClusterAppMixin(HasTraits):
    """Mixin that exposes the cluster-identifying arguments as traits."""

    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line arguments carrying ``profile_dir`` and ``cluster_id``."""
        profile_part = ['--profile-dir', self.profile_dir]
        cluster_part = ['--cluster-id', self.cluster_id]
        return profile_part + cluster_part
222
222
class ControllerMixin(ClusterAppMixin):
    """Mixin holding the configurable ipcontroller command and arguments."""

    # argv used to launch the controller.
    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )

    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
229
229
class EngineMixin(ClusterAppMixin):
    """Mixin holding the configurable ipengine command and arguments."""

    # argv used to launch an engine.
    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
237
237
238
238
239 #-----------------------------------------------------------------------------
239 #-----------------------------------------------------------------------------
240 # Local process launchers
240 # Local process launchers
241 #-----------------------------------------------------------------------------
241 #-----------------------------------------------------------------------------
242
242
243
243
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    The external process is launched with ``self.work_dir`` as its working
    directory.  Its stdout and stderr are read through handlers registered
    on ``self.loop``, and a periodic callback polls for process exit.
    """

    # Used to construct self.args, which is what gets passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100)  # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Create the launcher; no process or poller exists until start()."""
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return the configured ``cmd_and_args`` list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its output and exit into the loop.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the ``'before'`` state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(
            self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir,
        )
        child = self.process
        if WINDOWS:
            # On Windows the pipes go through forward_read_events();
            # the objects it returns are read with .recv() in the handlers.
            self.stdout = forward_read_events(child.stdout)
            self.stderr = forward_read_events(child.stderr)
        else:
            self.stdout = child.stdout.fileno()
            self.stderr = child.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(child.pid)

    def stop(self):
        """Stop the process by interrupting it, then killing it."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state != 'running':
            return
        if WINDOWS and sig != SIGINT:
            # use Windows tree-kill for better child cleanup
            check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
        else:
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        ``delay`` is in seconds.  The KILL is scheduled on the loop rather
        than waited for, so this returns immediately.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the KILL scheduled below still goes ahead.
            self.log.debug("interrupt failed")

        def send_kill():
            return self.signal(SIGKILL)

        self.killer = ioloop.DelayedCallback(send_kill, delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _log_line_or_poll(self, line):
        """Log one line of child output, or poll for exit on an empty read."""
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """Loop handler: consume one line from the child's stdout."""
        line = self.stdout.recv() if WINDOWS else self.process.stdout.readline()
        self._log_line_or_poll(line)

    def handle_stderr(self, fd, events):
        """Loop handler: consume one line from the child's stderr."""
        line = self.stderr.recv() if WINDOWS else self.process.stderr.readline()
        self._log_line_or_poll(line)

    def poll(self):
        """Check whether the process has exited and clean up if so.

        Returns the exit status, or None while the process is still alive.
        On exit the poller and both output handlers are removed and
        :meth:`notify_stop` is called with ``exit_code`` and ``pid``.
        """
        status = self.process.poll()
        if status is None:
            return status
        self.poller.stop()
        self.loop.remove_handler(self.stdout)
        self.loop.remove_handler(self.stderr)
        self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
342
342
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Controller command, then cluster args, then controller args."""
        argv = self.controller_cmd + self.cluster_args
        return argv + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(LocalControllerLauncher, self)
        return parent.start()
352
352
353
353
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Engine command, then cluster args, then engine args."""
        argv = self.engine_cmd + self.cluster_args
        return argv + self.engine_args
359
359
360
360
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes.

    Each engine gets its own ``launcher_class`` instance, stored in
    ``launchers`` under its start index.  When the last engine has stopped,
    the set as a whole reports stop with the per-engine stop data.
    """

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # index -> launcher of each engine that has not stopped yet
    launchers = Dict()
    # index -> stop data of each engine that has stopped
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Sleeps ``self.delay`` seconds between consecutive engine starts.
        Returns the list of values returned by each engine's ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Placeholder args; the set itself is not a single command."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine still tracked; return their results."""
        # Iterate over a snapshot so a launcher removed meanwhile cannot
        # disturb the iteration; .values() also works on Python 2 and 3.
        return [el.signal(sig) for el in list(self.launchers.values())]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then after ``delay`` seconds kill, every tracked engine."""
        return [
            el.interrupt_then_kill(delay)
            for el in list(self.launchers.values())
        ]

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for a single engine.

        Moves the engine whose pid matches ``data['pid']`` from ``launchers``
        to ``stop_data`` and, once no launchers remain, calls
        :meth:`notify_stop` for the whole set.
        """
        pid = data['pid']
        for idx, el in list(self.launchers.items()):
            if el.process.pid == pid:
                break
        else:
            # No tracked engine has this pid.  Without this guard the wrong
            # launcher (the last one iterated) would be dropped, or a
            # NameError raised when nothing is tracked at all.
            self.log.warning("Stop notice for untracked engine pid %r ignored", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
431
431
432
432
433 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
434 # MPI launchers
434 # MPI launchers
435 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
436
436
437
437
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec.

    The command line is assembled by :meth:`find_args` from ``mpi_cmd``,
    the process count ``n``, ``mpi_args``, ``program`` and ``program_args``.
    """

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # Number of processes requested from mpiexec; set by start().
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Migrate config given under deprecated MPIExec* names, then init.

        Any non-empty config section stored under one of the old
        ``MPIExec*`` class names is merged into the section of the
        corresponding ``MPI*`` name, and a deprecation warning is logged.
        """
        # deprecation for old MPIExec names:
        config = kwargs.get('config')
        if config is None:
            # ``config=None`` is a legitimate explicit argument (it is the
            # default in the parent signatures); there is nothing to migrate.
            config = {}
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warning("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
475
475
476
476
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # ``program`` and ``program_args`` are aliased back to *non-configurable*
    # properties for use in find_args().  This way every
    # Controller/EngineSet launcher has the same form, rather than *some*
    # having `program_args` and others `controller_args`.
    @property
    def program(self):
        """The controller command."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster args followed by the controller args."""
        combined = self.cluster_args + self.controller_args
        return combined

    def start(self):
        """Start the controller by profile_dir, as a single MPI process."""
        parent = super(MPIControllerLauncher, self)
        return parent.start(1)
494
494
495
495
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # ``program`` and ``program_args`` are aliased back to *non-configurable*
    # properties for use in find_args().  This way every
    # Controller/EngineSet launcher has the same form, rather than *some*
    # having `program_args` and others `controller_args`.
    @property
    def program(self):
        """The engine command."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster args followed by the engine args."""
        combined = self.cluster_args + self.engine_args
        return combined

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        # MPILauncher.start() assigns ``n`` as well; it is also set here,
        # before delegating.
        self.n = n
        parent = super(MPIEngineSetLauncher, self)
        return parent.start(n)
514
514
515 # deprecated MPIExec names
515 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a deprecation warning for the old ``MPIExec*`` class names."""

    def warn(self):
        """Log that this class's name is deprecated and suggest its replacement.

        The suggested name is the class name with ``MPIExec`` replaced by
        ``MPI``.  The message is sent to ``self.log``, which the class this
        mixin is combined with must provide.
        """
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warn is only a deprecated alias of Logger.warning and is
        # missing from recent Python releases, so use the canonical name.
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
521
521
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise through the parent classes, then log the deprecation."""
        parent = super(MPIExecLauncher, self)
        parent.__init__(*args, **kwargs)
        # warn() comes from DeprecatedMPILauncher
        self.warn()
527
527
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise through the parent classes, then log the deprecation."""
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*args, **kwargs)
        # warn() comes from DeprecatedMPILauncher
        self.warn()
533
533
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *args, **kwargs):
        """Initialise through the parent classes, then log the deprecation."""
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*args, **kwargs)
        # warn() comes from DeprecatedMPILauncher
        self.warn()
539
539
540
540
541 #-----------------------------------------------------------------------------
541 #-----------------------------------------------------------------------------
542 # SSH launchers
542 # SSH launchers
543 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
544
544
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
546
546
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.

    The launched command line is ``ssh_cmd + ssh_args + [location]`` followed
    by the shell-quoted remote program and its arguments (see `find_args`).
    Files in `to_send` are copied with scp before the process is started and
    files in `to_fetch` are copied back after it has been started.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Rebuild `location` from the new hostname (and `user`, if set)."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Rebuild `location` from the new user and the current hostname.

        An empty user yields the bare hostname, mirroring `_hostname_changed`,
        rather than an unusable ``@hostname`` location.
        """
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            self.location = self.hostname

    def find_args(self):
        """Return the full ssh command line as a list of arguments.

        The remote program and its arguments are quoted with `pipes.quote`
        because ssh hands them to a shell on the remote side.
        """
        return self.ssh_cmd + self.ssh_args + [self.location] + \
            list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file

        Polls once per second, at most 10 times, for `local` to exist and
        then runs scp whether or not the file showed up.
        """
        remote = "%s:%s" % (self.location, remote)
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s" % local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file

        Probes over ssh, at most 10 times, for `remote` to exist (sleeping
        one second after each 'no' answer) and then runs scp regardless of
        the outcome.
        """
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            # check_output may hand back bytes; decode so the comparisons
            # with the text answers below can actually match.
            if isinstance(check, bytes):
                check = check.decode('utf-8', 'replace')
            check = check.strip()
            if check == 'no':
                time.sleep(1)
            elif check == 'yes':
                break
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        `hostname` and `user`, when given, replace the corresponding
        attributes first.  Returns whatever the parent ``start`` returns, so
        callers that collect the result (SSHEngineSetLauncher.start does)
        receive it instead of an unconditional None.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        result = super(SSHLauncher, self).start()
        self.fetch_files()
        return result

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        `sig` itself is not used: the only action taken is writing the ssh
        escape sequence to the process's stdin.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
645
645
class SSHClusterLauncher(SSHLauncher):
    """SSH launcher that passes a remote ``--profile-dir`` to the launched program."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _remote_profile_dir_default(self):
        """Derive the remote profile dir from the local one.

        Turns /home/you/.ipython/profile_foo into .ipython/profile_foo; a
        profile_dir that is not under the home directory is returned as is.
        """
        home_prefix = get_home_dir()
        if not home_prefix.endswith('/'):
            home_prefix = home_prefix + '/'

        # guard clause: anything outside the home directory is left untouched
        if not self.profile_dir.startswith(home_prefix):
            return self.profile_dir
        return self.profile_dir[len(home_prefix):]

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id."""
        if not new:
            return
        raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line arguments selecting the remote profile directory."""
        remote_dir = self.remote_profile_dir
        return ['--profile-dir', remote_dir]
673
673
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller via ssh."""

    # `program` / `program_args` are *non-configurable* aliases for the
    # controller settings, for use in find_args().  They give every
    # Controller/EngineSet launcher the same attribute names, instead of
    # *some* exposing `program_args` and others `controller_args`.

    def _controller_cmd_default(self):
        """Default controller command: the bare ``ipcontroller`` name."""
        return ['ipcontroller']

    @property
    def program(self):
        """The command to run: an alias of ``controller_cmd``."""
        return self.controller_cmd

    @property
    def program_args(self):
        """``cluster_args`` followed by ``controller_args``."""
        cluster_part = self.cluster_args
        controller_part = self.controller_args
        return cluster_part + controller_part

    def _to_fetch_default(self):
        """Default (remote, local) pairs: both connection files under ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            local_path = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote_path, local_path))
        return pairs
697
697
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine via ssh."""

    # `program` / `program_args` are *non-configurable* aliases for the
    # engine settings, for use in find_args().  They give every
    # Controller/EngineSet launcher the same attribute names, instead of
    # *some* exposing `program_args` and others `engine_args`.

    def _engine_cmd_default(self):
        """Default engine command: the bare ``ipengine`` name."""
        return ['ipengine']

    @property
    def program(self):
        """The command to run: an alias of ``engine_cmd``."""
        return self.engine_cmd

    @property
    def program_args(self):
        """``cluster_args`` followed by ``engine_args``."""
        cluster_part = self.cluster_args
        engine_part = self.engine_args
        return cluster_part + engine_part

    def _to_send_default(self):
        """Default (local, remote) pairs: both connection files under ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs
721
721
722
722
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines over ssh, as described by the `engines` dict."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        """Default engine command: the bare ``ipengine`` name."""
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict

        A value may be a plain count or a ``(count, args)`` tuple/list.
        """
        total = 0
        for spec in self.engines.itervalues():
            if isinstance(spec, (tuple, list)):
                spec, _engine_args = spec
            total += spec
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.

        `n` is ignored, and the `engines` config property is used instead.
        Returns the list of values returned by each engine launcher's
        ``start``.
        """
        started = []
        for target, spec in self.engines.iteritems():
            # a value is either a count or a (count, args) pair
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            # a key may be either 'host' or 'user@host'
            user = None
            host = target
            if '@' in target:
                user, host = target.split('@', 1)

            for index in range(count):
                later_engine = index > 0
                if later_engine:
                    time.sleep(self.delay)
                el = self.launcher_class(
                    work_dir=self.work_dir, config=self.config, log=self.log,
                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if later_engine:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                result = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, index)] = el
                started.append(result)
        self.notify_start(started)
        return started
774
777
775
778
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """``ipcluster_cmd`` followed by the ``engines`` subcommand."""
        subcommand = ['engines']
        return self.ipcluster_cmd + subcommand

    @property
    def program_args(self):
        """The engine count (``-n``) and the remote ``--profile-dir``."""
        count_args = ['-n', str(self.n)]
        profile_args = ['--profile-dir', self.remote_profile_dir]
        return count_args + profile_args

    def _to_send_default(self):
        """Default (local, remote) pairs: both connection files under ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs

    def start(self, n):
        """Record `n` as the engine count, then run the parent ``start``."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
804
807
805
808
806 #-----------------------------------------------------------------------------
809 #-----------------------------------------------------------------------------
807 # Windows HPC Server 2008 scheduler launchers
810 # Windows HPC Server 2008 scheduler launchers
808 #-----------------------------------------------------------------------------
811 #-----------------------------------------------------------------------------
809
812
810
813
811 # This is only used on Windows.
814 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to run the ``job`` program.

    On Windows the command is looked up with ``find_cmd``; if that lookup
    fails, or on any other platform, the bare name ``'job'`` is returned.
    """
    fallback = 'job'
    if not WINDOWS:
        return fallback
    try:
        located = find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return fallback
    return located
821
824
822
825
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through the Windows HPC ``job`` command.

    Subclasses must implement `write_job_file` to produce the job
    description that `start` submits.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # (The `job_file` property defined further down rebinds this class
    # attribute, so the property is what attribute access actually uses.)
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Pass `work_dir`, `config` and any extra keywords on to the parent."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job file: `job_file_name` inside `work_dir`."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for `n` processes; subclasses must override."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the argument list for this launcher."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is the first match of `job_id_regexp` in `output`; it is also
        stored on ``self.job_id``.  Raises LauncherError when nothing matches.
        """
        m = self.job_id_regexp.search(output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with `job_cmd` and returns the job id
        parsed from the command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        # lazy %-args: the message is only formatted if it is emitted
        self.log.debug("Starting Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and return the output of the cancel command.

        If the cancel command fails, the job is assumed to have stopped
        already and a message saying so is used as the output instead.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", self.job_cmd + ' ' + ' '.join(args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Only ordinary errors mean "already stopped"; KeyboardInterrupt
            # and SystemExit must not be swallowed here.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
901
904
902
905
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a controller through the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job file containing a single controller task.

        `n` is accepted for interface compatibility and is not used.
        """
        job = IPControllerJob(config=self.config)

        task = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir
        # Cluster-level arguments first, then the extra controller arguments.
        task.controller_args.extend(self.cluster_args)
        task.controller_args.extend(self.controller_args)
        job.add_task(task)

        destination = self.job_file
        self.log.debug("Writing job description file: %s", destination)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file: `job_file_name` inside `profile_dir`."""
        directory = self.profile_dir
        return os.path.join(directory, self.job_file_name)

    def start(self):
        """Start one controller.

        Calls the parent ``start`` with a count of 1 and returns its result.
        """
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
933
936
934
937
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines through the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pas to ipengine")

    def write_job_file(self, n):
        """Write a job file containing `n` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for _ in range(n):
            task = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            task.work_directory = self.profile_dir
            # Cluster-level arguments first, then the extra engine arguments.
            task.engine_args.extend(self.cluster_args)
            task.engine_args.extend(self.engine_args)
            job.add_task(task)

        destination = self.job_file
        self.log.debug("Writing job description file: %s", destination)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file: `job_file_name` inside `profile_dir`."""
        directory = self.profile_dir
        return os.path.join(directory, self.job_file_name)

    def start(self, n):
        """Start `n` engines.

        Calls the parent ``start`` with `n` and returns its result.
        """
        parent = super(WindowsHPCEngineSetLauncher, self)
        return parent.start(n)
966
969
967
970
968 #-----------------------------------------------------------------------------
971 #-----------------------------------------------------------------------------
969 # Batch (PBS) system launchers
972 # Batch (PBS) system launchers
970 #-----------------------------------------------------------------------------
973 #-----------------------------------------------------------------------------
971
974
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        """Store the new value in ``self.context`` under the changed name."""
        self.context[name] = new

    # cluster_id changes are recorded in exactly the same way
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        """Default to an empty profile_dir and record it in ``self.context``."""
        empty = ''
        self.context['profile_dir'] = empty
        return empty

    def _cluster_id_default(self):
        """Default to an empty cluster_id and record it in ``self.context``."""
        empty = ''
        self.context['cluster_id'] = empty
        return empty
984
987
985
988
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSLauncher for an example.
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        """Mirror a changed trait value into the template context."""
        self.context[name] = new

    n = Integer(1)
    # `n` is mirrored into the context exactly like `queue`
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        # the batch script always lives inside the working directory
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` in ``output`` is stored on
        ``self.job_id`` and returned.  Raises LauncherError when the
        pattern does not match.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def _insert_after_first_line(self, line):
        """Insert ``line`` as the second line of ``self.batch_template``.

        The first line of the template stays in place.  ``partition`` is
        used rather than ``split('\\n', 1)`` so that a template consisting
        of a single line (no newline at all) does not raise ValueError.
        """
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir.

        The template is taken from, in order of priority, ``batch_template``,
        the contents of ``batch_template_file``, and ``default_template``.
        Job-array and queue directives are only injected when falling back to
        ``default_template``; a template supplied by the user is rendered
        as-is.  The rendered script is written to ``self.batch_file`` and made
        readable, writable and executable by the owner only.
        """
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

            # add jobarray and queue lines to the default template.
            # note that this is *only* when user did not specify a template.
            if not self.job_array_regexp.search(self.batch_template):
                self.log.debug("adding job array settings to batch script")
                self._insert_after_first_line(self.job_array_template)

            if self.queue and not self.queue_regexp.search(self.batch_template):
                self.log.debug("adding PBS queue settings to batch script")
                self._insert_after_first_line(self.queue_template)

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command on it and returns
        the job id parsed from the command's output.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # write_batch_script sets self.n, which puts {n} into the template
        # context before the script is rendered
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on the stored job id and return its output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
1117
1120
1118
1121
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # the help text previously advertised 'qsub' as the default here
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # matches an existing "#PBS -t ..." job array directive in a template
    job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # matches an existing "#PBS -q ..." queue directive in a template
    queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1134
1137
1135
1138
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Submit the controller as a PBS job."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # the controller command line is shell-quoted once, at class creation
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Submit exactly one controller job and return its job id."""
        parent = super(PBSControllerLauncher, self)
        return parent.start(1)
1151
1154
1152
1155
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Submit a set of engines as a PBS job."""

    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # the engine command line is shell-quoted once, at class creation
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))

    def start(self, n):
        """Submit a job for n engines and return its job id."""
        parent = super(PBSEngineSetLauncher, self)
        return parent.start(n)
1166
1169
1167 #SGE is very similar to PBS
1170 #SGE is very similar to PBS
1168
1171
class SGELauncher(PBSLauncher):
    """Launcher base for Sun GridEngine.

    Reuses the PBS commands and only swaps the directive syntax: SGE
    directives are introduced by ``#$`` instead of ``#PBS``.
    """
    # job array directive: detection pattern and the line to insert
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    # queue directive: detection pattern and the line to insert
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1175
1178
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # the controller command line is shell-quoted once, at class creation
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Submit exactly one controller job and return its job id."""
        return super(SGEControllerLauncher, self).start(1)
1190
1193
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Submit a set of engines as an SGE job."""

    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # the engine command line is shell-quoted once, at class creation
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))

    def start(self, n):
        """Submit a job for n engines and return its job id."""
        parent = super(SGEEngineSetLauncher, self)
        return parent.start(n)
1204
1207
1205
1208
1206 # LSF launchers
1209 # LSF launchers
1207
1210
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # matches "#BSUB -J name[1-4]"; whitespace between -J and the job name
    # is optional (the previous pattern, '-J+\w+', rejected the spaced form)
    job_array_regexp = CRegExp('#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't reuse the base class implementation because bsub expects
        the shell script on its standard input in order to honor the #BSUB
        directives:

            bsub < script

        The script is fed to the submit command through ``stdin`` instead of
        a shell redirection, so no shell is involved, the script path needs
        no quoting, and every element of ``submit_command`` is passed on.

        Returns the job id parsed from the submit command's output.
        """
        self.write_batch_script(n)
        # self.args is the submit command followed by the batch script path
        submit_cmd = self.args[:-1]
        script_path = self.args[-1]
        self.log.debug("Starting %s: %s < %s", self.__class__.__name__,
                       ' '.join(submit_cmd), script_path)
        with open(script_path) as script:
            p = Popen(submit_cmd, stdin=script, env=os.environ, stdout=PIPE)
            output, err = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1241
1244
1242
1245
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Submit the controller as an LSF job."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    # '%%J' is an escaped percent sign: the %-formatting below leaves a
    # literal '%J' in the rendered template
    default_template= Unicode("""#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Submit exactly one controller job and return its job id."""
        parent = super(LSFControllerLauncher, self)
        return parent.start(1)
1258
1261
1259
1262
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Submit a set of engines as an LSF job."""

    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    # '%%J' is an escaped percent sign: the %-formatting below leaves a
    # literal '%J' in the rendered template
    default_template= Unicode(u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))

    def start(self, n):
        """Submit a job for n engines and return its job id."""
        parent = super(LSFEngineSetLauncher, self)
        return parent.start(n)
1273
1276
1274
1277
1275 #-----------------------------------------------------------------------------
1278 #-----------------------------------------------------------------------------
1276 # A launcher for ipcluster itself!
1279 # A launcher for ipcluster itself!
1277 #-----------------------------------------------------------------------------
1280 #-----------------------------------------------------------------------------
1278
1281
1279
1282
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program as an external local process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster argument list.

        Order: the ipcluster command, the subcommand, the --n and --profile
        options, then any extra ``ipcluster_args``.
        """
        argv = self.ipcluster_cmd + [self.ipcluster_subcommand]
        argv.append('--n=%i' % self.n)
        argv.append('--profile=%s' % self.profile)
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Start ipcluster; delegates unchanged to the parent launcher."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1299
1302
1300 #-----------------------------------------------------------------------------
1303 #-----------------------------------------------------------------------------
1301 # Collections of launchers
1304 # Collections of launchers
1302 #-----------------------------------------------------------------------------
1305 #-----------------------------------------------------------------------------
1303
1306
# One list of launcher classes per backend, concatenated into
# all_launchers at the end.
local_launchers = [
    LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher,
]
mpi_launchers = [
    MPILauncher, MPIControllerLauncher, MPIEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher, SSHControllerLauncher, SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher, WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher,
]
# every launcher class, in the same backend order as the lists above
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
)
1342
1345
General Comments 0
You need to be logged in to leave comments. Login now