##// END OF EJS Templates
use [sys.exe, "-c", "…launch_new_instance()"] in launchers...
MinRK -
Show More
@@ -1,1348 +1,1344
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import sys
27 import time
28 import time
28
29
29 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
30
31
31 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
32 try:
33 try:
33 from signal import SIGKILL
34 from signal import SIGKILL
34 except ImportError:
35 except ImportError:
35 # Windows
36 # Windows
36 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
37
38
38 try:
39 try:
39 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
42 except ImportError:
42 pass
43 pass
43
44
44 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
45 try:
46 try:
46 from subprocess import check_output
47 from subprocess import check_output
47 except ImportError:
48 except ImportError:
48 # pre-2.7, define check_output with Popen
49 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
50 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
51 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
52 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
53 out,err = p.communicate()
53 return out
54 return out
54
55
55 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
56
57
57 from IPython.config.application import Application
58 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
63 )
63 from IPython.utils.path import get_ipython_module_path, get_home_dir
64 from IPython.utils.path import get_home_dir
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65 from IPython.utils.process import find_cmd, FindCmdError
65
66
66 from .win32support import forward_read_events
67 from .win32support import forward_read_events
67
68
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
70
70 WINDOWS = os.name == 'nt'
71 WINDOWS = os.name == 'nt'
71
72
72 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
74 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75
76
77 cmd = "from IPython.parallel.apps.%s import launch_new_instance; launch_new_instance()"
76
78
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipcluster_cmd_argv = [sys.executable, "-c", cmd % "ipclusterapp"]
78 'IPython.parallel.apps.ipclusterapp'
79 ))
80
80
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = [sys.executable, "-c", cmd % "ipengineapp"]
82 'IPython.parallel.apps.ipengineapp'
83 ))
84
82
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
83 ipcontroller_cmd_argv = [sys.executable, "-c", cmd % "ipcontrollerapp"]
86 'IPython.parallel.apps.ipcontrollerapp'
87 ))
88
84
89 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
90 # Base launchers and errors
86 # Base launchers and errors
91 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
92
88
93
89
94 class LauncherError(Exception):
90 class LauncherError(Exception):
95 pass
91 pass
96
92
97
93
98 class ProcessStateError(LauncherError):
94 class ProcessStateError(LauncherError):
99 pass
95 pass
100
96
101
97
102 class UnknownStatus(LauncherError):
98 class UnknownStatus(LauncherError):
103 pass
99 pass
104
100
105
101
106 class BaseLauncher(LoggingConfigurable):
102 class BaseLauncher(LoggingConfigurable):
107 """An abstraction for starting, stopping and signaling a process."""
103 """An abstraction for starting, stopping and signaling a process."""
108
104
109 # In all of the launchers, the work_dir is where child processes will be
105 # In all of the launchers, the work_dir is where child processes will be
110 # run. This will usually be the profile_dir, but may not be. any work_dir
106 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # passed into the __init__ method will override the config value.
107 # passed into the __init__ method will override the config value.
112 # This should not be used to set the work_dir for the actual engine
108 # This should not be used to set the work_dir for the actual engine
113 # and controller. Instead, use their own config files or the
109 # and controller. Instead, use their own config files or the
114 # controller_args, engine_args attributes of the launchers to add
110 # controller_args, engine_args attributes of the launchers to add
115 # the work_dir option.
111 # the work_dir option.
116 work_dir = Unicode(u'.')
112 work_dir = Unicode(u'.')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
113 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118
114
119 start_data = Any()
115 start_data = Any()
120 stop_data = Any()
116 stop_data = Any()
121
117
122 def _loop_default(self):
118 def _loop_default(self):
123 return ioloop.IOLoop.instance()
119 return ioloop.IOLoop.instance()
124
120
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 self.state = 'before' # can be before, running, after
123 self.state = 'before' # can be before, running, after
128 self.stop_callbacks = []
124 self.stop_callbacks = []
129 self.start_data = None
125 self.start_data = None
130 self.stop_data = None
126 self.stop_data = None
131
127
132 @property
128 @property
133 def args(self):
129 def args(self):
134 """A list of cmd and args that will be used to start the process.
130 """A list of cmd and args that will be used to start the process.
135
131
136 This is what is passed to :func:`spawnProcess` and the first element
132 This is what is passed to :func:`spawnProcess` and the first element
137 will be the process name.
133 will be the process name.
138 """
134 """
139 return self.find_args()
135 return self.find_args()
140
136
141 def find_args(self):
137 def find_args(self):
142 """The ``.args`` property calls this to find the args list.
138 """The ``.args`` property calls this to find the args list.
143
139
144 Subclasses should implement this to construct the cmd and args.
140 Subclasses should implement this to construct the cmd and args.
145 """
141 """
146 raise NotImplementedError('find_args must be implemented in a subclass')
142 raise NotImplementedError('find_args must be implemented in a subclass')
147
143
148 @property
144 @property
149 def arg_str(self):
145 def arg_str(self):
150 """The string form of the program arguments."""
146 """The string form of the program arguments."""
151 return ' '.join(self.args)
147 return ' '.join(self.args)
152
148
153 @property
149 @property
154 def running(self):
150 def running(self):
155 """Am I running."""
151 """Am I running."""
156 if self.state == 'running':
152 if self.state == 'running':
157 return True
153 return True
158 else:
154 else:
159 return False
155 return False
160
156
161 def start(self):
157 def start(self):
162 """Start the process."""
158 """Start the process."""
163 raise NotImplementedError('start must be implemented in a subclass')
159 raise NotImplementedError('start must be implemented in a subclass')
164
160
165 def stop(self):
161 def stop(self):
166 """Stop the process and notify observers of stopping.
162 """Stop the process and notify observers of stopping.
167
163
168 This method will return None immediately.
164 This method will return None immediately.
169 To observe the actual process stopping, see :meth:`on_stop`.
165 To observe the actual process stopping, see :meth:`on_stop`.
170 """
166 """
171 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
172
168
173 def on_stop(self, f):
169 def on_stop(self, f):
174 """Register a callback to be called with this Launcher's stop_data
170 """Register a callback to be called with this Launcher's stop_data
175 when the process actually finishes.
171 when the process actually finishes.
176 """
172 """
177 if self.state=='after':
173 if self.state=='after':
178 return f(self.stop_data)
174 return f(self.stop_data)
179 else:
175 else:
180 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
181
177
182 def notify_start(self, data):
178 def notify_start(self, data):
183 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
184
180
185 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
186 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
187 """
183 """
188
184
189 self.log.debug('Process %r started: %r', self.args[0], data)
185 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.start_data = data
186 self.start_data = data
191 self.state = 'running'
187 self.state = 'running'
192 return data
188 return data
193
189
194 def notify_stop(self, data):
190 def notify_stop(self, data):
195 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
196
192
197 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger callbacks registered via :meth:`on_stop`."""
194 this to trigger callbacks registered via :meth:`on_stop`."""
199
195
200 self.log.debug('Process %r stopped: %r', self.args[0], data)
196 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.stop_data = data
197 self.stop_data = data
202 self.state = 'after'
198 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
204 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
205 d(data)
201 d(data)
206 return data
202 return data
207
203
208 def signal(self, sig):
204 def signal(self, sig):
209 """Signal the process.
205 """Signal the process.
210
206
211 Parameters
207 Parameters
212 ----------
208 ----------
213 sig : str or int
209 sig : str or int
214 'KILL', 'INT', etc., or any signal number
210 'KILL', 'INT', etc., or any signal number
215 """
211 """
216 raise NotImplementedError('signal must be implemented in a subclass')
212 raise NotImplementedError('signal must be implemented in a subclass')
217
213
218 class ClusterAppMixin(HasTraits):
214 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
215 """MixIn for cluster args as traits"""
220 profile_dir=Unicode('')
216 profile_dir=Unicode('')
221 cluster_id=Unicode('')
217 cluster_id=Unicode('')
222
218
223 @property
219 @property
224 def cluster_args(self):
220 def cluster_args(self):
225 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
221 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
226
222
227 class ControllerMixin(ClusterAppMixin):
223 class ControllerMixin(ClusterAppMixin):
228 controller_cmd = List(ipcontroller_cmd_argv, config=True,
224 controller_cmd = List(ipcontroller_cmd_argv, config=True,
229 help="""Popen command to launch ipcontroller.""")
225 help="""Popen command to launch ipcontroller.""")
230 # Command line arguments to ipcontroller.
226 # Command line arguments to ipcontroller.
231 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
227 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
232 help="""command-line args to pass to ipcontroller""")
228 help="""command-line args to pass to ipcontroller""")
233
229
234 class EngineMixin(ClusterAppMixin):
230 class EngineMixin(ClusterAppMixin):
235 engine_cmd = List(ipengine_cmd_argv, config=True,
231 engine_cmd = List(ipengine_cmd_argv, config=True,
236 help="""command to launch the Engine.""")
232 help="""command to launch the Engine.""")
237 # Command line arguments for ipengine.
233 # Command line arguments for ipengine.
238 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
234 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
239 help="command-line arguments to pass to ipengine"
235 help="command-line arguments to pass to ipengine"
240 )
236 )
241
237
242
238
243 #-----------------------------------------------------------------------------
239 #-----------------------------------------------------------------------------
244 # Local process launchers
240 # Local process launchers
245 #-----------------------------------------------------------------------------
241 #-----------------------------------------------------------------------------
246
242
247
243
248 class LocalProcessLauncher(BaseLauncher):
244 class LocalProcessLauncher(BaseLauncher):
249 """Start and stop an external process in an asynchronous manner.
245 """Start and stop an external process in an asynchronous manner.
250
246
251 This will launch the external process with a working directory of
247 This will launch the external process with a working directory of
252 ``self.work_dir``.
248 ``self.work_dir``.
253 """
249 """
254
250
255 # This is used to construct self.args, which is passed to
251 # This is used to construct self.args, which is passed to
256 # spawnProcess.
252 # spawnProcess.
257 cmd_and_args = List([])
253 cmd_and_args = List([])
258 poll_frequency = Integer(100) # in ms
254 poll_frequency = Integer(100) # in ms
259
255
260 def __init__(self, work_dir=u'.', config=None, **kwargs):
256 def __init__(self, work_dir=u'.', config=None, **kwargs):
261 super(LocalProcessLauncher, self).__init__(
257 super(LocalProcessLauncher, self).__init__(
262 work_dir=work_dir, config=config, **kwargs
258 work_dir=work_dir, config=config, **kwargs
263 )
259 )
264 self.process = None
260 self.process = None
265 self.poller = None
261 self.poller = None
266
262
267 def find_args(self):
263 def find_args(self):
268 return self.cmd_and_args
264 return self.cmd_and_args
269
265
270 def start(self):
266 def start(self):
271 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
267 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
272 if self.state == 'before':
268 if self.state == 'before':
273 self.process = Popen(self.args,
269 self.process = Popen(self.args,
274 stdout=PIPE,stderr=PIPE,stdin=PIPE,
270 stdout=PIPE,stderr=PIPE,stdin=PIPE,
275 env=os.environ,
271 env=os.environ,
276 cwd=self.work_dir
272 cwd=self.work_dir
277 )
273 )
278 if WINDOWS:
274 if WINDOWS:
279 self.stdout = forward_read_events(self.process.stdout)
275 self.stdout = forward_read_events(self.process.stdout)
280 self.stderr = forward_read_events(self.process.stderr)
276 self.stderr = forward_read_events(self.process.stderr)
281 else:
277 else:
282 self.stdout = self.process.stdout.fileno()
278 self.stdout = self.process.stdout.fileno()
283 self.stderr = self.process.stderr.fileno()
279 self.stderr = self.process.stderr.fileno()
284 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
280 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
285 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
281 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
286 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
282 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
287 self.poller.start()
283 self.poller.start()
288 self.notify_start(self.process.pid)
284 self.notify_start(self.process.pid)
289 else:
285 else:
290 s = 'The process was already started and has state: %r' % self.state
286 s = 'The process was already started and has state: %r' % self.state
291 raise ProcessStateError(s)
287 raise ProcessStateError(s)
292
288
293 def stop(self):
289 def stop(self):
294 return self.interrupt_then_kill()
290 return self.interrupt_then_kill()
295
291
296 def signal(self, sig):
292 def signal(self, sig):
297 if self.state == 'running':
293 if self.state == 'running':
298 if WINDOWS and sig != SIGINT:
294 if WINDOWS and sig != SIGINT:
299 # use Windows tree-kill for better child cleanup
295 # use Windows tree-kill for better child cleanup
300 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
296 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
301 else:
297 else:
302 self.process.send_signal(sig)
298 self.process.send_signal(sig)
303
299
304 def interrupt_then_kill(self, delay=2.0):
300 def interrupt_then_kill(self, delay=2.0):
305 """Send INT, wait a delay and then send KILL."""
301 """Send INT, wait a delay and then send KILL."""
306 try:
302 try:
307 self.signal(SIGINT)
303 self.signal(SIGINT)
308 except Exception:
304 except Exception:
309 self.log.debug("interrupt failed")
305 self.log.debug("interrupt failed")
310 pass
306 pass
311 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
307 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
312 self.killer.start()
308 self.killer.start()
313
309
314 # callbacks, etc:
310 # callbacks, etc:
315
311
316 def handle_stdout(self, fd, events):
312 def handle_stdout(self, fd, events):
317 if WINDOWS:
313 if WINDOWS:
318 line = self.stdout.recv()
314 line = self.stdout.recv()
319 else:
315 else:
320 line = self.process.stdout.readline()
316 line = self.process.stdout.readline()
321 # a stopped process will be readable but return empty strings
317 # a stopped process will be readable but return empty strings
322 if line:
318 if line:
323 self.log.debug(line[:-1])
319 self.log.debug(line[:-1])
324 else:
320 else:
325 self.poll()
321 self.poll()
326
322
327 def handle_stderr(self, fd, events):
323 def handle_stderr(self, fd, events):
328 if WINDOWS:
324 if WINDOWS:
329 line = self.stderr.recv()
325 line = self.stderr.recv()
330 else:
326 else:
331 line = self.process.stderr.readline()
327 line = self.process.stderr.readline()
332 # a stopped process will be readable but return empty strings
328 # a stopped process will be readable but return empty strings
333 if line:
329 if line:
334 self.log.debug(line[:-1])
330 self.log.debug(line[:-1])
335 else:
331 else:
336 self.poll()
332 self.poll()
337
333
338 def poll(self):
334 def poll(self):
339 status = self.process.poll()
335 status = self.process.poll()
340 if status is not None:
336 if status is not None:
341 self.poller.stop()
337 self.poller.stop()
342 self.loop.remove_handler(self.stdout)
338 self.loop.remove_handler(self.stdout)
343 self.loop.remove_handler(self.stderr)
339 self.loop.remove_handler(self.stderr)
344 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
340 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
345 return status
341 return status
346
342
347 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
343 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
348 """Launch a controller as a regular external process."""
344 """Launch a controller as a regular external process."""
349
345
350 def find_args(self):
346 def find_args(self):
351 return self.controller_cmd + self.cluster_args + self.controller_args
347 return self.controller_cmd + self.cluster_args + self.controller_args
352
348
353 def start(self):
349 def start(self):
354 """Start the controller by profile_dir."""
350 """Start the controller by profile_dir."""
355 return super(LocalControllerLauncher, self).start()
351 return super(LocalControllerLauncher, self).start()
356
352
357
353
358 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
354 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
359 """Launch a single engine as a regular external process."""
355 """Launch a single engine as a regular external process."""
360
356
361 def find_args(self):
357 def find_args(self):
362 return self.engine_cmd + self.cluster_args + self.engine_args
358 return self.engine_cmd + self.cluster_args + self.engine_args
363
359
364
360
365 class LocalEngineSetLauncher(LocalEngineLauncher):
361 class LocalEngineSetLauncher(LocalEngineLauncher):
366 """Launch a set of engines as regular external processes."""
362 """Launch a set of engines as regular external processes."""
367
363
368 delay = CFloat(0.1, config=True,
364 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
365 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
366 This can help force the engines to get their ids in order, or limit
371 process flood when starting many engines."""
367 process flood when starting many engines."""
372 )
368 )
373
369
374 # launcher class
370 # launcher class
375 launcher_class = LocalEngineLauncher
371 launcher_class = LocalEngineLauncher
376
372
377 launchers = Dict()
373 launchers = Dict()
378 stop_data = Dict()
374 stop_data = Dict()
379
375
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
376 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 super(LocalEngineSetLauncher, self).__init__(
377 super(LocalEngineSetLauncher, self).__init__(
382 work_dir=work_dir, config=config, **kwargs
378 work_dir=work_dir, config=config, **kwargs
383 )
379 )
384 self.stop_data = {}
380 self.stop_data = {}
385
381
386 def start(self, n):
382 def start(self, n):
387 """Start n engines by profile or profile_dir."""
383 """Start n engines by profile or profile_dir."""
388 dlist = []
384 dlist = []
389 for i in range(n):
385 for i in range(n):
390 if i > 0:
386 if i > 0:
391 time.sleep(self.delay)
387 time.sleep(self.delay)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
388 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
393 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
389 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
394 )
390 )
395
391
396 # Copy the engine args over to each engine launcher.
392 # Copy the engine args over to each engine launcher.
397 el.engine_cmd = copy.deepcopy(self.engine_cmd)
393 el.engine_cmd = copy.deepcopy(self.engine_cmd)
398 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
399 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
400 d = el.start()
396 d = el.start()
401 self.launchers[i] = el
397 self.launchers[i] = el
402 dlist.append(d)
398 dlist.append(d)
403 self.notify_start(dlist)
399 self.notify_start(dlist)
404 return dlist
400 return dlist
405
401
406 def find_args(self):
402 def find_args(self):
407 return ['engine set']
403 return ['engine set']
408
404
409 def signal(self, sig):
405 def signal(self, sig):
410 dlist = []
406 dlist = []
411 for el in self.launchers.itervalues():
407 for el in self.launchers.itervalues():
412 d = el.signal(sig)
408 d = el.signal(sig)
413 dlist.append(d)
409 dlist.append(d)
414 return dlist
410 return dlist
415
411
416 def interrupt_then_kill(self, delay=1.0):
412 def interrupt_then_kill(self, delay=1.0):
417 dlist = []
413 dlist = []
418 for el in self.launchers.itervalues():
414 for el in self.launchers.itervalues():
419 d = el.interrupt_then_kill(delay)
415 d = el.interrupt_then_kill(delay)
420 dlist.append(d)
416 dlist.append(d)
421 return dlist
417 return dlist
422
418
423 def stop(self):
419 def stop(self):
424 return self.interrupt_then_kill()
420 return self.interrupt_then_kill()
425
421
426 def _notice_engine_stopped(self, data):
422 def _notice_engine_stopped(self, data):
427 pid = data['pid']
423 pid = data['pid']
428 for idx,el in self.launchers.iteritems():
424 for idx,el in self.launchers.iteritems():
429 if el.process.pid == pid:
425 if el.process.pid == pid:
430 break
426 break
431 self.launchers.pop(idx)
427 self.launchers.pop(idx)
432 self.stop_data[idx] = data
428 self.stop_data[idx] = data
433 if not self.launchers:
429 if not self.launchers:
434 self.notify_stop(self.stop_data)
430 self.notify_stop(self.stop_data)
435
431
436
432
437 #-----------------------------------------------------------------------------
433 #-----------------------------------------------------------------------------
438 # MPI launchers
434 # MPI launchers
439 #-----------------------------------------------------------------------------
435 #-----------------------------------------------------------------------------
440
436
441
437
442 class MPILauncher(LocalProcessLauncher):
438 class MPILauncher(LocalProcessLauncher):
443 """Launch an external process using mpiexec."""
439 """Launch an external process using mpiexec."""
444
440
445 mpi_cmd = List(['mpiexec'], config=True,
441 mpi_cmd = List(['mpiexec'], config=True,
446 help="The mpiexec command to use in starting the process."
442 help="The mpiexec command to use in starting the process."
447 )
443 )
448 mpi_args = List([], config=True,
444 mpi_args = List([], config=True,
449 help="The command line arguments to pass to mpiexec."
445 help="The command line arguments to pass to mpiexec."
450 )
446 )
451 program = List(['date'],
447 program = List(['date'],
452 help="The program to start via mpiexec.")
448 help="The program to start via mpiexec.")
453 program_args = List([],
449 program_args = List([],
454 help="The command line argument to the program."
450 help="The command line argument to the program."
455 )
451 )
456 n = Integer(1)
452 n = Integer(1)
457
453
458 def __init__(self, *args, **kwargs):
454 def __init__(self, *args, **kwargs):
459 # deprecation for old MPIExec names:
455 # deprecation for old MPIExec names:
460 config = kwargs.get('config', {})
456 config = kwargs.get('config', {})
461 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
457 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
462 deprecated = config.get(oldname)
458 deprecated = config.get(oldname)
463 if deprecated:
459 if deprecated:
464 newname = oldname.replace('MPIExec', 'MPI')
460 newname = oldname.replace('MPIExec', 'MPI')
465 config[newname].update(deprecated)
461 config[newname].update(deprecated)
466 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
462 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
467
463
468 super(MPILauncher, self).__init__(*args, **kwargs)
464 super(MPILauncher, self).__init__(*args, **kwargs)
469
465
470 def find_args(self):
466 def find_args(self):
471 """Build self.args using all the fields."""
467 """Build self.args using all the fields."""
472 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
468 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
473 self.program + self.program_args
469 self.program + self.program_args
474
470
475 def start(self, n):
471 def start(self, n):
476 """Start n instances of the program using mpiexec."""
472 """Start n instances of the program using mpiexec."""
477 self.n = n
473 self.n = n
478 return super(MPILauncher, self).start()
474 return super(MPILauncher, self).start()
479
475
480
476
481 class MPIControllerLauncher(MPILauncher, ControllerMixin):
477 class MPIControllerLauncher(MPILauncher, ControllerMixin):
482 """Launch a controller using mpiexec."""
478 """Launch a controller using mpiexec."""
483
479
484 # alias back to *non-configurable* program[_args] for use in find_args()
480 # alias back to *non-configurable* program[_args] for use in find_args()
485 # this way all Controller/EngineSetLaunchers have the same form, rather
481 # this way all Controller/EngineSetLaunchers have the same form, rather
486 # than *some* having `program_args` and others `controller_args`
482 # than *some* having `program_args` and others `controller_args`
487 @property
483 @property
488 def program(self):
484 def program(self):
489 return self.controller_cmd
485 return self.controller_cmd
490
486
491 @property
487 @property
492 def program_args(self):
488 def program_args(self):
493 return self.cluster_args + self.controller_args
489 return self.cluster_args + self.controller_args
494
490
495 def start(self):
491 def start(self):
496 """Start the controller by profile_dir."""
492 """Start the controller by profile_dir."""
497 return super(MPIControllerLauncher, self).start(1)
493 return super(MPIControllerLauncher, self).start(1)
498
494
499
495
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch a set of engines using mpiexec."""

    # `program` / `program_args` are *non-configurable* aliases consumed by
    # find_args(), so that every Controller/EngineSetLauncher exposes the same
    # pair of names instead of a mix of `program_args`, `engine_args`, etc.
    @property
    def program(self):
        """Read-only alias for ``engine_cmd``."""
        cmd = self.engine_cmd
        return cmd

    @property
    def program_args(self):
        """Return the cluster args followed by the engine args."""
        leading = self.cluster_args
        trailing = self.engine_args
        return leading + trailing

    def start(self, n):
        """Record `n` on the launcher, then start `n` engines via the parent."""
        self.n = n
        parent = super(MPIEngineSetLauncher, self)
        return parent.start(n)
518
514
519 # deprecated MPIExec names
515 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a deprecation notice for the old ``MPIExec*`` class names."""

    def warn(self):
        """Log that this class name is deprecated and name its replacement.

        The replacement name is derived by substituting ``MPI`` for
        ``MPIExec`` in the instance's class name.  ``self.log`` must be a
        logger-like object providing ``warning()``.
        """
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warn is a deprecated alias (gone in recent Pythons); use warning.
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
525
521
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Old name for MPILauncher; logs a deprecation warning when created."""

    def __init__(self, *a, **kw):
        parent = super(MPIExecLauncher, self)
        parent.__init__(*a, **kw)
        # emitted only after the parent initializer has run
        self.warn()
531
527
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Old name for MPIControllerLauncher; logs a deprecation warning when created."""

    def __init__(self, *a, **kw):
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*a, **kw)
        # emitted only after the parent initializer has run
        self.warn()
