##// END OF EJS Templates
Backport PR #2126: ipcluster broken with any batch (PBS/LSF/SGE)...
MinRK -
Show More
@@ -1,1341 +1,1342 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import stat
26 import stat
26 import sys
27 import sys
27 import time
28 import time
28
29
29 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
30
31
31 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
32 try:
33 try:
33 from signal import SIGKILL
34 from signal import SIGKILL
34 except ImportError:
35 except ImportError:
35 # Windows
36 # Windows
36 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
37
38
38 try:
39 try:
39 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
42 except ImportError:
42 pass
43 pass
43
44
44 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
45 try:
46 try:
46 from subprocess import check_output
47 from subprocess import check_output
47 except ImportError:
48 except ImportError:
48 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
53 out,err = p.communicate()
53 return out
54 return out
54
55
55 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
56
57
57 from IPython.config.application import Application
58 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 )
63 )
63 from IPython.utils.path import get_home_dir
64 from IPython.utils.path import get_home_dir
64 from IPython.utils.process import find_cmd, FindCmdError
65 from IPython.utils.process import find_cmd, FindCmdError
65
66
66 from .win32support import forward_read_events
67 from .win32support import forward_read_events
67
68
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
70
70 WINDOWS = os.name == 'nt'
71 WINDOWS = os.name == 'nt'
71
72
72 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
74 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75
76
76 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
77 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
77
78
78 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
79 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
79
80
80 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
81 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
81
82
82 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
83 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
83
84
84 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
85 # Base launchers and errors
86 # Base launchers and errors
86 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
87
88
88
89
89 class LauncherError(Exception):
90 class LauncherError(Exception):
90 pass
91 pass
91
92
92
93
93 class ProcessStateError(LauncherError):
94 class ProcessStateError(LauncherError):
94 pass
95 pass
95
96
96
97
97 class UnknownStatus(LauncherError):
98 class UnknownStatus(LauncherError):
98 pass
99 pass
99
100
100
101
101 class BaseLauncher(LoggingConfigurable):
102 class BaseLauncher(LoggingConfigurable):
102 """An asbtraction for starting, stopping and signaling a process."""
103 """An asbtraction for starting, stopping and signaling a process."""
103
104
104 # In all of the launchers, the work_dir is where child processes will be
105 # In all of the launchers, the work_dir is where child processes will be
105 # run. This will usually be the profile_dir, but may not be. any work_dir
106 # run. This will usually be the profile_dir, but may not be. any work_dir
106 # passed into the __init__ method will override the config value.
107 # passed into the __init__ method will override the config value.
107 # This should not be used to set the work_dir for the actual engine
108 # This should not be used to set the work_dir for the actual engine
108 # and controller. Instead, use their own config files or the
109 # and controller. Instead, use their own config files or the
109 # controller_args, engine_args attributes of the launchers to add
110 # controller_args, engine_args attributes of the launchers to add
110 # the work_dir option.
111 # the work_dir option.
111 work_dir = Unicode(u'.')
112 work_dir = Unicode(u'.')
112 loop = Instance('zmq.eventloop.ioloop.IOLoop')
113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
113
114
114 start_data = Any()
115 start_data = Any()
115 stop_data = Any()
116 stop_data = Any()
116
117
117 def _loop_default(self):
118 def _loop_default(self):
118 return ioloop.IOLoop.instance()
119 return ioloop.IOLoop.instance()
119
120
120 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 self.state = 'before' # can be before, running, after
123 self.state = 'before' # can be before, running, after
123 self.stop_callbacks = []
124 self.stop_callbacks = []
124 self.start_data = None
125 self.start_data = None
125 self.stop_data = None
126 self.stop_data = None
126
127
127 @property
128 @property
128 def args(self):
129 def args(self):
129 """A list of cmd and args that will be used to start the process.
130 """A list of cmd and args that will be used to start the process.
130
131
131 This is what is passed to :func:`spawnProcess` and the first element
132 This is what is passed to :func:`spawnProcess` and the first element
132 will be the process name.
133 will be the process name.
133 """
134 """
134 return self.find_args()
135 return self.find_args()
135
136
136 def find_args(self):
137 def find_args(self):
137 """The ``.args`` property calls this to find the args list.
138 """The ``.args`` property calls this to find the args list.
138
139
139 Subcommand should implement this to construct the cmd and args.
140 Subcommand should implement this to construct the cmd and args.
140 """
141 """
141 raise NotImplementedError('find_args must be implemented in a subclass')
142 raise NotImplementedError('find_args must be implemented in a subclass')
142
143
143 @property
144 @property
144 def arg_str(self):
145 def arg_str(self):
145 """The string form of the program arguments."""
146 """The string form of the program arguments."""
146 return ' '.join(self.args)
147 return ' '.join(self.args)
147
148
148 @property
149 @property
149 def running(self):
150 def running(self):
150 """Am I running."""
151 """Am I running."""
151 if self.state == 'running':
152 if self.state == 'running':
152 return True
153 return True
153 else:
154 else:
154 return False
155 return False
155
156
156 def start(self):
157 def start(self):
157 """Start the process."""
158 """Start the process."""
158 raise NotImplementedError('start must be implemented in a subclass')
159 raise NotImplementedError('start must be implemented in a subclass')
159
160
160 def stop(self):
161 def stop(self):
161 """Stop the process and notify observers of stopping.
162 """Stop the process and notify observers of stopping.
162
163
163 This method will return None immediately.
164 This method will return None immediately.
164 To observe the actual process stopping, see :meth:`on_stop`.
165 To observe the actual process stopping, see :meth:`on_stop`.
165 """
166 """
166 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
167
168
168 def on_stop(self, f):
169 def on_stop(self, f):
169 """Register a callback to be called with this Launcher's stop_data
170 """Register a callback to be called with this Launcher's stop_data
170 when the process actually finishes.
171 when the process actually finishes.
171 """
172 """
172 if self.state=='after':
173 if self.state=='after':
173 return f(self.stop_data)
174 return f(self.stop_data)
174 else:
175 else:
175 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
176
177
177 def notify_start(self, data):
178 def notify_start(self, data):
178 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
179
180
180 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
181 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
182 """
183 """
183
184
184 self.log.debug('Process %r started: %r', self.args[0], data)
185 self.log.debug('Process %r started: %r', self.args[0], data)
185 self.start_data = data
186 self.start_data = data
186 self.state = 'running'
187 self.state = 'running'
187 return data
188 return data
188
189
189 def notify_stop(self, data):
190 def notify_stop(self, data):
190 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
191
192
192 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
193 this to trigger callbacks registered via :meth:`on_stop`."""
194 this to trigger callbacks registered via :meth:`on_stop`."""
194
195
195 self.log.debug('Process %r stopped: %r', self.args[0], data)
196 self.log.debug('Process %r stopped: %r', self.args[0], data)
196 self.stop_data = data
197 self.stop_data = data
197 self.state = 'after'
198 self.state = 'after'
198 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
199 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
200 d(data)
201 d(data)
201 return data
202 return data
202
203
203 def signal(self, sig):
204 def signal(self, sig):
204 """Signal the process.
205 """Signal the process.
205
206
206 Parameters
207 Parameters
207 ----------
208 ----------
208 sig : str or int
209 sig : str or int
209 'KILL', 'INT', etc., or any signal number
210 'KILL', 'INT', etc., or any signal number
210 """
211 """
211 raise NotImplementedError('signal must be implemented in a subclass')
212 raise NotImplementedError('signal must be implemented in a subclass')
212
213
213 class ClusterAppMixin(HasTraits):
214 class ClusterAppMixin(HasTraits):
214 """MixIn for cluster args as traits"""
215 """MixIn for cluster args as traits"""
215 profile_dir=Unicode('')
216 profile_dir=Unicode('')
216 cluster_id=Unicode('')
217 cluster_id=Unicode('')
217
218
218 @property
219 @property
219 def cluster_args(self):
220 def cluster_args(self):
220 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
221
222
222 class ControllerMixin(ClusterAppMixin):
223 class ControllerMixin(ClusterAppMixin):
223 controller_cmd = List(ipcontroller_cmd_argv, config=True,
224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
224 help="""Popen command to launch ipcontroller.""")
225 help="""Popen command to launch ipcontroller.""")
225 # Command line arguments to ipcontroller.
226 # Command line arguments to ipcontroller.
226 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 help="""command-line args to pass to ipcontroller""")
228 help="""command-line args to pass to ipcontroller""")
228
229
229 class EngineMixin(ClusterAppMixin):
230 class EngineMixin(ClusterAppMixin):
230 engine_cmd = List(ipengine_cmd_argv, config=True,
231 engine_cmd = List(ipengine_cmd_argv, config=True,
231 help="""command to launch the Engine.""")
232 help="""command to launch the Engine.""")
232 # Command line arguments for ipengine.
233 # Command line arguments for ipengine.
233 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
234 help="command-line arguments to pass to ipengine"
235 help="command-line arguments to pass to ipengine"
235 )
236 )
236
237
237
238
238 #-----------------------------------------------------------------------------
239 #-----------------------------------------------------------------------------
239 # Local process launchers
240 # Local process launchers
240 #-----------------------------------------------------------------------------
241 #-----------------------------------------------------------------------------
241
242
242
243
243 class LocalProcessLauncher(BaseLauncher):
244 class LocalProcessLauncher(BaseLauncher):
244 """Start and stop an external process in an asynchronous manner.
245 """Start and stop an external process in an asynchronous manner.
245
246
246 This will launch the external process with a working directory of
247 This will launch the external process with a working directory of
247 ``self.work_dir``.
248 ``self.work_dir``.
248 """
249 """
249
250
250 # This is used to to construct self.args, which is passed to
251 # This is used to to construct self.args, which is passed to
251 # spawnProcess.
252 # spawnProcess.
252 cmd_and_args = List([])
253 cmd_and_args = List([])
253 poll_frequency = Integer(100) # in ms
254 poll_frequency = Integer(100) # in ms
254
255
255 def __init__(self, work_dir=u'.', config=None, **kwargs):
256 def __init__(self, work_dir=u'.', config=None, **kwargs):
256 super(LocalProcessLauncher, self).__init__(
257 super(LocalProcessLauncher, self).__init__(
257 work_dir=work_dir, config=config, **kwargs
258 work_dir=work_dir, config=config, **kwargs
258 )
259 )
259 self.process = None
260 self.process = None
260 self.poller = None
261 self.poller = None
261
262
262 def find_args(self):
263 def find_args(self):
263 return self.cmd_and_args
264 return self.cmd_and_args
264
265
265 def start(self):
266 def start(self):
266 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
267 if self.state == 'before':
268 if self.state == 'before':
268 self.process = Popen(self.args,
269 self.process = Popen(self.args,
269 stdout=PIPE,stderr=PIPE,stdin=PIPE,
270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
270 env=os.environ,
271 env=os.environ,
271 cwd=self.work_dir
272 cwd=self.work_dir
272 )
273 )
273 if WINDOWS:
274 if WINDOWS:
274 self.stdout = forward_read_events(self.process.stdout)
275 self.stdout = forward_read_events(self.process.stdout)
275 self.stderr = forward_read_events(self.process.stderr)
276 self.stderr = forward_read_events(self.process.stderr)
276 else:
277 else:
277 self.stdout = self.process.stdout.fileno()
278 self.stdout = self.process.stdout.fileno()
278 self.stderr = self.process.stderr.fileno()
279 self.stderr = self.process.stderr.fileno()
279 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
280 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
281 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
282 self.poller.start()
283 self.poller.start()
283 self.notify_start(self.process.pid)
284 self.notify_start(self.process.pid)
284 else:
285 else:
285 s = 'The process was already started and has state: %r' % self.state
286 s = 'The process was already started and has state: %r' % self.state
286 raise ProcessStateError(s)
287 raise ProcessStateError(s)
287
288
288 def stop(self):
289 def stop(self):
289 return self.interrupt_then_kill()
290 return self.interrupt_then_kill()
290
291
291 def signal(self, sig):
292 def signal(self, sig):
292 if self.state == 'running':
293 if self.state == 'running':
293 if WINDOWS and sig != SIGINT:
294 if WINDOWS and sig != SIGINT:
294 # use Windows tree-kill for better child cleanup
295 # use Windows tree-kill for better child cleanup
295 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
296 else:
297 else:
297 self.process.send_signal(sig)
298 self.process.send_signal(sig)
298
299
299 def interrupt_then_kill(self, delay=2.0):
300 def interrupt_then_kill(self, delay=2.0):
300 """Send INT, wait a delay and then send KILL."""
301 """Send INT, wait a delay and then send KILL."""
301 try:
302 try:
302 self.signal(SIGINT)
303 self.signal(SIGINT)
303 except Exception:
304 except Exception:
304 self.log.debug("interrupt failed")
305 self.log.debug("interrupt failed")
305 pass
306 pass
306 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
307 self.killer.start()
308 self.killer.start()
308
309
309 # callbacks, etc:
310 # callbacks, etc:
310
311
311 def handle_stdout(self, fd, events):
312 def handle_stdout(self, fd, events):
312 if WINDOWS:
313 if WINDOWS:
313 line = self.stdout.recv()
314 line = self.stdout.recv()
314 else:
315 else:
315 line = self.process.stdout.readline()
316 line = self.process.stdout.readline()
316 # a stopped process will be readable but return empty strings
317 # a stopped process will be readable but return empty strings
317 if line:
318 if line:
318 self.log.debug(line[:-1])
319 self.log.debug(line[:-1])
319 else:
320 else:
320 self.poll()
321 self.poll()
321
322
322 def handle_stderr(self, fd, events):
323 def handle_stderr(self, fd, events):
323 if WINDOWS:
324 if WINDOWS:
324 line = self.stderr.recv()
325 line = self.stderr.recv()
325 else:
326 else:
326 line = self.process.stderr.readline()
327 line = self.process.stderr.readline()
327 # a stopped process will be readable but return empty strings
328 # a stopped process will be readable but return empty strings
328 if line:
329 if line:
329 self.log.debug(line[:-1])
330 self.log.debug(line[:-1])
330 else:
331 else:
331 self.poll()
332 self.poll()
332
333
333 def poll(self):
334 def poll(self):
334 status = self.process.poll()
335 status = self.process.poll()
335 if status is not None:
336 if status is not None:
336 self.poller.stop()
337 self.poller.stop()
337 self.loop.remove_handler(self.stdout)
338 self.loop.remove_handler(self.stdout)
338 self.loop.remove_handler(self.stderr)
339 self.loop.remove_handler(self.stderr)
339 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
340 return status
341 return status
341
342
342 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
343 """Launch a controller as a regular external process."""
344 """Launch a controller as a regular external process."""
344
345
345 def find_args(self):
346 def find_args(self):
346 return self.controller_cmd + self.cluster_args + self.controller_args
347 return self.controller_cmd + self.cluster_args + self.controller_args
347
348
348 def start(self):
349 def start(self):
349 """Start the controller by profile_dir."""
350 """Start the controller by profile_dir."""
350 return super(LocalControllerLauncher, self).start()
351 return super(LocalControllerLauncher, self).start()
351
352
352
353
353 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
354 """Launch a single engine as a regular externall process."""
355 """Launch a single engine as a regular externall process."""
355
356
356 def find_args(self):
357 def find_args(self):
357 return self.engine_cmd + self.cluster_args + self.engine_args
358 return self.engine_cmd + self.cluster_args + self.engine_args
358
359
359
360
360 class LocalEngineSetLauncher(LocalEngineLauncher):
361 class LocalEngineSetLauncher(LocalEngineLauncher):
361 """Launch a set of engines as regular external processes."""
362 """Launch a set of engines as regular external processes."""
362
363
363 delay = CFloat(0.1, config=True,
364 delay = CFloat(0.1, config=True,
364 help="""delay (in seconds) between starting each engine after the first.
365 help="""delay (in seconds) between starting each engine after the first.
365 This can help force the engines to get their ids in order, or limit
366 This can help force the engines to get their ids in order, or limit
366 process flood when starting many engines."""
367 process flood when starting many engines."""
367 )
368 )
368
369
369 # launcher class
370 # launcher class
370 launcher_class = LocalEngineLauncher
371 launcher_class = LocalEngineLauncher
371
372
372 launchers = Dict()
373 launchers = Dict()
373 stop_data = Dict()
374 stop_data = Dict()
374
375
375 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 super(LocalEngineSetLauncher, self).__init__(
377 super(LocalEngineSetLauncher, self).__init__(
377 work_dir=work_dir, config=config, **kwargs
378 work_dir=work_dir, config=config, **kwargs
378 )
379 )
379 self.stop_data = {}
380 self.stop_data = {}
380
381
381 def start(self, n):
382 def start(self, n):
382 """Start n engines by profile or profile_dir."""
383 """Start n engines by profile or profile_dir."""
383 dlist = []
384 dlist = []
384 for i in range(n):
385 for i in range(n):
385 if i > 0:
386 if i > 0:
386 time.sleep(self.delay)
387 time.sleep(self.delay)
387 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
388 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
389 )
390 )
390
391
391 # Copy the engine args over to each engine launcher.
392 # Copy the engine args over to each engine launcher.
392 el.engine_cmd = copy.deepcopy(self.engine_cmd)
393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
393 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
394 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
395 d = el.start()
396 d = el.start()
396 self.launchers[i] = el
397 self.launchers[i] = el
397 dlist.append(d)
398 dlist.append(d)
398 self.notify_start(dlist)
399 self.notify_start(dlist)
399 return dlist
400 return dlist
400
401
401 def find_args(self):
402 def find_args(self):
402 return ['engine set']
403 return ['engine set']
403
404
404 def signal(self, sig):
405 def signal(self, sig):
405 dlist = []
406 dlist = []
406 for el in self.launchers.itervalues():
407 for el in self.launchers.itervalues():
407 d = el.signal(sig)
408 d = el.signal(sig)
408 dlist.append(d)
409 dlist.append(d)
409 return dlist
410 return dlist
410
411
411 def interrupt_then_kill(self, delay=1.0):
412 def interrupt_then_kill(self, delay=1.0):
412 dlist = []
413 dlist = []
413 for el in self.launchers.itervalues():
414 for el in self.launchers.itervalues():
414 d = el.interrupt_then_kill(delay)
415 d = el.interrupt_then_kill(delay)
415 dlist.append(d)
416 dlist.append(d)
416 return dlist
417 return dlist
417
418
418 def stop(self):
419 def stop(self):
419 return self.interrupt_then_kill()
420 return self.interrupt_then_kill()
420
421
421 def _notice_engine_stopped(self, data):
422 def _notice_engine_stopped(self, data):
422 pid = data['pid']
423 pid = data['pid']
423 for idx,el in self.launchers.iteritems():
424 for idx,el in self.launchers.iteritems():
424 if el.process.pid == pid:
425 if el.process.pid == pid:
425 break
426 break
426 self.launchers.pop(idx)
427 self.launchers.pop(idx)
427 self.stop_data[idx] = data
428 self.stop_data[idx] = data
428 if not self.launchers:
429 if not self.launchers:
429 self.notify_stop(self.stop_data)
430 self.notify_stop(self.stop_data)
430
431
431
432
432 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
433 # MPI launchers
434 # MPI launchers
434 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
435
436
436
437
437 class MPILauncher(LocalProcessLauncher):
438 class MPILauncher(LocalProcessLauncher):
438 """Launch an external process using mpiexec."""
439 """Launch an external process using mpiexec."""
439
440
440 mpi_cmd = List(['mpiexec'], config=True,
441 mpi_cmd = List(['mpiexec'], config=True,
441 help="The mpiexec command to use in starting the process."
442 help="The mpiexec command to use in starting the process."
442 )
443 )
443 mpi_args = List([], config=True,
444 mpi_args = List([], config=True,
444 help="The command line arguments to pass to mpiexec."
445 help="The command line arguments to pass to mpiexec."
445 )
446 )
446 program = List(['date'],
447 program = List(['date'],
447 help="The program to start via mpiexec.")
448 help="The program to start via mpiexec.")
448 program_args = List([],
449 program_args = List([],
449 help="The command line argument to the program."
450 help="The command line argument to the program."
450 )
451 )
451 n = Integer(1)
452 n = Integer(1)
452
453
453 def __init__(self, *args, **kwargs):
454 def __init__(self, *args, **kwargs):
454 # deprecation for old MPIExec names:
455 # deprecation for old MPIExec names:
455 config = kwargs.get('config', {})
456 config = kwargs.get('config', {})
456 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
457 deprecated = config.get(oldname)
458 deprecated = config.get(oldname)
458 if deprecated:
459 if deprecated:
459 newname = oldname.replace('MPIExec', 'MPI')
460 newname = oldname.replace('MPIExec', 'MPI')
460 config[newname].update(deprecated)
461 config[newname].update(deprecated)
461 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
462
463
463 super(MPILauncher, self).__init__(*args, **kwargs)
464 super(MPILauncher, self).__init__(*args, **kwargs)
464
465
465 def find_args(self):
466 def find_args(self):
466 """Build self.args using all the fields."""
467 """Build self.args using all the fields."""
467 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
468 self.program + self.program_args
469 self.program + self.program_args
469
470
470 def start(self, n):
471 def start(self, n):
471 """Start n instances of the program using mpiexec."""
472 """Start n instances of the program using mpiexec."""
472 self.n = n
473 self.n = n
473 return super(MPILauncher, self).start()
474 return super(MPILauncher, self).start()
474
475
475
476
476 class MPIControllerLauncher(MPILauncher, ControllerMixin):
477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
477 """Launch a controller using mpiexec."""
478 """Launch a controller using mpiexec."""
478
479
479 # alias back to *non-configurable* program[_args] for use in find_args()
480 # alias back to *non-configurable* program[_args] for use in find_args()
480 # this way all Controller/EngineSetLaunchers have the same form, rather
481 # this way all Controller/EngineSetLaunchers have the same form, rather
481 # than *some* having `program_args` and others `controller_args`
482 # than *some* having `program_args` and others `controller_args`
482 @property
483 @property
483 def program(self):
484 def program(self):
484 return self.controller_cmd
485 return self.controller_cmd
485
486
486 @property
487 @property
487 def program_args(self):
488 def program_args(self):
488 return self.cluster_args + self.controller_args
489 return self.cluster_args + self.controller_args
489
490
490 def start(self):
491 def start(self):
491 """Start the controller by profile_dir."""
492 """Start the controller by profile_dir."""
492 return super(MPIControllerLauncher, self).start(1)
493 return super(MPIControllerLauncher, self).start(1)
493
494
494
495
495 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
496 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
496 """Launch engines using mpiexec"""
497 """Launch engines using mpiexec"""
497
498
498 # alias back to *non-configurable* program[_args] for use in find_args()
499 # alias back to *non-configurable* program[_args] for use in find_args()
499 # this way all Controller/EngineSetLaunchers have the same form, rather
500 # this way all Controller/EngineSetLaunchers have the same form, rather
500 # than *some* having `program_args` and others `controller_args`
501 # than *some* having `program_args` and others `controller_args`
501 @property
502 @property
502 def program(self):
503 def program(self):
503 return self.engine_cmd
504 return self.engine_cmd
504
505
505 @property
506 @property
506 def program_args(self):
507 def program_args(self):
507 return self.cluster_args + self.engine_args
508 return self.cluster_args + self.engine_args
508
509
509 def start(self, n):
510 def start(self, n):
510 """Start n engines by profile or profile_dir."""
511 """Start n engines by profile or profile_dir."""
511 self.n = n
512 self.n = n
512 return super(MPIEngineSetLauncher, self).start(n)
513 return super(MPIEngineSetLauncher, self).start(n)
513
514
514 # deprecated MPIExec names
515 # deprecated MPIExec names
515 class DeprecatedMPILauncher(object):
516 class DeprecatedMPILauncher(object):
516 def warn(self):
517 def warn(self):
517 oldname = self.__class__.__name__
518 oldname = self.__class__.__name__
518 newname = oldname.replace('MPIExec', 'MPI')
519 newname = oldname.replace('MPIExec', 'MPI')
519 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
520 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
520
521
521 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
522 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
522 """Deprecated, use MPILauncher"""
523 """Deprecated, use MPILauncher"""
523 def __init__(self, *args, **kwargs):
524 def __init__(self, *args, **kwargs):
524 super(MPIExecLauncher, self).__init__(*args, **kwargs)
525 super(MPIExecLauncher, self).__init__(*args, **kwargs)
525 self.warn()
526 self.warn()
526
527
527 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
528 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
528 """Deprecated, use MPIControllerLauncher"""
529 """Deprecated, use MPIControllerLauncher"""
529 def __init__(self, *args, **kwargs):
530 def __init__(self, *args, **kwargs):
530 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
531 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
531 self.warn()
532 self.warn()
532
533
533 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
534 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
534 """Deprecated, use MPIEngineSetLauncher"""
535 """Deprecated, use MPIEngineSetLauncher"""
535 def __init__(self, *args, **kwargs):
536 def __init__(self, *args, **kwargs):
536 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
537 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
537 self.warn()
538 self.warn()
538
539
539
540
540 #-----------------------------------------------------------------------------
541 #-----------------------------------------------------------------------------
541 # SSH launchers
542 # SSH launchers
542 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
543
544
544 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545
546
546 class SSHLauncher(LocalProcessLauncher):
547 class SSHLauncher(LocalProcessLauncher):
547 """A minimal launcher for ssh.
548 """A minimal launcher for ssh.
548
549
549 To be useful this will probably have to be extended to use the ``sshx``
550 To be useful this will probably have to be extended to use the ``sshx``
550 idea for environment variables. There could be other things this needs
551 idea for environment variables. There could be other things this needs
551 as well.
552 as well.
552 """
553 """
553
554
554 ssh_cmd = List(['ssh'], config=True,
555 ssh_cmd = List(['ssh'], config=True,
555 help="command for starting ssh")
556 help="command for starting ssh")
556 ssh_args = List(['-tt'], config=True,
557 ssh_args = List(['-tt'], config=True,
557 help="args to pass to ssh")
558 help="args to pass to ssh")
558 scp_cmd = List(['scp'], config=True,
559 scp_cmd = List(['scp'], config=True,
559 help="command for sending files")
560 help="command for sending files")
560 program = List(['date'],
561 program = List(['date'],
561 help="Program to launch via ssh")
562 help="Program to launch via ssh")
562 program_args = List([],
563 program_args = List([],
563 help="args to pass to remote program")
564 help="args to pass to remote program")
564 hostname = Unicode('', config=True,
565 hostname = Unicode('', config=True,
565 help="hostname on which to launch the program")
566 help="hostname on which to launch the program")
566 user = Unicode('', config=True,
567 user = Unicode('', config=True,
567 help="username for ssh")
568 help="username for ssh")
568 location = Unicode('', config=True,
569 location = Unicode('', config=True,
569 help="user@hostname location for ssh in one setting")
570 help="user@hostname location for ssh in one setting")
570 to_fetch = List([], config=True,
571 to_fetch = List([], config=True,
571 help="List of (remote, local) files to fetch after starting")
572 help="List of (remote, local) files to fetch after starting")
572 to_send = List([], config=True,
573 to_send = List([], config=True,
573 help="List of (local, remote) files to send before starting")
574 help="List of (local, remote) files to send before starting")
574
575
575 def _hostname_changed(self, name, old, new):
576 def _hostname_changed(self, name, old, new):
576 if self.user:
577 if self.user:
577 self.location = u'%s@%s' % (self.user, new)
578 self.location = u'%s@%s' % (self.user, new)
578 else:
579 else:
579 self.location = new
580 self.location = new
580
581
581 def _user_changed(self, name, old, new):
582 def _user_changed(self, name, old, new):
582 self.location = u'%s@%s' % (new, self.hostname)
583 self.location = u'%s@%s' % (new, self.hostname)
583
584
584 def find_args(self):
585 def find_args(self):
585 return self.ssh_cmd + self.ssh_args + [self.location] + \
586 return self.ssh_cmd + self.ssh_args + [self.location] + \
586 self.program + self.program_args
587 self.program + self.program_args
587
588
588 def _send_file(self, local, remote):
589 def _send_file(self, local, remote):
589 """send a single file"""
590 """send a single file"""
590 remote = "%s:%s" % (self.location, remote)
591 remote = "%s:%s" % (self.location, remote)
591 for i in range(10):
592 for i in range(10):
592 if not os.path.exists(local):
593 if not os.path.exists(local):
593 self.log.debug("waiting for %s" % local)
594 self.log.debug("waiting for %s" % local)
594 time.sleep(1)
595 time.sleep(1)
595 else:
596 else:
596 break
597 break
597 self.log.info("sending %s to %s", local, remote)
598 self.log.info("sending %s to %s", local, remote)
598 check_output(self.scp_cmd + [local, remote])
599 check_output(self.scp_cmd + [local, remote])
599
600
600 def send_files(self):
601 def send_files(self):
601 """send our files (called before start)"""
602 """send our files (called before start)"""
602 if not self.to_send:
603 if not self.to_send:
603 return
604 return
604 for local_file, remote_file in self.to_send:
605 for local_file, remote_file in self.to_send:
605 self._send_file(local_file, remote_file)
606 self._send_file(local_file, remote_file)
606
607
607 def _fetch_file(self, remote, local):
608 def _fetch_file(self, remote, local):
608 """fetch a single file"""
609 """fetch a single file"""
609 full_remote = "%s:%s" % (self.location, remote)
610 full_remote = "%s:%s" % (self.location, remote)
610 self.log.info("fetching %s from %s", local, full_remote)
611 self.log.info("fetching %s from %s", local, full_remote)
611 for i in range(10):
612 for i in range(10):
612 # wait up to 10s for remote file to exist
613 # wait up to 10s for remote file to exist
613 check = check_output(self.ssh_cmd + self.ssh_args + \
614 check = check_output(self.ssh_cmd + self.ssh_args + \
614 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
615 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
615 check = check.strip()
616 check = check.strip()
616 if check == 'no':
617 if check == 'no':
617 time.sleep(1)
618 time.sleep(1)
618 elif check == 'yes':
619 elif check == 'yes':
619 break
620 break
620 check_output(self.scp_cmd + [full_remote, local])
621 check_output(self.scp_cmd + [full_remote, local])
621
622
622 def fetch_files(self):
623 def fetch_files(self):
623 """fetch remote files (called after start)"""
624 """fetch remote files (called after start)"""
624 if not self.to_fetch:
625 if not self.to_fetch:
625 return
626 return
626 for remote_file, local_file in self.to_fetch:
627 for remote_file, local_file in self.to_fetch:
627 self._fetch_file(remote_file, local_file)
628 self._fetch_file(remote_file, local_file)
628
629
629 def start(self, hostname=None, user=None):
630 def start(self, hostname=None, user=None):
630 if hostname is not None:
631 if hostname is not None:
631 self.hostname = hostname
632 self.hostname = hostname
632 if user is not None:
633 if user is not None:
633 self.user = user
634 self.user = user
634
635
635 self.send_files()
636 self.send_files()
636 super(SSHLauncher, self).start()
637 super(SSHLauncher, self).start()
637 self.fetch_files()
638 self.fetch_files()
638
639
639 def signal(self, sig):
640 def signal(self, sig):
640 if self.state == 'running':
641 if self.state == 'running':
641 # send escaped ssh connection-closer
642 # send escaped ssh connection-closer
642 self.process.stdin.write('~.')
643 self.process.stdin.write('~.')
643 self.process.stdin.flush()
644 self.process.stdin.flush()
644
645
645 class SSHClusterLauncher(SSHLauncher):
646 class SSHClusterLauncher(SSHLauncher):
646
647
647 remote_profile_dir = Unicode('', config=True,
648 remote_profile_dir = Unicode('', config=True,
648 help="""The remote profile_dir to use.
649 help="""The remote profile_dir to use.
649
650
650 If not specified, use calling profile, stripping out possible leading homedir.
651 If not specified, use calling profile, stripping out possible leading homedir.
651 """)
652 """)
652
653
653 def _remote_profie_dir_default(self):
654 def _remote_profie_dir_default(self):
654 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
655 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
655 """
656 """
656 home = get_home_dir()
657 home = get_home_dir()
657 if not home.endswith('/'):
658 if not home.endswith('/'):
658 home = home+'/'
659 home = home+'/'
659
660
660 if self.profile_dir.startswith(home):
661 if self.profile_dir.startswith(home):
661 return self.profile_dir[len(home):]
662 return self.profile_dir[len(home):]
662 else:
663 else:
663 return self.profile_dir
664 return self.profile_dir
664
665
665 def _cluster_id_changed(self, name, old, new):
666 def _cluster_id_changed(self, name, old, new):
666 if new:
667 if new:
667 raise ValueError("cluster id not supported by SSH launchers")
668 raise ValueError("cluster id not supported by SSH launchers")
668
669
669 @property
670 @property
670 def cluster_args(self):
671 def cluster_args(self):
671 return ['--profile-dir', self.remote_profile_dir]
672 return ['--profile-dir', self.remote_profile_dir]
672
673
673 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
674 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
674
675
675 # alias back to *non-configurable* program[_args] for use in find_args()
676 # alias back to *non-configurable* program[_args] for use in find_args()
676 # this way all Controller/EngineSetLaunchers have the same form, rather
677 # this way all Controller/EngineSetLaunchers have the same form, rather
677 # than *some* having `program_args` and others `controller_args`
678 # than *some* having `program_args` and others `controller_args`
678
679
679 def _controller_cmd_default(self):
680 def _controller_cmd_default(self):
680 return ['ipcontroller']
681 return ['ipcontroller']
681
682
682 @property
683 @property
683 def program(self):
684 def program(self):
684 return self.controller_cmd
685 return self.controller_cmd
685
686
686 @property
687 @property
687 def program_args(self):
688 def program_args(self):
688 return self.cluster_args + self.controller_args
689 return self.cluster_args + self.controller_args
689
690
690 def _to_fetch_default(self):
691 def _to_fetch_default(self):
691 return [
692 return [
692 (os.path.join(self.remote_profile_dir, 'security', cf),
693 (os.path.join(self.remote_profile_dir, 'security', cf),
693 os.path.join(self.profile_dir, 'security', cf),)
694 os.path.join(self.profile_dir, 'security', cf),)
694 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
695 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
695 ]
696 ]
696
697
697 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
698 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
698
699
699 # alias back to *non-configurable* program[_args] for use in find_args()
700 # alias back to *non-configurable* program[_args] for use in find_args()
700 # this way all Controller/EngineSetLaunchers have the same form, rather
701 # this way all Controller/EngineSetLaunchers have the same form, rather
701 # than *some* having `program_args` and others `controller_args`
702 # than *some* having `program_args` and others `controller_args`
702
703
703 def _engine_cmd_default(self):
704 def _engine_cmd_default(self):
704 return ['ipengine']
705 return ['ipengine']
705
706
706 @property
707 @property
707 def program(self):
708 def program(self):
708 return self.engine_cmd
709 return self.engine_cmd
709
710
710 @property
711 @property
711 def program_args(self):
712 def program_args(self):
712 return self.cluster_args + self.engine_args
713 return self.cluster_args + self.engine_args
713
714
714 def _to_send_default(self):
715 def _to_send_default(self):
715 return [
716 return [
716 (os.path.join(self.profile_dir, 'security', cf),
717 (os.path.join(self.profile_dir, 'security', cf),
717 os.path.join(self.remote_profile_dir, 'security', cf))
718 os.path.join(self.remote_profile_dir, 'security', cf))
718 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
719 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
719 ]
720 ]
720
721
721
722
722 class SSHEngineSetLauncher(LocalEngineSetLauncher):
723 class SSHEngineSetLauncher(LocalEngineSetLauncher):
723 launcher_class = SSHEngineLauncher
724 launcher_class = SSHEngineLauncher
724 engines = Dict(config=True,
725 engines = Dict(config=True,
725 help="""dict of engines to launch. This is a dict by hostname of ints,
726 help="""dict of engines to launch. This is a dict by hostname of ints,
726 corresponding to the number of engines to start on that host.""")
727 corresponding to the number of engines to start on that host.""")
727
728
728 @property
729 @property
729 def engine_count(self):
730 def engine_count(self):
730 """determine engine count from `engines` dict"""
731 """determine engine count from `engines` dict"""
731 count = 0
732 count = 0
732 for n in self.engines.itervalues():
733 for n in self.engines.itervalues():
733 if isinstance(n, (tuple,list)):
734 if isinstance(n, (tuple,list)):
734 n,args = n
735 n,args = n
735 count += n
736 count += n
736 return count
737 return count
737
738
738 def start(self, n):
739 def start(self, n):
739 """Start engines by profile or profile_dir.
740 """Start engines by profile or profile_dir.
740 `n` is ignored, and the `engines` config property is used instead.
741 `n` is ignored, and the `engines` config property is used instead.
741 """
742 """
742
743
743 dlist = []
744 dlist = []
744 for host, n in self.engines.iteritems():
745 for host, n in self.engines.iteritems():
745 if isinstance(n, (tuple, list)):
746 if isinstance(n, (tuple, list)):
746 n, args = n
747 n, args = n
747 else:
748 else:
748 args = copy.deepcopy(self.engine_args)
749 args = copy.deepcopy(self.engine_args)
749
750
750 if '@' in host:
751 if '@' in host:
751 user,host = host.split('@',1)
752 user,host = host.split('@',1)
752 else:
753 else:
753 user=None
754 user=None
754 for i in range(n):
755 for i in range(n):
755 if i > 0:
756 if i > 0:
756 time.sleep(self.delay)
757 time.sleep(self.delay)
757 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
758 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
758 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
759 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
759 )
760 )
760 if i > 0:
761 if i > 0:
761 # only send files for the first engine on each host
762 # only send files for the first engine on each host
762 el.to_send = []
763 el.to_send = []
763
764
764 # Copy the engine args over to each engine launcher.
765 # Copy the engine args over to each engine launcher.
765 el.engine_cmd = self.engine_cmd
766 el.engine_cmd = self.engine_cmd
766 el.engine_args = args
767 el.engine_args = args
767 el.on_stop(self._notice_engine_stopped)
768 el.on_stop(self._notice_engine_stopped)
768 d = el.start(user=user, hostname=host)
769 d = el.start(user=user, hostname=host)
769 self.launchers[ "%s/%i" % (host,i) ] = el
770 self.launchers[ "%s/%i" % (host,i) ] = el
770 dlist.append(d)
771 dlist.append(d)
771 self.notify_start(dlist)
772 self.notify_start(dlist)
772 return dlist
773 return dlist
773
774
774
775
775 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
776 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
776 """Launcher for calling
777 """Launcher for calling
777 `ipcluster engines` on a remote machine.
778 `ipcluster engines` on a remote machine.
778
779
779 Requires that remote profile is already configured.
780 Requires that remote profile is already configured.
780 """
781 """
781
782
782 n = Integer()
783 n = Integer()
783 ipcluster_cmd = List(['ipcluster'], config=True)
784 ipcluster_cmd = List(['ipcluster'], config=True)
784
785
785 @property
786 @property
786 def program(self):
787 def program(self):
787 return self.ipcluster_cmd + ['engines']
788 return self.ipcluster_cmd + ['engines']
788
789
789 @property
790 @property
790 def program_args(self):
791 def program_args(self):
791 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
792 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
792
793
793 def _to_send_default(self):
794 def _to_send_default(self):
794 return [
795 return [
795 (os.path.join(self.profile_dir, 'security', cf),
796 (os.path.join(self.profile_dir, 'security', cf),
796 os.path.join(self.remote_profile_dir, 'security', cf))
797 os.path.join(self.remote_profile_dir, 'security', cf))
797 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
798 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
798 ]
799 ]
799
800
800 def start(self, n):
801 def start(self, n):
801 self.n = n
802 self.n = n
802 super(SSHProxyEngineSetLauncher, self).start()
803 super(SSHProxyEngineSetLauncher, self).start()
803
804
804
805
805 #-----------------------------------------------------------------------------
806 #-----------------------------------------------------------------------------
806 # Windows HPC Server 2008 scheduler launchers
807 # Windows HPC Server 2008 scheduler launchers
807 #-----------------------------------------------------------------------------
808 #-----------------------------------------------------------------------------
808
809
809
810
810 # This is only used on Windows.
811 # This is only used on Windows.
811 def find_job_cmd():
812 def find_job_cmd():
812 if WINDOWS:
813 if WINDOWS:
813 try:
814 try:
814 return find_cmd('job')
815 return find_cmd('job')
815 except (FindCmdError, ImportError):
816 except (FindCmdError, ImportError):
816 # ImportError will be raised if win32api is not installed
817 # ImportError will be raised if win32api is not installed
817 return 'job'
818 return 'job'
818 else:
819 else:
819 return 'job'
820 return 'job'
820
821
821
822
822 class WindowsHPCLauncher(BaseLauncher):
823 class WindowsHPCLauncher(BaseLauncher):
823
824
824 job_id_regexp = CRegExp(r'\d+', config=True,
825 job_id_regexp = CRegExp(r'\d+', config=True,
825 help="""A regular expression used to get the job id from the output of the
826 help="""A regular expression used to get the job id from the output of the
826 submit_command. """
827 submit_command. """
827 )
828 )
828 job_file_name = Unicode(u'ipython_job.xml', config=True,
829 job_file_name = Unicode(u'ipython_job.xml', config=True,
829 help="The filename of the instantiated job script.")
830 help="The filename of the instantiated job script.")
830 # The full path to the instantiated job script. This gets made dynamically
831 # The full path to the instantiated job script. This gets made dynamically
831 # by combining the work_dir with the job_file_name.
832 # by combining the work_dir with the job_file_name.
832 job_file = Unicode(u'')
833 job_file = Unicode(u'')
833 scheduler = Unicode('', config=True,
834 scheduler = Unicode('', config=True,
834 help="The hostname of the scheduler to submit the job to.")
835 help="The hostname of the scheduler to submit the job to.")
835 job_cmd = Unicode(find_job_cmd(), config=True,
836 job_cmd = Unicode(find_job_cmd(), config=True,
836 help="The command for submitting jobs.")
837 help="The command for submitting jobs.")
837
838
838 def __init__(self, work_dir=u'.', config=None, **kwargs):
839 def __init__(self, work_dir=u'.', config=None, **kwargs):
839 super(WindowsHPCLauncher, self).__init__(
840 super(WindowsHPCLauncher, self).__init__(
840 work_dir=work_dir, config=config, **kwargs
841 work_dir=work_dir, config=config, **kwargs
841 )
842 )
842
843
843 @property
844 @property
844 def job_file(self):
845 def job_file(self):
845 return os.path.join(self.work_dir, self.job_file_name)
846 return os.path.join(self.work_dir, self.job_file_name)
846
847
847 def write_job_file(self, n):
848 def write_job_file(self, n):
848 raise NotImplementedError("Implement write_job_file in a subclass.")
849 raise NotImplementedError("Implement write_job_file in a subclass.")
849
850
850 def find_args(self):
851 def find_args(self):
851 return [u'job.exe']
852 return [u'job.exe']
852
853
853 def parse_job_id(self, output):
854 def parse_job_id(self, output):
854 """Take the output of the submit command and return the job id."""
855 """Take the output of the submit command and return the job id."""
855 m = self.job_id_regexp.search(output)
856 m = self.job_id_regexp.search(output)
856 if m is not None:
857 if m is not None:
857 job_id = m.group()
858 job_id = m.group()
858 else:
859 else:
859 raise LauncherError("Job id couldn't be determined: %s" % output)
860 raise LauncherError("Job id couldn't be determined: %s" % output)
860 self.job_id = job_id
861 self.job_id = job_id
861 self.log.info('Job started with id: %r', job_id)
862 self.log.info('Job started with id: %r', job_id)
862 return job_id
863 return job_id
863
864
864 def start(self, n):
865 def start(self, n):
865 """Start n copies of the process using the Win HPC job scheduler."""
866 """Start n copies of the process using the Win HPC job scheduler."""
866 self.write_job_file(n)
867 self.write_job_file(n)
867 args = [
868 args = [
868 'submit',
869 'submit',
869 '/jobfile:%s' % self.job_file,
870 '/jobfile:%s' % self.job_file,
870 '/scheduler:%s' % self.scheduler
871 '/scheduler:%s' % self.scheduler
871 ]
872 ]
872 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
873 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
873
874
874 output = check_output([self.job_cmd]+args,
875 output = check_output([self.job_cmd]+args,
875 env=os.environ,
876 env=os.environ,
876 cwd=self.work_dir,
877 cwd=self.work_dir,
877 stderr=STDOUT
878 stderr=STDOUT
878 )
879 )
879 job_id = self.parse_job_id(output)
880 job_id = self.parse_job_id(output)
880 self.notify_start(job_id)
881 self.notify_start(job_id)
881 return job_id
882 return job_id
882
883
883 def stop(self):
884 def stop(self):
884 args = [
885 args = [
885 'cancel',
886 'cancel',
886 self.job_id,
887 self.job_id,
887 '/scheduler:%s' % self.scheduler
888 '/scheduler:%s' % self.scheduler
888 ]
889 ]
889 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
890 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
890 try:
891 try:
891 output = check_output([self.job_cmd]+args,
892 output = check_output([self.job_cmd]+args,
892 env=os.environ,
893 env=os.environ,
893 cwd=self.work_dir,
894 cwd=self.work_dir,
894 stderr=STDOUT
895 stderr=STDOUT
895 )
896 )
896 except:
897 except:
897 output = 'The job already appears to be stoppped: %r' % self.job_id
898 output = 'The job already appears to be stoppped: %r' % self.job_id
898 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
899 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
899 return output
900 return output
900
901
901
902
902 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
903 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
903
904
904 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
905 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
905 help="WinHPC xml job file.")
906 help="WinHPC xml job file.")
906 controller_args = List([], config=False,
907 controller_args = List([], config=False,
907 help="extra args to pass to ipcontroller")
908 help="extra args to pass to ipcontroller")
908
909
909 def write_job_file(self, n):
910 def write_job_file(self, n):
910 job = IPControllerJob(config=self.config)
911 job = IPControllerJob(config=self.config)
911
912
912 t = IPControllerTask(config=self.config)
913 t = IPControllerTask(config=self.config)
913 # The tasks work directory is *not* the actual work directory of
914 # The tasks work directory is *not* the actual work directory of
914 # the controller. It is used as the base path for the stdout/stderr
915 # the controller. It is used as the base path for the stdout/stderr
915 # files that the scheduler redirects to.
916 # files that the scheduler redirects to.
916 t.work_directory = self.profile_dir
917 t.work_directory = self.profile_dir
917 # Add the profile_dir and from self.start().
918 # Add the profile_dir and from self.start().
918 t.controller_args.extend(self.cluster_args)
919 t.controller_args.extend(self.cluster_args)
919 t.controller_args.extend(self.controller_args)
920 t.controller_args.extend(self.controller_args)
920 job.add_task(t)
921 job.add_task(t)
921
922
922 self.log.debug("Writing job description file: %s", self.job_file)
923 self.log.debug("Writing job description file: %s", self.job_file)
923 job.write(self.job_file)
924 job.write(self.job_file)
924
925
925 @property
926 @property
926 def job_file(self):
927 def job_file(self):
927 return os.path.join(self.profile_dir, self.job_file_name)
928 return os.path.join(self.profile_dir, self.job_file_name)
928
929
929 def start(self):
930 def start(self):
930 """Start the controller by profile_dir."""
931 """Start the controller by profile_dir."""
931 return super(WindowsHPCControllerLauncher, self).start(1)
932 return super(WindowsHPCControllerLauncher, self).start(1)
932
933
933
934
934 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
935 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
935
936
936 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
937 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
937 help="jobfile for ipengines job")
938 help="jobfile for ipengines job")
938 engine_args = List([], config=False,
939 engine_args = List([], config=False,
939 help="extra args to pas to ipengine")
940 help="extra args to pas to ipengine")
940
941
941 def write_job_file(self, n):
942 def write_job_file(self, n):
942 job = IPEngineSetJob(config=self.config)
943 job = IPEngineSetJob(config=self.config)
943
944
944 for i in range(n):
945 for i in range(n):
945 t = IPEngineTask(config=self.config)
946 t = IPEngineTask(config=self.config)
946 # The tasks work directory is *not* the actual work directory of
947 # The tasks work directory is *not* the actual work directory of
947 # the engine. It is used as the base path for the stdout/stderr
948 # the engine. It is used as the base path for the stdout/stderr
948 # files that the scheduler redirects to.
949 # files that the scheduler redirects to.
949 t.work_directory = self.profile_dir
950 t.work_directory = self.profile_dir
950 # Add the profile_dir and from self.start().
951 # Add the profile_dir and from self.start().
951 t.engine_args.extend(self.cluster_args)
952 t.engine_args.extend(self.cluster_args)
952 t.engine_args.extend(self.engine_args)
953 t.engine_args.extend(self.engine_args)
953 job.add_task(t)
954 job.add_task(t)
954
955
955 self.log.debug("Writing job description file: %s", self.job_file)
956 self.log.debug("Writing job description file: %s", self.job_file)
956 job.write(self.job_file)
957 job.write(self.job_file)
957
958
958 @property
959 @property
959 def job_file(self):
960 def job_file(self):
960 return os.path.join(self.profile_dir, self.job_file_name)
961 return os.path.join(self.profile_dir, self.job_file_name)
961
962
962 def start(self, n):
963 def start(self, n):
963 """Start the controller by profile_dir."""
964 """Start the controller by profile_dir."""
964 return super(WindowsHPCEngineSetLauncher, self).start(n)
965 return super(WindowsHPCEngineSetLauncher, self).start(n)
965
966
966
967
967 #-----------------------------------------------------------------------------
968 #-----------------------------------------------------------------------------
968 # Batch (PBS) system launchers
969 # Batch (PBS) system launchers
969 #-----------------------------------------------------------------------------
970 #-----------------------------------------------------------------------------
970
971
971 class BatchClusterAppMixin(ClusterAppMixin):
972 class BatchClusterAppMixin(ClusterAppMixin):
972 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
973 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
973 def _profile_dir_changed(self, name, old, new):
974 def _profile_dir_changed(self, name, old, new):
974 self.context[name] = new
975 self.context[name] = new
975 _cluster_id_changed = _profile_dir_changed
976 _cluster_id_changed = _profile_dir_changed
976
977
977 def _profile_dir_default(self):
978 def _profile_dir_default(self):
978 self.context['profile_dir'] = ''
979 self.context['profile_dir'] = ''
979 return ''
980 return ''
980 def _cluster_id_default(self):
981 def _cluster_id_default(self):
981 self.context['cluster_id'] = ''
982 self.context['cluster_id'] = ''
982 return ''
983 return ''
983
984
984
985
985 class BatchSystemLauncher(BaseLauncher):
986 class BatchSystemLauncher(BaseLauncher):
986 """Launch an external process using a batch system.
987 """Launch an external process using a batch system.
987
988
988 This class is designed to work with UNIX batch systems like PBS, LSF,
989 This class is designed to work with UNIX batch systems like PBS, LSF,
989 GridEngine, etc. The overall model is that there are different commands
990 GridEngine, etc. The overall model is that there are different commands
990 like qsub, qdel, etc. that handle the starting and stopping of the process.
991 like qsub, qdel, etc. that handle the starting and stopping of the process.
991
992
992 This class also has the notion of a batch script. The ``batch_template``
993 This class also has the notion of a batch script. The ``batch_template``
993 attribute can be set to a string that is a template for the batch script.
994 attribute can be set to a string that is a template for the batch script.
994 This template is instantiated using string formatting. Thus the template can
995 This template is instantiated using string formatting. Thus the template can
995 use {n} fot the number of instances. Subclasses can add additional variables
996 use {n} fot the number of instances. Subclasses can add additional variables
996 to the template dict.
997 to the template dict.
997 """
998 """
998
999
999 # Subclasses must fill these in. See PBSEngineSet
1000 # Subclasses must fill these in. See PBSEngineSet
1000 submit_command = List([''], config=True,
1001 submit_command = List([''], config=True,
1001 help="The name of the command line program used to submit jobs.")
1002 help="The name of the command line program used to submit jobs.")
1002 delete_command = List([''], config=True,
1003 delete_command = List([''], config=True,
1003 help="The name of the command line program used to delete jobs.")
1004 help="The name of the command line program used to delete jobs.")
1004 job_id_regexp = CRegExp('', config=True,
1005 job_id_regexp = CRegExp('', config=True,
1005 help="""A regular expression used to get the job id from the output of the
1006 help="""A regular expression used to get the job id from the output of the
1006 submit_command.""")
1007 submit_command.""")
1007 batch_template = Unicode('', config=True,
1008 batch_template = Unicode('', config=True,
1008 help="The string that is the batch script template itself.")
1009 help="The string that is the batch script template itself.")
1009 batch_template_file = Unicode(u'', config=True,
1010 batch_template_file = Unicode(u'', config=True,
1010 help="The file that contains the batch template.")
1011 help="The file that contains the batch template.")
1011 batch_file_name = Unicode(u'batch_script', config=True,
1012 batch_file_name = Unicode(u'batch_script', config=True,
1012 help="The filename of the instantiated batch script.")
1013 help="The filename of the instantiated batch script.")
1013 queue = Unicode(u'', config=True,
1014 queue = Unicode(u'', config=True,
1014 help="The PBS Queue.")
1015 help="The PBS Queue.")
1015
1016
1016 def _queue_changed(self, name, old, new):
1017 def _queue_changed(self, name, old, new):
1017 self.context[name] = new
1018 self.context[name] = new
1018
1019
1019 n = Integer(1)
1020 n = Integer(1)
1020 _n_changed = _queue_changed
1021 _n_changed = _queue_changed
1021
1022
1022 # not configurable, override in subclasses
1023 # not configurable, override in subclasses
1023 # PBS Job Array regex
1024 # PBS Job Array regex
1024 job_array_regexp = CRegExp('')
1025 job_array_regexp = CRegExp('')
1025 job_array_template = Unicode('')
1026 job_array_template = Unicode('')
1026 # PBS Queue regex
1027 # PBS Queue regex
1027 queue_regexp = CRegExp('')
1028 queue_regexp = CRegExp('')
1028 queue_template = Unicode('')
1029 queue_template = Unicode('')
1029 # The default batch template, override in subclasses
1030 # The default batch template, override in subclasses
1030 default_template = Unicode('')
1031 default_template = Unicode('')
1031 # The full path to the instantiated batch script.
1032 # The full path to the instantiated batch script.
1032 batch_file = Unicode(u'')
1033 batch_file = Unicode(u'')
1033 # the format dict used with batch_template:
1034 # the format dict used with batch_template:
1034 context = Dict()
1035 context = Dict()
1035 def _context_default(self):
1036 def _context_default(self):
1036 """load the default context with the default values for the basic keys
1037 """load the default context with the default values for the basic keys
1037
1038
1038 because the _trait_changed methods only load the context if they
1039 because the _trait_changed methods only load the context if they
1039 are set to something other than the default value.
1040 are set to something other than the default value.
1040 """
1041 """
1041 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1042 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1042
1043
1043 # the Formatter instance for rendering the templates:
1044 # the Formatter instance for rendering the templates:
1044 formatter = Instance(EvalFormatter, (), {})
1045 formatter = Instance(EvalFormatter, (), {})
1045
1046
1046
1047
1047 def find_args(self):
1048 def find_args(self):
1048 return self.submit_command + [self.batch_file]
1049 return self.submit_command + [self.batch_file]
1049
1050
1050 def __init__(self, work_dir=u'.', config=None, **kwargs):
1051 def __init__(self, work_dir=u'.', config=None, **kwargs):
1051 super(BatchSystemLauncher, self).__init__(
1052 super(BatchSystemLauncher, self).__init__(
1052 work_dir=work_dir, config=config, **kwargs
1053 work_dir=work_dir, config=config, **kwargs
1053 )
1054 )
1054 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1055 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1055
1056
1056 def parse_job_id(self, output):
1057 def parse_job_id(self, output):
1057 """Take the output of the submit command and return the job id."""
1058 """Take the output of the submit command and return the job id."""
1058 m = self.job_id_regexp.search(output)
1059 m = self.job_id_regexp.search(output)
1059 if m is not None:
1060 if m is not None:
1060 job_id = m.group()
1061 job_id = m.group()
1061 else:
1062 else:
1062 raise LauncherError("Job id couldn't be determined: %s" % output)
1063 raise LauncherError("Job id couldn't be determined: %s" % output)
1063 self.job_id = job_id
1064 self.job_id = job_id
1064 self.log.info('Job submitted with job id: %r', job_id)
1065 self.log.info('Job submitted with job id: %r', job_id)
1065 return job_id
1066 return job_id
1066
1067
1067 def write_batch_script(self, n):
1068 def write_batch_script(self, n):
1068 """Instantiate and write the batch script to the work_dir."""
1069 """Instantiate and write the batch script to the work_dir."""
1069 self.n = n
1070 self.n = n
1070 # first priority is batch_template if set
1071 # first priority is batch_template if set
1071 if self.batch_template_file and not self.batch_template:
1072 if self.batch_template_file and not self.batch_template:
1072 # second priority is batch_template_file
1073 # second priority is batch_template_file
1073 with open(self.batch_template_file) as f:
1074 with open(self.batch_template_file) as f:
1074 self.batch_template = f.read()
1075 self.batch_template = f.read()
1075 if not self.batch_template:
1076 if not self.batch_template:
1076 # third (last) priority is default_template
1077 # third (last) priority is default_template
1077 self.batch_template = self.default_template
1078 self.batch_template = self.default_template
1078
1079
1079 # add jobarray or queue lines to user-specified template
1080 # add jobarray or queue lines to user-specified template
1080 # note that this is *only* when user did not specify a template.
1081 # note that this is *only* when user did not specify a template.
1081 # print self.job_array_regexp.search(self.batch_template)
1082 # print self.job_array_regexp.search(self.batch_template)
1082 if not self.job_array_regexp.search(self.batch_template):
1083 if not self.job_array_regexp.search(self.batch_template):
1083 self.log.debug("adding job array settings to batch script")
1084 self.log.debug("adding job array settings to batch script")
1084 firstline, rest = self.batch_template.split('\n',1)
1085 firstline, rest = self.batch_template.split('\n',1)
1085 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1086 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1086
1087
1087 # print self.queue_regexp.search(self.batch_template)
1088 # print self.queue_regexp.search(self.batch_template)
1088 if self.queue and not self.queue_regexp.search(self.batch_template):
1089 if self.queue and not self.queue_regexp.search(self.batch_template):
1089 self.log.debug("adding PBS queue settings to batch script")
1090 self.log.debug("adding PBS queue settings to batch script")
1090 firstline, rest = self.batch_template.split('\n',1)
1091 firstline, rest = self.batch_template.split('\n',1)
1091 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1092 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1092
1093
1093 script_as_string = self.formatter.format(self.batch_template, **self.context)
1094 script_as_string = self.formatter.format(self.batch_template, **self.context)
1094 self.log.debug('Writing batch script: %s', self.batch_file)
1095 self.log.debug('Writing batch script: %s', self.batch_file)
1095
1096
1096 with open(self.batch_file, 'w') as f:
1097 with open(self.batch_file, 'w') as f:
1097 f.write(script_as_string)
1098 f.write(script_as_string)
1098 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1099 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1099
1100
1100 def start(self, n):
1101 def start(self, n):
1101 """Start n copies of the process using a batch system."""
1102 """Start n copies of the process using a batch system."""
1102 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1103 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1103 # Here we save profile_dir in the context so they
1104 # Here we save profile_dir in the context so they
1104 # can be used in the batch script template as {profile_dir}
1105 # can be used in the batch script template as {profile_dir}
1105 self.write_batch_script(n)
1106 self.write_batch_script(n)
1106 output = check_output(self.args, env=os.environ)
1107 output = check_output(self.args, env=os.environ)
1107
1108
1108 job_id = self.parse_job_id(output)
1109 job_id = self.parse_job_id(output)
1109 self.notify_start(job_id)
1110 self.notify_start(job_id)
1110 return job_id
1111 return job_id
1111
1112
1112 def stop(self):
1113 def stop(self):
1113 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1114 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1114 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1115 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1115 return output
1116 return output
1116
1117
1117
1118
1118 class PBSLauncher(BatchSystemLauncher):
1119 class PBSLauncher(BatchSystemLauncher):
1119 """A BatchSystemLauncher subclass for PBS."""
1120 """A BatchSystemLauncher subclass for PBS."""
1120
1121
1121 submit_command = List(['qsub'], config=True,
1122 submit_command = List(['qsub'], config=True,
1122 help="The PBS submit command ['qsub']")
1123 help="The PBS submit command ['qsub']")
1123 delete_command = List(['qdel'], config=True,
1124 delete_command = List(['qdel'], config=True,
1124 help="The PBS delete command ['qsub']")
1125 help="The PBS delete command ['qsub']")
1125 job_id_regexp = CRegExp(r'\d+', config=True,
1126 job_id_regexp = CRegExp(r'\d+', config=True,
1126 help="Regular expresion for identifying the job ID [r'\d+']")
1127 help="Regular expresion for identifying the job ID [r'\d+']")
1127
1128
1128 batch_file = Unicode(u'')
1129 batch_file = Unicode(u'')
1129 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1130 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1130 job_array_template = Unicode('#PBS -t 1-{n}')
1131 job_array_template = Unicode('#PBS -t 1-{n}')
1131 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1132 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1132 queue_template = Unicode('#PBS -q {queue}')
1133 queue_template = Unicode('#PBS -q {queue}')
1133
1134
1134
1135
1135 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1136 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1136 """Launch a controller using PBS."""
1137 """Launch a controller using PBS."""
1137
1138
1138 batch_file_name = Unicode(u'pbs_controller', config=True,
1139 batch_file_name = Unicode(u'pbs_controller', config=True,
1139 help="batch file name for the controller job.")
1140 help="batch file name for the controller job.")
1140 default_template= Unicode("""#!/bin/sh
1141 default_template= Unicode("""#!/bin/sh
1141 #PBS -V
1142 #PBS -V
1142 #PBS -N ipcontroller
1143 #PBS -N ipcontroller
1143 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1144 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1144 """%(' '.join(ipcontroller_cmd_argv)))
1145 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1145
1146
1146
1147
1147 def start(self):
1148 def start(self):
1148 """Start the controller by profile or profile_dir."""
1149 """Start the controller by profile or profile_dir."""
1149 return super(PBSControllerLauncher, self).start(1)
1150 return super(PBSControllerLauncher, self).start(1)
1150
1151
1151
1152
1152 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1153 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1153 """Launch Engines using PBS"""
1154 """Launch Engines using PBS"""
1154 batch_file_name = Unicode(u'pbs_engines', config=True,
1155 batch_file_name = Unicode(u'pbs_engines', config=True,
1155 help="batch file name for the engine(s) job.")
1156 help="batch file name for the engine(s) job.")
1156 default_template= Unicode(u"""#!/bin/sh
1157 default_template= Unicode(u"""#!/bin/sh
1157 #PBS -V
1158 #PBS -V
1158 #PBS -N ipengine
1159 #PBS -N ipengine
1159 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1160 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1160 """%(' '.join(ipengine_cmd_argv)))
1161 """%(' '.join(map(pipes.quote,ipengine_cmd_argv))))
1161
1162
1162 def start(self, n):
1163 def start(self, n):
1163 """Start n engines by profile or profile_dir."""
1164 """Start n engines by profile or profile_dir."""
1164 return super(PBSEngineSetLauncher, self).start(n)
1165 return super(PBSEngineSetLauncher, self).start(n)
1165
1166
1166 #SGE is very similar to PBS
1167 #SGE is very similar to PBS
1167
1168
1168 class SGELauncher(PBSLauncher):
1169 class SGELauncher(PBSLauncher):
1169 """Sun GridEngine is a PBS clone with slightly different syntax"""
1170 """Sun GridEngine is a PBS clone with slightly different syntax"""
1170 job_array_regexp = CRegExp('#\$\W+\-t')
1171 job_array_regexp = CRegExp('#\$\W+\-t')
1171 job_array_template = Unicode('#$ -t 1-{n}')
1172 job_array_template = Unicode('#$ -t 1-{n}')
1172 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1173 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1173 queue_template = Unicode('#$ -q {queue}')
1174 queue_template = Unicode('#$ -q {queue}')
1174
1175
1175 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1176 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1176 """Launch a controller using SGE."""
1177 """Launch a controller using SGE."""
1177
1178
1178 batch_file_name = Unicode(u'sge_controller', config=True,
1179 batch_file_name = Unicode(u'sge_controller', config=True,
1179 help="batch file name for the ipontroller job.")
1180 help="batch file name for the ipontroller job.")
1180 default_template= Unicode(u"""#$ -V
1181 default_template= Unicode(u"""#$ -V
1181 #$ -S /bin/sh
1182 #$ -S /bin/sh
1182 #$ -N ipcontroller
1183 #$ -N ipcontroller
1183 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1184 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1184 """%(' '.join(ipcontroller_cmd_argv)))
1185 """%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))
1185
1186
1186 def start(self):
1187 def start(self):
1187 """Start the controller by profile or profile_dir."""
1188 """Start the controller by profile or profile_dir."""
1188 return super(SGEControllerLauncher, self).start(1)
1189 return super(SGEControllerLauncher, self).start(1)
1189
1190
1190 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1191 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1191 """Launch Engines with SGE"""
1192 """Launch Engines with SGE"""
1192 batch_file_name = Unicode(u'sge_engines', config=True,
1193 batch_file_name = Unicode(u'sge_engines', config=True,
1193 help="batch file name for the engine(s) job.")
1194 help="batch file name for the engine(s) job.")
1194 default_template = Unicode("""#$ -V
1195 default_template = Unicode("""#$ -V
1195 #$ -S /bin/sh
1196 #$ -S /bin/sh
1196 #$ -N ipengine
1197 #$ -N ipengine
1197 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1198 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1198 """%(' '.join(ipengine_cmd_argv)))
1199 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1199
1200
1200 def start(self, n):
1201 def start(self, n):
1201 """Start n engines by profile or profile_dir."""
1202 """Start n engines by profile or profile_dir."""
1202 return super(SGEEngineSetLauncher, self).start(n)
1203 return super(SGEEngineSetLauncher, self).start(n)
1203
1204
1204
1205
1205 # LSF launchers
1206 # LSF launchers
1206
1207
1207 class LSFLauncher(BatchSystemLauncher):
1208 class LSFLauncher(BatchSystemLauncher):
1208 """A BatchSystemLauncher subclass for LSF."""
1209 """A BatchSystemLauncher subclass for LSF."""
1209
1210
1210 submit_command = List(['bsub'], config=True,
1211 submit_command = List(['bsub'], config=True,
1211 help="The PBS submit command ['bsub']")
1212 help="The PBS submit command ['bsub']")
1212 delete_command = List(['bkill'], config=True,
1213 delete_command = List(['bkill'], config=True,
1213 help="The PBS delete command ['bkill']")
1214 help="The PBS delete command ['bkill']")
1214 job_id_regexp = CRegExp(r'\d+', config=True,
1215 job_id_regexp = CRegExp(r'\d+', config=True,
1215 help="Regular expresion for identifying the job ID [r'\d+']")
1216 help="Regular expresion for identifying the job ID [r'\d+']")
1216
1217
1217 batch_file = Unicode(u'')
1218 batch_file = Unicode(u'')
1218 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1219 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1219 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1220 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1220 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1221 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1221 queue_template = Unicode('#BSUB -q {queue}')
1222 queue_template = Unicode('#BSUB -q {queue}')
1222
1223
1223 def start(self, n):
1224 def start(self, n):
1224 """Start n copies of the process using LSF batch system.
1225 """Start n copies of the process using LSF batch system.
1225 This cant inherit from the base class because bsub expects
1226 This cant inherit from the base class because bsub expects
1226 to be piped a shell script in order to honor the #BSUB directives :
1227 to be piped a shell script in order to honor the #BSUB directives :
1227 bsub < script
1228 bsub < script
1228 """
1229 """
1229 # Here we save profile_dir in the context so they
1230 # Here we save profile_dir in the context so they
1230 # can be used in the batch script template as {profile_dir}
1231 # can be used in the batch script template as {profile_dir}
1231 self.write_batch_script(n)
1232 self.write_batch_script(n)
1232 #output = check_output(self.args, env=os.environ)
1233 #output = check_output(self.args, env=os.environ)
1233 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1234 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1234 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1235 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1235 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1236 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1236 output,err = p.communicate()
1237 output,err = p.communicate()
1237 job_id = self.parse_job_id(output)
1238 job_id = self.parse_job_id(output)
1238 self.notify_start(job_id)
1239 self.notify_start(job_id)
1239 return job_id
1240 return job_id
1240
1241
1241
1242
1242 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1243 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1243 """Launch a controller using LSF."""
1244 """Launch a controller using LSF."""
1244
1245
1245 batch_file_name = Unicode(u'lsf_controller', config=True,
1246 batch_file_name = Unicode(u'lsf_controller', config=True,
1246 help="batch file name for the controller job.")
1247 help="batch file name for the controller job.")
1247 default_template= Unicode("""#!/bin/sh
1248 default_template= Unicode("""#!/bin/sh
1248 #BSUB -J ipcontroller
1249 #BSUB -J ipcontroller
1249 #BSUB -oo ipcontroller.o.%%J
1250 #BSUB -oo ipcontroller.o.%%J
1250 #BSUB -eo ipcontroller.e.%%J
1251 #BSUB -eo ipcontroller.e.%%J
1251 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1252 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1252 """%(' '.join(ipcontroller_cmd_argv)))
1253 """%(' '.join(map(pipes.quote,ipcontroller_cmd_argv))))
1253
1254
1254 def start(self):
1255 def start(self):
1255 """Start the controller by profile or profile_dir."""
1256 """Start the controller by profile or profile_dir."""
1256 return super(LSFControllerLauncher, self).start(1)
1257 return super(LSFControllerLauncher, self).start(1)
1257
1258
1258
1259
1259 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1260 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1260 """Launch Engines using LSF"""
1261 """Launch Engines using LSF"""
1261 batch_file_name = Unicode(u'lsf_engines', config=True,
1262 batch_file_name = Unicode(u'lsf_engines', config=True,
1262 help="batch file name for the engine(s) job.")
1263 help="batch file name for the engine(s) job.")
1263 default_template= Unicode(u"""#!/bin/sh
1264 default_template= Unicode(u"""#!/bin/sh
1264 #BSUB -oo ipengine.o.%%J
1265 #BSUB -oo ipengine.o.%%J
1265 #BSUB -eo ipengine.e.%%J
1266 #BSUB -eo ipengine.e.%%J
1266 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1267 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1267 """%(' '.join(ipengine_cmd_argv)))
1268 """%(' '.join(map(pipes.quote, ipengine_cmd_argv))))
1268
1269
1269 def start(self, n):
1270 def start(self, n):
1270 """Start n engines by profile or profile_dir."""
1271 """Start n engines by profile or profile_dir."""
1271 return super(LSFEngineSetLauncher, self).start(n)
1272 return super(LSFEngineSetLauncher, self).start(n)
1272
1273
1273
1274
1274 #-----------------------------------------------------------------------------
1275 #-----------------------------------------------------------------------------
1275 # A launcher for ipcluster itself!
1276 # A launcher for ipcluster itself!
1276 #-----------------------------------------------------------------------------
1277 #-----------------------------------------------------------------------------
1277
1278
1278
1279
1279 class IPClusterLauncher(LocalProcessLauncher):
1280 class IPClusterLauncher(LocalProcessLauncher):
1280 """Launch the ipcluster program in an external process."""
1281 """Launch the ipcluster program in an external process."""
1281
1282
1282 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1283 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1283 help="Popen command for ipcluster")
1284 help="Popen command for ipcluster")
1284 ipcluster_args = List(
1285 ipcluster_args = List(
1285 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1286 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1286 help="Command line arguments to pass to ipcluster.")
1287 help="Command line arguments to pass to ipcluster.")
1287 ipcluster_subcommand = Unicode('start')
1288 ipcluster_subcommand = Unicode('start')
1288 profile = Unicode('default')
1289 profile = Unicode('default')
1289 n = Integer(2)
1290 n = Integer(2)
1290
1291
1291 def find_args(self):
1292 def find_args(self):
1292 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1293 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1293 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1294 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1294 self.ipcluster_args
1295 self.ipcluster_args
1295
1296
1296 def start(self):
1297 def start(self):
1297 return super(IPClusterLauncher, self).start()
1298 return super(IPClusterLauncher, self).start()
1298
1299
1299 #-----------------------------------------------------------------------------
1300 #-----------------------------------------------------------------------------
1300 # Collections of launchers
1301 # Collections of launchers
1301 #-----------------------------------------------------------------------------
1302 #-----------------------------------------------------------------------------
1302
1303
1303 local_launchers = [
1304 local_launchers = [
1304 LocalControllerLauncher,
1305 LocalControllerLauncher,
1305 LocalEngineLauncher,
1306 LocalEngineLauncher,
1306 LocalEngineSetLauncher,
1307 LocalEngineSetLauncher,
1307 ]
1308 ]
1308 mpi_launchers = [
1309 mpi_launchers = [
1309 MPILauncher,
1310 MPILauncher,
1310 MPIControllerLauncher,
1311 MPIControllerLauncher,
1311 MPIEngineSetLauncher,
1312 MPIEngineSetLauncher,
1312 ]
1313 ]
1313 ssh_launchers = [
1314 ssh_launchers = [
1314 SSHLauncher,
1315 SSHLauncher,
1315 SSHControllerLauncher,
1316 SSHControllerLauncher,
1316 SSHEngineLauncher,
1317 SSHEngineLauncher,
1317 SSHEngineSetLauncher,
1318 SSHEngineSetLauncher,
1318 ]
1319 ]
1319 winhpc_launchers = [
1320 winhpc_launchers = [
1320 WindowsHPCLauncher,
1321 WindowsHPCLauncher,
1321 WindowsHPCControllerLauncher,
1322 WindowsHPCControllerLauncher,
1322 WindowsHPCEngineSetLauncher,
1323 WindowsHPCEngineSetLauncher,
1323 ]
1324 ]
1324 pbs_launchers = [
1325 pbs_launchers = [
1325 PBSLauncher,
1326 PBSLauncher,
1326 PBSControllerLauncher,
1327 PBSControllerLauncher,
1327 PBSEngineSetLauncher,
1328 PBSEngineSetLauncher,
1328 ]
1329 ]
1329 sge_launchers = [
1330 sge_launchers = [
1330 SGELauncher,
1331 SGELauncher,
1331 SGEControllerLauncher,
1332 SGEControllerLauncher,
1332 SGEEngineSetLauncher,
1333 SGEEngineSetLauncher,
1333 ]
1334 ]
1334 lsf_launchers = [
1335 lsf_launchers = [
1335 LSFLauncher,
1336 LSFLauncher,
1336 LSFControllerLauncher,
1337 LSFControllerLauncher,
1337 LSFEngineSetLauncher,
1338 LSFEngineSetLauncher,
1338 ]
1339 ]
1339 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1340 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1340 + pbs_launchers + sge_launchers + lsf_launchers
1341 + pbs_launchers + sge_launchers + lsf_launchers
1341
1342
General Comments 0
You need to be logged in to leave comments. Login now