537
533
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Old name for MPIEngineSetLauncher; logs a deprecation warning when created."""

    def __init__(self, *a, **kw):
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*a, **kw)
        # emitted only after the parent initializer has run
        self.warn()
543
539
544
540
545 #-----------------------------------------------------------------------------
541 #-----------------------------------------------------------------------------
546 # SSH launchers
542 # SSH launchers
547 #-----------------------------------------------------------------------------
543 #-----------------------------------------------------------------------------
548
544
549 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
545 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550
546
class SSHLauncher(LocalProcessLauncher):
    """Run a program on a remote host through a local ``ssh`` process.

    The command line is ``ssh_cmd + ssh_args + [location] + program +
    program_args``.  Files in ``to_send`` are copied with ``scp_cmd`` before
    the process starts; files in ``to_fetch`` are copied back afterwards.

    This launcher is minimal: to be generally useful it would probably need
    the ``sshx`` idea for environment variables, and possibly more.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Keep `location` in sync: ``user@host`` if a user is set, else the host."""
        self.location = u'%s@%s' % (self.user, new) if self.user else new

    def _user_changed(self, name, old, new):
        """Rebuild `location` as ``user@hostname`` from the new user."""
        host = self.hostname
        self.location = u'%s@%s' % (new, host)

    def find_args(self):
        """Return the full ssh command line as a list of arguments."""
        argv = self.ssh_cmd + self.ssh_args
        argv = argv + [self.location]
        return argv + self.program + self.program_args

    def _send_file(self, local, remote):
        """Copy one local file to the remote location with scp.

        Waits (up to ten one-second sleeps) for the local file to appear
        before attempting the copy; the copy is attempted regardless.
        """
        target = "%s:%s" % (self.location, remote)
        attempts = 0
        while attempts < 10 and not os.path.exists(local):
            self.log.debug("waiting for %s" % local)
            time.sleep(1)
            attempts += 1
        self.log.info("sending %s to %s", local, target)
        check_output(self.scp_cmd + [local, target])

    def send_files(self):
        """Send every (local, remote) pair in `to_send` (called before start)."""
        if self.to_send:
            for local_file, remote_file in self.to_send:
                self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """Copy one remote file to a local path with scp.

        Polls the remote side (up to ten times) with ``test -e`` until the
        file is reported present; the copy is attempted regardless.
        """
        source = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, source)
        for _ in range(10):
            # ask the remote shell whether the file exists yet
            probe = self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"]
            answer = check_output(probe).strip()
            if answer == 'yes':
                break
            if answer == 'no':
                time.sleep(1)
        check_output(self.scp_cmd + [source, local])

    def fetch_files(self):
        """Fetch every (remote, local) pair in `to_fetch` (called after start)."""
        if self.to_fetch:
            for remote_file, local_file in self.to_fetch:
                self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        `hostname` and `user`, when given, are stored on the launcher first
        (hostname before user).
        """
        for attr, value in (('hostname', hostname), ('user', user)):
            if value is not None:
                setattr(self, attr, value)

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        """Close a running ssh connection; the value of `sig` is not used."""
        if self.state != 'running':
            return
        # send escaped ssh connection-closer
        stdin = self.process.stdin
        stdin.write('~.')
        stdin.flush()
649
645
class SSHClusterLauncher(SSHLauncher):
    """SSH launcher that passes a remote ``--profile-dir`` to the program.

    Cluster ids are not supported: assigning a non-empty ``cluster_id``
    raises ``ValueError``.
    """

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _remote_profile_dir_default(self):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo

        The method name must match the trait name (``remote_profile_dir``)
        for it to be picked up as the trait's dynamic default.
        """
        home = get_home_dir()
        if not home.endswith('/'):
            home = home+'/'

        if self.profile_dir.startswith(home):
            return self.profile_dir[len(home):]
        else:
            return self.profile_dir

    # Previous (misspelled) name, kept so explicit callers keep working.
    _remote_profie_dir_default = _remote_profile_dir_default

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id."""
        if new:
            raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line args selecting the remote profile directory."""
        return ['--profile-dir', self.remote_profile_dir]
677
673
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch the controller on a remote host over ssh."""

    # `program` / `program_args` are *non-configurable* aliases consumed by
    # find_args(), so that every Controller/EngineSetLauncher exposes the same
    # pair of names instead of a mix of `program_args`, `controller_args`, etc.

    def _controller_cmd_default(self):
        """Default remote command: plain ``ipcontroller``."""
        return ['ipcontroller']

    @property
    def program(self):
        """Read-only alias for ``controller_cmd``."""
        cmd = self.controller_cmd
        return cmd

    @property
    def program_args(self):
        """Return the cluster args followed by the controller args."""
        leading = self.cluster_args
        trailing = self.controller_args
        return leading + trailing

    def _to_fetch_default(self):
        """Default (remote, local) pairs: the two connection files in ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote = os.path.join(self.remote_profile_dir, 'security', cf)
            local = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote, local))
        return pairs
701
697
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine on a remote host over ssh."""

    # `program` / `program_args` are *non-configurable* aliases consumed by
    # find_args(), so that every Controller/EngineSetLauncher exposes the same
    # pair of names instead of a mix of `program_args`, `engine_args`, etc.

    def _engine_cmd_default(self):
        """Default remote command: plain ``ipengine``."""
        return ['ipengine']

    @property
    def program(self):
        """Read-only alias for ``engine_cmd``."""
        cmd = self.engine_cmd
        return cmd

    @property
    def program_args(self):
        """Return the cluster args followed by the engine args."""
        leading = self.cluster_args
        trailing = self.engine_args
        return leading + trailing

    def _to_send_default(self):
        """Default (local, remote) pairs: the two connection files in ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local = os.path.join(self.profile_dir, 'security', cf)
            remote = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local, remote))
        return pairs
725
721
726
722
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Start engines over ssh on the hosts listed in the `engines` dict.

    Each value of `engines` is either an int (engine count) or a
    ``(count, args)`` pair giving per-host engine arguments.
    """

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        total = 0
        for spec in self.engines.itervalues():
            if isinstance(spec, (tuple, list)):
                # (count, args) form: only the count matters here
                spec, _args = spec
            total += spec
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """
        dlist = []
        for location, spec in self.engines.iteritems():
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count, args = spec, copy.deepcopy(self.engine_args)

            # keys may be either "host" or "user@host"
            if '@' in location:
                user, host = location.split('@', 1)
            else:
                user, host = None, location

            for i in range(count):
                first_on_host = (i == 0)
                if not first_on_host:
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )
                if not first_on_host:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
778
774
779
775
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher that runs `ipcluster engines` on a remote machine.

    The remote profile must already be configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """The ipcluster command followed by the ``engines`` subcommand."""
        subcommand = ['engines']
        return self.ipcluster_cmd + subcommand

    @property
    def program_args(self):
        """Args passing the engine count and the remote profile dir."""
        count = str(self.n)
        return ['-n', count, '--profile-dir', self.remote_profile_dir]

    def _to_send_default(self):
        """Default (local, remote) pairs: the two connection files in ``security``."""
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local = os.path.join(self.profile_dir, 'security', cf)
            remote = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local, remote))
        return pairs

    def start(self, n):
        """Record `n`, then start the remote ``ipcluster engines`` process."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
808
804
809
805
810 #-----------------------------------------------------------------------------
806 #-----------------------------------------------------------------------------
811 # Windows HPC Server 2008 scheduler launchers
807 # Windows HPC Server 2008 scheduler launchers
812 #-----------------------------------------------------------------------------
808 #-----------------------------------------------------------------------------
813
809
814
810
815 # This is only used on Windows.
811 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to run the ``job`` tool.

    On Windows the command is resolved with ``find_cmd``; if that lookup
    fails, or on any other platform, the bare name ``'job'`` is returned.
    """
    fallback = 'job'
    if not WINDOWS:
        return fallback
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return fallback
825
821
826
822
class WindowsHPCLauncher(BaseLauncher):
    """Submit and cancel jobs with the Windows HPC ``job`` command-line tool.

    Subclasses implement :meth:`write_job_file`.  :meth:`start` writes the
    job file and submits it to `scheduler`; :meth:`stop` cancels the job.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script is the `job_file` property
    # below, made dynamically by combining the work_dir with the job_file_name.
    # (A same-named trait used to be declared here but was shadowed by the
    # property, so it has been dropped.)
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialize the launcher; `work_dir` defaults to the current directory."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job file inside `work_dir`."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for `n` processes (subclass hook)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored on ``self.job_id``.  Raises LauncherError when
        `job_id_regexp` does not match `output`.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Returns the job id parsed from the submit command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s", ' '.join([self.job_cmd] + args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and return the cancel command's output.

        Cancelling is best-effort: if the command fails, a message saying the
        job already appears to be stopped is used as the output instead.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", ' '.join([self.job_cmd] + args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            self.log.debug("Cancelling Win HPC Job %r failed", self.job_id, exc_info=True)
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
905
901
906
902
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch the controller as a single-task Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    @property
    def job_file(self):
        """Full path of the job file inside `profile_dir`."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write a job description containing one controller task.

        `n` is accepted for interface compatibility and is not used.
        """
        job = IPControllerJob(config=self.config)
        task = IPControllerTask(config=self.config)

        # The task's work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir

        # cluster-level args first, then the controller-specific extras
        for extra in (self.cluster_args, self.controller_args):
            task.controller_args.extend(extra)
        job.add_task(task)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
937
933
938
934
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines as one Windows HPC job with a task per engine."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a job description containing `n` engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The task's work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # cluster-level args first, then the engine-specific extras
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Full path of the job file inside `profile_dir`."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start `n` engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
970
966
971
967
972 #-----------------------------------------------------------------------------
968 #-----------------------------------------------------------------------------
973 # Batch (PBS) system launchers
969 # Batch (PBS) system launchers
974 #-----------------------------------------------------------------------------
970 #-----------------------------------------------------------------------------
975
971
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that records profile_dir and cluster_id in the
    ``self.context`` dict, rather than turning them into cl-args."""

    def _profile_dir_default(self):
        """Default to an empty profile_dir and record it in the context."""
        default = ''
        self.context['profile_dir'] = default
        return default

    def _cluster_id_default(self):
        """Default to an empty cluster_id and record it in the context."""
        default = ''
        self.context['cluster_id'] = default
        return default

    def _profile_dir_changed(self, name, old, new):
        """Store the new value in the context under the trait's name."""
        self.context[name] = new

    # cluster_id changes are handled the same way as profile_dir changes
    _cluster_id_changed = _profile_dir_changed
988
984
989
985
990 class BatchSystemLauncher(BaseLauncher):
986 class BatchSystemLauncher(BaseLauncher):
991 """Launch an external process using a batch system.
987 """Launch an external process using a batch system.
992
988
993 This class is designed to work with UNIX batch systems like PBS, LSF,
989 This class is designed to work with UNIX batch systems like PBS, LSF,
994 GridEngine, etc. The overall model is that there are different commands
990 GridEngine, etc. The overall model is that there are different commands
995 like qsub, qdel, etc. that handle the starting and stopping of the process.
991 like qsub, qdel, etc. that handle the starting and stopping of the process.
996
992
997 This class also has the notion of a batch script. The ``batch_template``
993 This class also has the notion of a batch script. The ``batch_template``
998 attribute can be set to a string that is a template for the batch script.
994 attribute can be set to a string that is a template for the batch script.
999 This template is instantiated using string formatting. Thus the template can
995 This template is instantiated using string formatting. Thus the template can
1000 use {n} for the number of instances. Subclasses can add additional variables
996 use {n} for the number of instances. Subclasses can add additional variables
1001 to the template dict.
997 to the template dict.
1002 """
998 """
1003
999
1004 # Subclasses must fill these in. See PBSEngineSet
1000 # Subclasses must fill these in. See PBSEngineSet
1005 submit_command = List([''], config=True,
1001 submit_command = List([''], config=True,
1006 help="The name of the command line program used to submit jobs.")
1002 help="The name of the command line program used to submit jobs.")
1007 delete_command = List([''], config=True,
1003 delete_command = List([''], config=True,
1008 help="The name of the command line program used to delete jobs.")
1004 help="The name of the command line program used to delete jobs.")
1009 job_id_regexp = Unicode('', config=True,
1005 job_id_regexp = Unicode('', config=True,
1010 help="""A regular expression used to get the job id from the output of the
1006 help="""A regular expression used to get the job id from the output of the
1011 submit_command.""")
1007 submit_command.""")
1012 batch_template = Unicode('', config=True,
1008 batch_template = Unicode('', config=True,
1013 help="The string that is the batch script template itself.")
1009 help="The string that is the batch script template itself.")
1014 batch_template_file = Unicode(u'', config=True,
1010 batch_template_file = Unicode(u'', config=True,
1015 help="The file that contains the batch template.")
1011 help="The file that contains the batch template.")
1016 batch_file_name = Unicode(u'batch_script', config=True,
1012 batch_file_name = Unicode(u'batch_script', config=True,
1017 help="The filename of the instantiated batch script.")
1013 help="The filename of the instantiated batch script.")
1018 queue = Unicode(u'', config=True,
1014 queue = Unicode(u'', config=True,
1019 help="The PBS Queue.")
1015 help="The PBS Queue.")
1020
1016
1021 def _queue_changed(self, name, old, new):
1017 def _queue_changed(self, name, old, new):
1022 self.context[name] = new
1018 self.context[name] = new
1023
1019
1024 n = Integer(1)
1020 n = Integer(1)
1025 _n_changed = _queue_changed
1021 _n_changed = _queue_changed
1026
1022
1027 # not configurable, override in subclasses
1023 # not configurable, override in subclasses
1028 # PBS Job Array regex
1024 # PBS Job Array regex
1029 job_array_regexp = Unicode('')
1025 job_array_regexp = Unicode('')
1030 job_array_template = Unicode('')
1026 job_array_template = Unicode('')
1031 # PBS Queue regex
1027 # PBS Queue regex
1032 queue_regexp = Unicode('')
1028 queue_regexp = Unicode('')
1033 queue_template = Unicode('')
1029 queue_template = Unicode('')
1034 # The default batch template, override in subclasses
1030 # The default batch template, override in subclasses
1035 default_template = Unicode('')
1031 default_template = Unicode('')
1036 # The full path to the instantiated batch script.
1032 # The full path to the instantiated batch script.
1037 batch_file = Unicode(u'')
1033 batch_file = Unicode(u'')
1038 # the format dict used with batch_template:
1034 # the format dict used with batch_template:
1039 context = Dict()
1035 context = Dict()
1040 def _context_default(self):
1036 def _context_default(self):
1041 """load the default context with the default values for the basic keys
1037 """load the default context with the default values for the basic keys
1042
1038
1043 because the _trait_changed methods only load the context if they
1039 because the _trait_changed methods only load the context if they
1044 are set to something other than the default value.
1040 are set to something other than the default value.
1045 """
1041 """
1046 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1042 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1047
1043
1048 # the Formatter instance for rendering the templates:
1044 # the Formatter instance for rendering the templates:
1049 formatter = Instance(EvalFormatter, (), {})
1045 formatter = Instance(EvalFormatter, (), {})
1050
1046
1051
1047
1052 def find_args(self):
1048 def find_args(self):
1053 return self.submit_command + [self.batch_file]
1049 return self.submit_command + [self.batch_file]
1054
1050
1055 def __init__(self, work_dir=u'.', config=None, **kwargs):
1051 def __init__(self, work_dir=u'.', config=None, **kwargs):
1056 super(BatchSystemLauncher, self).__init__(
1052 super(BatchSystemLauncher, self).__init__(
1057 work_dir=work_dir, config=config, **kwargs
1053 work_dir=work_dir, config=config, **kwargs
1058 )
1054 )
1059 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1055 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1060
1056
1061 def parse_job_id(self, output):
1057 def parse_job_id(self, output):
1062 """Take the output of the submit command and return the job id."""
1058 """Take the output of the submit command and return the job id."""
1063 m = re.search(self.job_id_regexp, output)
1059 m = re.search(self.job_id_regexp, output)
1064 if m is not None:
1060 if m is not None:
1065 job_id = m.group()
1061 job_id = m.group()
1066 else:
1062 else:
1067 raise LauncherError("Job id couldn't be determined: %s" % output)
1063 raise LauncherError("Job id couldn't be determined: %s" % output)
1068 self.job_id = job_id
1064 self.job_id = job_id
1069 self.log.info('Job submitted with job id: %r', job_id)
1065 self.log.info('Job submitted with job id: %r', job_id)
1070 return job_id
1066 return job_id
1071
1067
1072 def write_batch_script(self, n):
1068 def write_batch_script(self, n):
1073 """Instantiate and write the batch script to the work_dir."""
1069 """Instantiate and write the batch script to the work_dir."""
1074 self.n = n
1070 self.n = n
1075 # first priority is batch_template if set
1071 # first priority is batch_template if set
1076 if self.batch_template_file and not self.batch_template:
1072 if self.batch_template_file and not self.batch_template:
1077 # second priority is batch_template_file
1073 # second priority is batch_template_file
1078 with open(self.batch_template_file) as f:
1074 with open(self.batch_template_file) as f:
1079 self.batch_template = f.read()
1075 self.batch_template = f.read()
1080 if not self.batch_template:
1076 if not self.batch_template:
1081 # third (last) priority is default_template
1077 # third (last) priority is default_template
1082 self.batch_template = self.default_template
1078 self.batch_template = self.default_template
1083
1079
1084 # add jobarray or queue lines to user-specified template
1080 # add jobarray or queue lines to user-specified template
1085 # note that this is *only* when user did not specify a template.
1081 # note that this is *only* when user did not specify a template.
1086 regex = re.compile(self.job_array_regexp)
1082 regex = re.compile(self.job_array_regexp)
1087 # print regex.search(self.batch_template)
1083 # print regex.search(self.batch_template)
1088 if not regex.search(self.batch_template):
1084 if not regex.search(self.batch_template):
1089 self.log.debug("adding job array settings to batch script")
1085 self.log.debug("adding job array settings to batch script")
1090 firstline, rest = self.batch_template.split('\n',1)
1086 firstline, rest = self.batch_template.split('\n',1)
1091 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1087 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1092
1088
1093 regex = re.compile(self.queue_regexp)
1089 regex = re.compile(self.queue_regexp)
1094 # print regex.search(self.batch_template)
1090 # print regex.search(self.batch_template)
1095 if self.queue and not regex.search(self.batch_template):
1091 if self.queue and not regex.search(self.batch_template):
1096 self.log.debug("adding PBS queue settings to batch script")
1092 self.log.debug("adding PBS queue settings to batch script")
1097 firstline, rest = self.batch_template.split('\n',1)
1093 firstline, rest = self.batch_template.split('\n',1)
1098 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1094 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1099
1095
1100 script_as_string = self.formatter.format(self.batch_template, **self.context)
1096 script_as_string = self.formatter.format(self.batch_template, **self.context)
1101 self.log.debug('Writing batch script: %s', self.batch_file)
1097 self.log.debug('Writing batch script: %s', self.batch_file)
1102
1098
1103 with open(self.batch_file, 'w') as f:
1099 with open(self.batch_file, 'w') as f:
1104 f.write(script_as_string)
1100 f.write(script_as_string)
1105 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1101 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1106
1102
1107 def start(self, n):
1103 def start(self, n):
1108 """Start n copies of the process using a batch system."""
1104 """Start n copies of the process using a batch system."""
1109 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1105 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1110 # Here we save profile_dir in the context so they
1106 # Here we save profile_dir in the context so they
1111 # can be used in the batch script template as {profile_dir}
1107 # can be used in the batch script template as {profile_dir}
1112 self.write_batch_script(n)
1108 self.write_batch_script(n)
1113 output = check_output(self.args, env=os.environ)
1109 output = check_output(self.args, env=os.environ)
1114
1110
1115 job_id = self.parse_job_id(output)
1111 job_id = self.parse_job_id(output)
1116 self.notify_start(job_id)
1112 self.notify_start(job_id)
1117 return job_id
1113 return job_id
1118
1114
1119 def stop(self):
1115 def stop(self):
1120 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1116 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1121 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1117 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1122 return output
1118 return output
1123
1119
1124
1120
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # The help text previously advertised 'qsub' as the default here.
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Raw strings keep the pattern values unchanged while avoiding
    # invalid-escape-sequence warnings for \W, \w, \d, \- and \$.
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1140
1136
1141
1137
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Submit a single controller process as a PBS job."""

    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipcontroller_cmd_argv never needs quoting.
    default_template = Unicode(
        "#!/bin/sh\n"
        "#PBS -V\n"
        "#PBS -N ipcontroller\n"
        '%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipcontroller_cmd_argv)
    )
    batch_file_name = Unicode(
        u'pbs_controller',
        config=True,
        help="batch file name for the controller job.",
    )

    def start(self):
        """Submit the controller job (always exactly one instance)."""
        submit = super(PBSControllerLauncher, self).start
        return submit(1)
1157
1153
1158
1154
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Submit a set of engines as a PBS job."""

    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipengine_cmd_argv never needs quoting.
    default_template = Unicode(
        u"#!/bin/sh\n"
        u"#PBS -V\n"
        u"#PBS -N ipengine\n"
        u'%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv)
    )
    batch_file_name = Unicode(
        u'pbs_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    def start(self, n):
        """Submit a job that runs ``n`` engines and return its job id."""
        submit = super(PBSEngineSetLauncher, self).start
        return submit(n)
1172
1168
1173 #SGE is very similar to PBS
1169 #SGE is very similar to PBS
1174
1170
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE directives start with `#$` instead of `#PBS`.  Raw strings keep the
    # pattern values unchanged while avoiding invalid-escape-sequence warnings.
    job_array_regexp = Unicode(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = Unicode(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1181
1177
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipcontroller_cmd_argv never needs quoting.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self):
        """Start the controller as a single-instance SGE job; return the job id."""
        return super(SGEControllerLauncher, self).start(1)
1196
1192
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Submit a set of engines as an SGE job."""

    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipengine_cmd_argv never needs quoting.
    default_template = Unicode(
        "#$ -V\n"
        "#$ -S /bin/sh\n"
        "#$ -N ipengine\n"
        '%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv)
    )
    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    def start(self, n):
        """Submit a job that runs ``n`` engines and return its job id."""
        submit = super(SGEEngineSetLauncher, self).start
        return submit(n)
1210
1206
1211
1207
1212 # LSF launchers
1208 # LSF launchers
1213
1209
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Detects an existing job-array directive such as `#BSUB -J name[1-8]`.
    # The previous pattern (`[ \t]-J+\w+...`) put the quantifier on `J` and
    # allowed no whitespace after `-J`, so it could not match that form.
    job_array_regexp = Unicode(r'#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    queue_regexp = Unicode(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This can't reuse the base class implementation because bsub expects
        the shell script on its standard input in order to honor the #BSUB
        directives, i.e. the equivalent of::

            bsub < script

        Returns the job id parsed from bsub's output.
        """
        self.write_batch_script(n)
        argv = self.args
        # The last element is the batch script; everything before it is the
        # submit command, which may be more than one word.
        submit_cmd, script_path = argv[:-1], argv[-1]
        self.log.debug("Starting %s: %s < %r", self.__class__.__name__,
                       ' '.join(submit_cmd), script_path)
        # Feed the script through stdin directly instead of building a shell
        # string, so no quoting of the path or the command is required.
        with open(script_path) as script:
            p = Popen(submit_cmd, env=os.environ, stdin=script, stdout=PIPE)
            output = p.communicate()[0]
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1247
1243
1248
1244
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Submit a single controller process as an LSF job."""

    # `%%J` survives the %-interpolation below as a literal `%J`.
    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipcontroller_cmd_argv never needs quoting.
    default_template = Unicode(
        "#!/bin/sh\n"
        "#BSUB -J ipcontroller\n"
        "#BSUB -oo ipcontroller.o.%%J\n"
        "#BSUB -eo ipcontroller.e.%%J\n"
        '%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipcontroller_cmd_argv)
    )
    batch_file_name = Unicode(
        u'lsf_controller',
        config=True,
        help="batch file name for the controller job.",
    )

    def start(self):
        """Submit the controller job (always exactly one instance)."""
        submit = super(LSFControllerLauncher, self).start
        return submit(1)
1264
1260
1265
1261
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Submit a set of engines as an LSF job."""

    # `%%J` survives the %-interpolation below as a literal `%J`.
    # NOTE(review): the argv elements are joined with plain spaces and no
    # shell quoting -- confirm ipengine_cmd_argv never needs quoting.
    default_template = Unicode(
        u"#!/bin/sh\n"
        u"#BSUB -oo ipengine.o.%%J\n"
        u"#BSUB -eo ipengine.e.%%J\n"
        u'%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"\n'
        % ' '.join(ipengine_cmd_argv)
    )
    batch_file_name = Unicode(
        u'lsf_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )

    def start(self, n):
        """Submit a job that runs ``n`` engines and return its job id."""
        submit = super(LSFEngineSetLauncher, self).start
        return submit(n)
1279
1275
1280
1276
1281 #-----------------------------------------------------------------------------
1277 #-----------------------------------------------------------------------------
1282 # A launcher for ipcluster itself!
1278 # A launcher for ipcluster itself!
1283 #-----------------------------------------------------------------------------
1279 #-----------------------------------------------------------------------------
1284
1280
1285
1281
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program itself as an external local process."""

    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster",
    )
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.",
    )
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster command line as a list of arguments."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.append('--n=%i' % self.n)
        argv.append('--profile=%s' % self.profile)
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Launch ipcluster via the local-process launcher."""
        launch = super(IPClusterLauncher, self).start
        return launch()
1305
1301
1306 #-----------------------------------------------------------------------------
1302 #-----------------------------------------------------------------------------
1307 # Collections of launchers
1303 # Collections of launchers
1308 #-----------------------------------------------------------------------------
1304 #-----------------------------------------------------------------------------
1309
1305
# Launcher classes grouped by the backend they target.
local_launchers = [LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher]
mpi_launchers = [MPILauncher, MPIControllerLauncher, MPIEngineSetLauncher]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher]
sge_launchers = [SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher]
lsf_launchers = [LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher]

# Flat list of every launcher, in the same group order as above.
all_launchers = []
for _group in (local_launchers, mpi_launchers, ssh_launchers, winhpc_launchers,
               pbs_launchers, sge_launchers, lsf_launchers):
    all_launchers.extend(_group)
del _group
1348
1344
General Comments 0
You need to be logged in to leave comments. Login now