##// END OF EJS Templates
Merge pull request #1634 from minrk/proxylauncher...
Fernando Perez -
r6622:66e42d93 merge
parent child Browse files
Show More
@@ -1,1223 +1,1348
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
45 try:
45 try:
46 from subprocess import check_output
46 from subprocess import check_output
47 except ImportError:
47 except ImportError:
48 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
52 out,err = p.communicate()
53 return out
53 return out
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
60 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
62 )
63 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path, get_home_dir
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65
65
66 from .win32support import forward_read_events
66 from .win32support import forward_read_events
67
67
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
69
70 WINDOWS = os.name == 'nt'
70 WINDOWS = os.name == 'nt'
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
73 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76
76
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipclusterapp'
78 'IPython.parallel.apps.ipclusterapp'
79 ))
79 ))
80
80
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 'IPython.parallel.apps.ipengineapp'
82 'IPython.parallel.apps.ipengineapp'
83 ))
83 ))
84
84
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 'IPython.parallel.apps.ipcontrollerapp'
86 'IPython.parallel.apps.ipcontrollerapp'
87 ))
87 ))
88
88
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90 # Base launchers and errors
90 # Base launchers and errors
91 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
92
92
93
93
94 class LauncherError(Exception):
94 class LauncherError(Exception):
95 pass
95 pass
96
96
97
97
98 class ProcessStateError(LauncherError):
98 class ProcessStateError(LauncherError):
99 pass
99 pass
100
100
101
101
102 class UnknownStatus(LauncherError):
102 class UnknownStatus(LauncherError):
103 pass
103 pass
104
104
105
105
106 class BaseLauncher(LoggingConfigurable):
106 class BaseLauncher(LoggingConfigurable):
107 """An asbtraction for starting, stopping and signaling a process."""
107 """An asbtraction for starting, stopping and signaling a process."""
108
108
109 # In all of the launchers, the work_dir is where child processes will be
109 # In all of the launchers, the work_dir is where child processes will be
110 # run. This will usually be the profile_dir, but may not be. any work_dir
110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # passed into the __init__ method will override the config value.
111 # passed into the __init__ method will override the config value.
112 # This should not be used to set the work_dir for the actual engine
112 # This should not be used to set the work_dir for the actual engine
113 # and controller. Instead, use their own config files or the
113 # and controller. Instead, use their own config files or the
114 # controller_args, engine_args attributes of the launchers to add
114 # controller_args, engine_args attributes of the launchers to add
115 # the work_dir option.
115 # the work_dir option.
116 work_dir = Unicode(u'.')
116 work_dir = Unicode(u'.')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118
118
119 start_data = Any()
119 start_data = Any()
120 stop_data = Any()
120 stop_data = Any()
121
121
122 def _loop_default(self):
122 def _loop_default(self):
123 return ioloop.IOLoop.instance()
123 return ioloop.IOLoop.instance()
124
124
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 self.state = 'before' # can be before, running, after
127 self.state = 'before' # can be before, running, after
128 self.stop_callbacks = []
128 self.stop_callbacks = []
129 self.start_data = None
129 self.start_data = None
130 self.stop_data = None
130 self.stop_data = None
131
131
132 @property
132 @property
133 def args(self):
133 def args(self):
134 """A list of cmd and args that will be used to start the process.
134 """A list of cmd and args that will be used to start the process.
135
135
136 This is what is passed to :func:`spawnProcess` and the first element
136 This is what is passed to :func:`spawnProcess` and the first element
137 will be the process name.
137 will be the process name.
138 """
138 """
139 return self.find_args()
139 return self.find_args()
140
140
141 def find_args(self):
141 def find_args(self):
142 """The ``.args`` property calls this to find the args list.
142 """The ``.args`` property calls this to find the args list.
143
143
144 Subcommand should implement this to construct the cmd and args.
144 Subcommand should implement this to construct the cmd and args.
145 """
145 """
146 raise NotImplementedError('find_args must be implemented in a subclass')
146 raise NotImplementedError('find_args must be implemented in a subclass')
147
147
148 @property
148 @property
149 def arg_str(self):
149 def arg_str(self):
150 """The string form of the program arguments."""
150 """The string form of the program arguments."""
151 return ' '.join(self.args)
151 return ' '.join(self.args)
152
152
153 @property
153 @property
154 def running(self):
154 def running(self):
155 """Am I running."""
155 """Am I running."""
156 if self.state == 'running':
156 if self.state == 'running':
157 return True
157 return True
158 else:
158 else:
159 return False
159 return False
160
160
161 def start(self):
161 def start(self):
162 """Start the process."""
162 """Start the process."""
163 raise NotImplementedError('start must be implemented in a subclass')
163 raise NotImplementedError('start must be implemented in a subclass')
164
164
165 def stop(self):
165 def stop(self):
166 """Stop the process and notify observers of stopping.
166 """Stop the process and notify observers of stopping.
167
167
168 This method will return None immediately.
168 This method will return None immediately.
169 To observe the actual process stopping, see :meth:`on_stop`.
169 To observe the actual process stopping, see :meth:`on_stop`.
170 """
170 """
171 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
172
172
173 def on_stop(self, f):
173 def on_stop(self, f):
174 """Register a callback to be called with this Launcher's stop_data
174 """Register a callback to be called with this Launcher's stop_data
175 when the process actually finishes.
175 when the process actually finishes.
176 """
176 """
177 if self.state=='after':
177 if self.state=='after':
178 return f(self.stop_data)
178 return f(self.stop_data)
179 else:
179 else:
180 self.stop_callbacks.append(f)
180 self.stop_callbacks.append(f)
181
181
182 def notify_start(self, data):
182 def notify_start(self, data):
183 """Call this to trigger startup actions.
183 """Call this to trigger startup actions.
184
184
185 This logs the process startup and sets the state to 'running'. It is
185 This logs the process startup and sets the state to 'running'. It is
186 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
187 """
187 """
188
188
189 self.log.debug('Process %r started: %r', self.args[0], data)
189 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.start_data = data
190 self.start_data = data
191 self.state = 'running'
191 self.state = 'running'
192 return data
192 return data
193
193
194 def notify_stop(self, data):
194 def notify_stop(self, data):
195 """Call this to trigger process stop actions.
195 """Call this to trigger process stop actions.
196
196
197 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger callbacks registered via :meth:`on_stop`."""
198 this to trigger callbacks registered via :meth:`on_stop`."""
199
199
200 self.log.debug('Process %r stopped: %r', self.args[0], data)
200 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.stop_data = data
201 self.stop_data = data
202 self.state = 'after'
202 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
204 d = self.stop_callbacks.pop()
204 d = self.stop_callbacks.pop()
205 d(data)
205 d(data)
206 return data
206 return data
207
207
208 def signal(self, sig):
208 def signal(self, sig):
209 """Signal the process.
209 """Signal the process.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 sig : str or int
213 sig : str or int
214 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
215 """
215 """
216 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
217
217
218 class ClusterAppMixin(HasTraits):
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
220 profile_dir=Unicode('')
222 cluster_id=Unicode('')
221 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
222
224 self.cluster_args = []
223 @property
225 if self.profile_dir:
224 def cluster_args(self):
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
225 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
226
231 class ControllerMixin(ClusterAppMixin):
227 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
228 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
229 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
230 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
231 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
232 help="""command-line args to pass to ipcontroller""")
237
233
238 class EngineMixin(ClusterAppMixin):
234 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
235 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
236 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
237 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
238 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
239 help="command-line arguments to pass to ipengine"
244 )
240 )
245
241
242
246 #-----------------------------------------------------------------------------
243 #-----------------------------------------------------------------------------
247 # Local process launchers
244 # Local process launchers
248 #-----------------------------------------------------------------------------
245 #-----------------------------------------------------------------------------
249
246
250
247
251 class LocalProcessLauncher(BaseLauncher):
248 class LocalProcessLauncher(BaseLauncher):
252 """Start and stop an external process in an asynchronous manner.
249 """Start and stop an external process in an asynchronous manner.
253
250
254 This will launch the external process with a working directory of
251 This will launch the external process with a working directory of
255 ``self.work_dir``.
252 ``self.work_dir``.
256 """
253 """
257
254
258 # This is used to to construct self.args, which is passed to
255 # This is used to to construct self.args, which is passed to
259 # spawnProcess.
256 # spawnProcess.
260 cmd_and_args = List([])
257 cmd_and_args = List([])
261 poll_frequency = Integer(100) # in ms
258 poll_frequency = Integer(100) # in ms
262
259
263 def __init__(self, work_dir=u'.', config=None, **kwargs):
260 def __init__(self, work_dir=u'.', config=None, **kwargs):
264 super(LocalProcessLauncher, self).__init__(
261 super(LocalProcessLauncher, self).__init__(
265 work_dir=work_dir, config=config, **kwargs
262 work_dir=work_dir, config=config, **kwargs
266 )
263 )
267 self.process = None
264 self.process = None
268 self.poller = None
265 self.poller = None
269
266
270 def find_args(self):
267 def find_args(self):
271 return self.cmd_and_args
268 return self.cmd_and_args
272
269
273 def start(self):
270 def start(self):
274 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
271 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
275 if self.state == 'before':
272 if self.state == 'before':
276 self.process = Popen(self.args,
273 self.process = Popen(self.args,
277 stdout=PIPE,stderr=PIPE,stdin=PIPE,
274 stdout=PIPE,stderr=PIPE,stdin=PIPE,
278 env=os.environ,
275 env=os.environ,
279 cwd=self.work_dir
276 cwd=self.work_dir
280 )
277 )
281 if WINDOWS:
278 if WINDOWS:
282 self.stdout = forward_read_events(self.process.stdout)
279 self.stdout = forward_read_events(self.process.stdout)
283 self.stderr = forward_read_events(self.process.stderr)
280 self.stderr = forward_read_events(self.process.stderr)
284 else:
281 else:
285 self.stdout = self.process.stdout.fileno()
282 self.stdout = self.process.stdout.fileno()
286 self.stderr = self.process.stderr.fileno()
283 self.stderr = self.process.stderr.fileno()
287 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
284 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
288 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
285 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
289 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
286 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
290 self.poller.start()
287 self.poller.start()
291 self.notify_start(self.process.pid)
288 self.notify_start(self.process.pid)
292 else:
289 else:
293 s = 'The process was already started and has state: %r' % self.state
290 s = 'The process was already started and has state: %r' % self.state
294 raise ProcessStateError(s)
291 raise ProcessStateError(s)
295
292
296 def stop(self):
293 def stop(self):
297 return self.interrupt_then_kill()
294 return self.interrupt_then_kill()
298
295
299 def signal(self, sig):
296 def signal(self, sig):
300 if self.state == 'running':
297 if self.state == 'running':
301 if WINDOWS and sig != SIGINT:
298 if WINDOWS and sig != SIGINT:
302 # use Windows tree-kill for better child cleanup
299 # use Windows tree-kill for better child cleanup
303 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
300 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
304 else:
301 else:
305 self.process.send_signal(sig)
302 self.process.send_signal(sig)
306
303
307 def interrupt_then_kill(self, delay=2.0):
304 def interrupt_then_kill(self, delay=2.0):
308 """Send INT, wait a delay and then send KILL."""
305 """Send INT, wait a delay and then send KILL."""
309 try:
306 try:
310 self.signal(SIGINT)
307 self.signal(SIGINT)
311 except Exception:
308 except Exception:
312 self.log.debug("interrupt failed")
309 self.log.debug("interrupt failed")
313 pass
310 pass
314 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
311 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
315 self.killer.start()
312 self.killer.start()
316
313
317 # callbacks, etc:
314 # callbacks, etc:
318
315
319 def handle_stdout(self, fd, events):
316 def handle_stdout(self, fd, events):
320 if WINDOWS:
317 if WINDOWS:
321 line = self.stdout.recv()
318 line = self.stdout.recv()
322 else:
319 else:
323 line = self.process.stdout.readline()
320 line = self.process.stdout.readline()
324 # a stopped process will be readable but return empty strings
321 # a stopped process will be readable but return empty strings
325 if line:
322 if line:
326 self.log.debug(line[:-1])
323 self.log.debug(line[:-1])
327 else:
324 else:
328 self.poll()
325 self.poll()
329
326
330 def handle_stderr(self, fd, events):
327 def handle_stderr(self, fd, events):
331 if WINDOWS:
328 if WINDOWS:
332 line = self.stderr.recv()
329 line = self.stderr.recv()
333 else:
330 else:
334 line = self.process.stderr.readline()
331 line = self.process.stderr.readline()
335 # a stopped process will be readable but return empty strings
332 # a stopped process will be readable but return empty strings
336 if line:
333 if line:
337 self.log.debug(line[:-1])
334 self.log.debug(line[:-1])
338 else:
335 else:
339 self.poll()
336 self.poll()
340
337
341 def poll(self):
338 def poll(self):
342 status = self.process.poll()
339 status = self.process.poll()
343 if status is not None:
340 if status is not None:
344 self.poller.stop()
341 self.poller.stop()
345 self.loop.remove_handler(self.stdout)
342 self.loop.remove_handler(self.stdout)
346 self.loop.remove_handler(self.stderr)
343 self.loop.remove_handler(self.stderr)
347 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
344 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
348 return status
345 return status
349
346
350 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
347 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
351 """Launch a controller as a regular external process."""
348 """Launch a controller as a regular external process."""
352
349
353 def find_args(self):
350 def find_args(self):
354 return self.controller_cmd + self.cluster_args + self.controller_args
351 return self.controller_cmd + self.cluster_args + self.controller_args
355
352
356 def start(self):
353 def start(self):
357 """Start the controller by profile_dir."""
354 """Start the controller by profile_dir."""
358 return super(LocalControllerLauncher, self).start()
355 return super(LocalControllerLauncher, self).start()
359
356
360
357
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
358 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
362 """Launch a single engine as a regular externall process."""
359 """Launch a single engine as a regular externall process."""
363
360
364 def find_args(self):
361 def find_args(self):
365 return self.engine_cmd + self.cluster_args + self.engine_args
362 return self.engine_cmd + self.cluster_args + self.engine_args
366
363
367
364
368 class LocalEngineSetLauncher(LocalEngineLauncher):
365 class LocalEngineSetLauncher(LocalEngineLauncher):
369 """Launch a set of engines as regular external processes."""
366 """Launch a set of engines as regular external processes."""
370
367
371 delay = CFloat(0.1, config=True,
368 delay = CFloat(0.1, config=True,
372 help="""delay (in seconds) between starting each engine after the first.
369 help="""delay (in seconds) between starting each engine after the first.
373 This can help force the engines to get their ids in order, or limit
370 This can help force the engines to get their ids in order, or limit
374 process flood when starting many engines."""
371 process flood when starting many engines."""
375 )
372 )
376
373
377 # launcher class
374 # launcher class
378 launcher_class = LocalEngineLauncher
375 launcher_class = LocalEngineLauncher
379
376
380 launchers = Dict()
377 launchers = Dict()
381 stop_data = Dict()
378 stop_data = Dict()
382
379
383 def __init__(self, work_dir=u'.', config=None, **kwargs):
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 super(LocalEngineSetLauncher, self).__init__(
381 super(LocalEngineSetLauncher, self).__init__(
385 work_dir=work_dir, config=config, **kwargs
382 work_dir=work_dir, config=config, **kwargs
386 )
383 )
387 self.stop_data = {}
384 self.stop_data = {}
388
385
389 def start(self, n):
386 def start(self, n):
390 """Start n engines by profile or profile_dir."""
387 """Start n engines by profile or profile_dir."""
391 dlist = []
388 dlist = []
392 for i in range(n):
389 for i in range(n):
393 if i > 0:
390 if i > 0:
394 time.sleep(self.delay)
391 time.sleep(self.delay)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
393 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
394 )
398
395
399 # Copy the engine args over to each engine launcher.
396 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
397 el.engine_cmd = copy.deepcopy(self.engine_cmd)
401 el.engine_args = copy.deepcopy(self.engine_args)
398 el.engine_args = copy.deepcopy(self.engine_args)
402 el.on_stop(self._notice_engine_stopped)
399 el.on_stop(self._notice_engine_stopped)
403 d = el.start()
400 d = el.start()
404 self.launchers[i] = el
401 self.launchers[i] = el
405 dlist.append(d)
402 dlist.append(d)
406 self.notify_start(dlist)
403 self.notify_start(dlist)
407 return dlist
404 return dlist
408
405
409 def find_args(self):
406 def find_args(self):
410 return ['engine set']
407 return ['engine set']
411
408
412 def signal(self, sig):
409 def signal(self, sig):
413 dlist = []
410 dlist = []
414 for el in self.launchers.itervalues():
411 for el in self.launchers.itervalues():
415 d = el.signal(sig)
412 d = el.signal(sig)
416 dlist.append(d)
413 dlist.append(d)
417 return dlist
414 return dlist
418
415
419 def interrupt_then_kill(self, delay=1.0):
416 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
417 dlist = []
421 for el in self.launchers.itervalues():
418 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
419 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
420 dlist.append(d)
424 return dlist
421 return dlist
425
422
426 def stop(self):
423 def stop(self):
427 return self.interrupt_then_kill()
424 return self.interrupt_then_kill()
428
425
429 def _notice_engine_stopped(self, data):
426 def _notice_engine_stopped(self, data):
430 pid = data['pid']
427 pid = data['pid']
431 for idx,el in self.launchers.iteritems():
428 for idx,el in self.launchers.iteritems():
432 if el.process.pid == pid:
429 if el.process.pid == pid:
433 break
430 break
434 self.launchers.pop(idx)
431 self.launchers.pop(idx)
435 self.stop_data[idx] = data
432 self.stop_data[idx] = data
436 if not self.launchers:
433 if not self.launchers:
437 self.notify_stop(self.stop_data)
434 self.notify_stop(self.stop_data)
438
435
439
436
440 #-----------------------------------------------------------------------------
437 #-----------------------------------------------------------------------------
441 # MPI launchers
438 # MPI launchers
442 #-----------------------------------------------------------------------------
439 #-----------------------------------------------------------------------------
443
440
444
441
445 class MPILauncher(LocalProcessLauncher):
442 class MPILauncher(LocalProcessLauncher):
446 """Launch an external process using mpiexec."""
443 """Launch an external process using mpiexec."""
447
444
448 mpi_cmd = List(['mpiexec'], config=True,
445 mpi_cmd = List(['mpiexec'], config=True,
449 help="The mpiexec command to use in starting the process."
446 help="The mpiexec command to use in starting the process."
450 )
447 )
451 mpi_args = List([], config=True,
448 mpi_args = List([], config=True,
452 help="The command line arguments to pass to mpiexec."
449 help="The command line arguments to pass to mpiexec."
453 )
450 )
454 program = List(['date'],
451 program = List(['date'],
455 help="The program to start via mpiexec.")
452 help="The program to start via mpiexec.")
456 program_args = List([],
453 program_args = List([],
457 help="The command line argument to the program."
454 help="The command line argument to the program."
458 )
455 )
459 n = Integer(1)
456 n = Integer(1)
460
457
461 def __init__(self, *args, **kwargs):
458 def __init__(self, *args, **kwargs):
462 # deprecation for old MPIExec names:
459 # deprecation for old MPIExec names:
463 config = kwargs.get('config', {})
460 config = kwargs.get('config', {})
464 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
461 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
465 deprecated = config.get(oldname)
462 deprecated = config.get(oldname)
466 if deprecated:
463 if deprecated:
467 newname = oldname.replace('MPIExec', 'MPI')
464 newname = oldname.replace('MPIExec', 'MPI')
468 config[newname].update(deprecated)
465 config[newname].update(deprecated)
469 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
466 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
470
467
471 super(MPILauncher, self).__init__(*args, **kwargs)
468 super(MPILauncher, self).__init__(*args, **kwargs)
472
469
473 def find_args(self):
470 def find_args(self):
474 """Build self.args using all the fields."""
471 """Build self.args using all the fields."""
475 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
472 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
476 self.program + self.program_args
473 self.program + self.program_args
477
474
478 def start(self, n):
475 def start(self, n):
479 """Start n instances of the program using mpiexec."""
476 """Start n instances of the program using mpiexec."""
480 self.n = n
477 self.n = n
481 return super(MPILauncher, self).start()
478 return super(MPILauncher, self).start()
482
479
483
480
484 class MPIControllerLauncher(MPILauncher, ControllerMixin):
481 class MPIControllerLauncher(MPILauncher, ControllerMixin):
485 """Launch a controller using mpiexec."""
482 """Launch a controller using mpiexec."""
486
483
487 # alias back to *non-configurable* program[_args] for use in find_args()
484 # alias back to *non-configurable* program[_args] for use in find_args()
488 # this way all Controller/EngineSetLaunchers have the same form, rather
485 # this way all Controller/EngineSetLaunchers have the same form, rather
489 # than *some* having `program_args` and others `controller_args`
486 # than *some* having `program_args` and others `controller_args`
490 @property
487 @property
491 def program(self):
488 def program(self):
492 return self.controller_cmd
489 return self.controller_cmd
493
490
494 @property
491 @property
495 def program_args(self):
492 def program_args(self):
496 return self.cluster_args + self.controller_args
493 return self.cluster_args + self.controller_args
497
494
498 def start(self):
495 def start(self):
499 """Start the controller by profile_dir."""
496 """Start the controller by profile_dir."""
500 return super(MPIControllerLauncher, self).start(1)
497 return super(MPIControllerLauncher, self).start(1)
501
498
502
499
503 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
500 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
504 """Launch engines using mpiexec"""
501 """Launch engines using mpiexec"""
505
502
506 # alias back to *non-configurable* program[_args] for use in find_args()
503 # alias back to *non-configurable* program[_args] for use in find_args()
507 # this way all Controller/EngineSetLaunchers have the same form, rather
504 # this way all Controller/EngineSetLaunchers have the same form, rather
508 # than *some* having `program_args` and others `controller_args`
505 # than *some* having `program_args` and others `controller_args`
509 @property
506 @property
510 def program(self):
507 def program(self):
511 return self.engine_cmd
508 return self.engine_cmd
512
509
513 @property
510 @property
514 def program_args(self):
511 def program_args(self):
515 return self.cluster_args + self.engine_args
512 return self.cluster_args + self.engine_args
516
513
517 def start(self, n):
514 def start(self, n):
518 """Start n engines by profile or profile_dir."""
515 """Start n engines by profile or profile_dir."""
519 self.n = n
516 self.n = n
520 return super(MPIEngineSetLauncher, self).start(n)
517 return super(MPIEngineSetLauncher, self).start(n)
521
518
522 # deprecated MPIExec names
519 # deprecated MPIExec names
523 class DeprecatedMPILauncher(object):
520 class DeprecatedMPILauncher(object):
524 def warn(self):
521 def warn(self):
525 oldname = self.__class__.__name__
522 oldname = self.__class__.__name__
526 newname = oldname.replace('MPIExec', 'MPI')
523 newname = oldname.replace('MPIExec', 'MPI')
527 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
524 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
528
525
529 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
526 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
530 """Deprecated, use MPILauncher"""
527 """Deprecated, use MPILauncher"""
531 def __init__(self, *args, **kwargs):
528 def __init__(self, *args, **kwargs):
532 super(MPIExecLauncher, self).__init__(*args, **kwargs)
529 super(MPIExecLauncher, self).__init__(*args, **kwargs)
533 self.warn()
530 self.warn()
534
531
535 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
532 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
536 """Deprecated, use MPIControllerLauncher"""
533 """Deprecated, use MPIControllerLauncher"""
537 def __init__(self, *args, **kwargs):
534 def __init__(self, *args, **kwargs):
538 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
535 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
539 self.warn()
536 self.warn()
540
537
541 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
538 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
542 """Deprecated, use MPIEngineSetLauncher"""
539 """Deprecated, use MPIEngineSetLauncher"""
543 def __init__(self, *args, **kwargs):
540 def __init__(self, *args, **kwargs):
544 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
541 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
545 self.warn()
542 self.warn()
546
543
547
544
548 #-----------------------------------------------------------------------------
545 #-----------------------------------------------------------------------------
549 # SSH launchers
546 # SSH launchers
550 #-----------------------------------------------------------------------------
547 #-----------------------------------------------------------------------------
551
548
552 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
549 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
553
550
554 class SSHLauncher(LocalProcessLauncher):
551 class SSHLauncher(LocalProcessLauncher):
555 """A minimal launcher for ssh.
552 """A minimal launcher for ssh.
556
553
557 To be useful this will probably have to be extended to use the ``sshx``
554 To be useful this will probably have to be extended to use the ``sshx``
558 idea for environment variables. There could be other things this needs
555 idea for environment variables. There could be other things this needs
559 as well.
556 as well.
560 """
557 """
561
558
562 ssh_cmd = List(['ssh'], config=True,
559 ssh_cmd = List(['ssh'], config=True,
563 help="command for starting ssh")
560 help="command for starting ssh")
564 ssh_args = List(['-tt'], config=True,
561 ssh_args = List(['-tt'], config=True,
565 help="args to pass to ssh")
562 help="args to pass to ssh")
563 scp_cmd = List(['scp'], config=True,
564 help="command for sending files")
566 program = List(['date'],
565 program = List(['date'],
567 help="Program to launch via ssh")
566 help="Program to launch via ssh")
568 program_args = List([],
567 program_args = List([],
569 help="args to pass to remote program")
568 help="args to pass to remote program")
570 hostname = Unicode('', config=True,
569 hostname = Unicode('', config=True,
571 help="hostname on which to launch the program")
570 help="hostname on which to launch the program")
572 user = Unicode('', config=True,
571 user = Unicode('', config=True,
573 help="username for ssh")
572 help="username for ssh")
574 location = Unicode('', config=True,
573 location = Unicode('', config=True,
575 help="user@hostname location for ssh in one setting")
574 help="user@hostname location for ssh in one setting")
575 to_fetch = List([], config=True,
576 help="List of (remote, local) files to fetch after starting")
577 to_send = List([], config=True,
578 help="List of (local, remote) files to send before starting")
576
579
577 def _hostname_changed(self, name, old, new):
580 def _hostname_changed(self, name, old, new):
578 if self.user:
581 if self.user:
579 self.location = u'%s@%s' % (self.user, new)
582 self.location = u'%s@%s' % (self.user, new)
580 else:
583 else:
581 self.location = new
584 self.location = new
582
585
583 def _user_changed(self, name, old, new):
586 def _user_changed(self, name, old, new):
584 self.location = u'%s@%s' % (new, self.hostname)
587 self.location = u'%s@%s' % (new, self.hostname)
585
588
586 def find_args(self):
589 def find_args(self):
587 return self.ssh_cmd + self.ssh_args + [self.location] + \
590 return self.ssh_cmd + self.ssh_args + [self.location] + \
588 self.program + self.program_args
591 self.program + self.program_args
592
593 def _send_file(self, local, remote):
594 """send a single file"""
595 remote = "%s:%s" % (self.location, remote)
596 for i in range(10):
597 if not os.path.exists(local):
598 self.log.debug("waiting for %s" % local)
599 time.sleep(1)
600 else:
601 break
602 self.log.info("sending %s to %s", local, remote)
603 check_output(self.scp_cmd + [local, remote])
604
605 def send_files(self):
606 """send our files (called before start)"""
607 if not self.to_send:
608 return
609 for local_file, remote_file in self.to_send:
610 self._send_file(local_file, remote_file)
611
612 def _fetch_file(self, remote, local):
613 """fetch a single file"""
614 full_remote = "%s:%s" % (self.location, remote)
615 self.log.info("fetching %s from %s", local, full_remote)
616 for i in range(10):
617 # wait up to 10s for remote file to exist
618 check = check_output(self.ssh_cmd + self.ssh_args + \
619 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
620 check = check.strip()
621 if check == 'no':
622 time.sleep(1)
623 elif check == 'yes':
624 break
625 check_output(self.scp_cmd + [full_remote, local])
626
627 def fetch_files(self):
628 """fetch remote files (called after start)"""
629 if not self.to_fetch:
630 return
631 for remote_file, local_file in self.to_fetch:
632 self._fetch_file(remote_file, local_file)
589
633
590 def start(self, hostname=None, user=None):
634 def start(self, hostname=None, user=None):
591 if hostname is not None:
635 if hostname is not None:
592 self.hostname = hostname
636 self.hostname = hostname
593 if user is not None:
637 if user is not None:
594 self.user = user
638 self.user = user
595
639
596 return super(SSHLauncher, self).start()
640 self.send_files()
641 super(SSHLauncher, self).start()
642 self.fetch_files()
597
643
598 def signal(self, sig):
644 def signal(self, sig):
599 if self.state == 'running':
645 if self.state == 'running':
600 # send escaped ssh connection-closer
646 # send escaped ssh connection-closer
601 self.process.stdin.write('~.')
647 self.process.stdin.write('~.')
602 self.process.stdin.flush()
648 self.process.stdin.flush()
603
649
650 class SSHClusterLauncher(SSHLauncher):
651
652 remote_profile_dir = Unicode('', config=True,
653 help="""The remote profile_dir to use.
654
655 If not specified, use calling profile, stripping out possible leading homedir.
656 """)
657
658 def _remote_profie_dir_default(self):
659 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
660 """
661 home = get_home_dir()
662 if not home.endswith('/'):
663 home = home+'/'
664
665 if self.profile_dir.startswith(home):
666 return self.profile_dir[len(home):]
667 else:
668 return self.profile_dir
669
670 def _cluster_id_changed(self, name, old, new):
671 if new:
672 raise ValueError("cluster id not supported by SSH launchers")
673
674 @property
675 def cluster_args(self):
676 return ['--profile-dir', self.remote_profile_dir]
604
677
605
678 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
606 class SSHControllerLauncher(SSHLauncher, ControllerMixin):
607
679
608 # alias back to *non-configurable* program[_args] for use in find_args()
680 # alias back to *non-configurable* program[_args] for use in find_args()
609 # this way all Controller/EngineSetLaunchers have the same form, rather
681 # this way all Controller/EngineSetLaunchers have the same form, rather
610 # than *some* having `program_args` and others `controller_args`
682 # than *some* having `program_args` and others `controller_args`
683
684 def _controller_cmd_default(self):
685 return ['ipcontroller']
686
611 @property
687 @property
612 def program(self):
688 def program(self):
613 return self.controller_cmd
689 return self.controller_cmd
614
690
615 @property
691 @property
616 def program_args(self):
692 def program_args(self):
617 return self.cluster_args + self.controller_args
693 return self.cluster_args + self.controller_args
618
694
695 def _to_fetch_default(self):
696 return [
697 (os.path.join(self.remote_profile_dir, 'security', cf),
698 os.path.join(self.profile_dir, 'security', cf),)
699 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
700 ]
619
701
620 class SSHEngineLauncher(SSHLauncher, EngineMixin):
702 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
621
703
622 # alias back to *non-configurable* program[_args] for use in find_args()
704 # alias back to *non-configurable* program[_args] for use in find_args()
623 # this way all Controller/EngineSetLaunchers have the same form, rather
705 # this way all Controller/EngineSetLaunchers have the same form, rather
624 # than *some* having `program_args` and others `controller_args`
706 # than *some* having `program_args` and others `controller_args`
707
708 def _engine_cmd_default(self):
709 return ['ipengine']
710
625 @property
711 @property
626 def program(self):
712 def program(self):
627 return self.engine_cmd
713 return self.engine_cmd
628
714
629 @property
715 @property
630 def program_args(self):
716 def program_args(self):
631 return self.cluster_args + self.engine_args
717 return self.cluster_args + self.engine_args
718
719 def _to_send_default(self):
720 return [
721 (os.path.join(self.profile_dir, 'security', cf),
722 os.path.join(self.remote_profile_dir, 'security', cf))
723 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
724 ]
632
725
633
726
634 class SSHEngineSetLauncher(LocalEngineSetLauncher):
727 class SSHEngineSetLauncher(LocalEngineSetLauncher):
635 launcher_class = SSHEngineLauncher
728 launcher_class = SSHEngineLauncher
636 engines = Dict(config=True,
729 engines = Dict(config=True,
637 help="""dict of engines to launch. This is a dict by hostname of ints,
730 help="""dict of engines to launch. This is a dict by hostname of ints,
638 corresponding to the number of engines to start on that host.""")
731 corresponding to the number of engines to start on that host.""")
639
732
640 @property
733 @property
641 def engine_count(self):
734 def engine_count(self):
642 """determine engine count from `engines` dict"""
735 """determine engine count from `engines` dict"""
643 count = 0
736 count = 0
644 for n in self.engines.itervalues():
737 for n in self.engines.itervalues():
645 if isinstance(n, (tuple,list)):
738 if isinstance(n, (tuple,list)):
646 n,args = n
739 n,args = n
647 count += n
740 count += n
648 return count
741 return count
649
742
650 def start(self, n):
743 def start(self, n):
651 """Start engines by profile or profile_dir.
744 """Start engines by profile or profile_dir.
652 `n` is ignored, and the `engines` config property is used instead.
745 `n` is ignored, and the `engines` config property is used instead.
653 """
746 """
654
747
655 dlist = []
748 dlist = []
656 for host, n in self.engines.iteritems():
749 for host, n in self.engines.iteritems():
657 if isinstance(n, (tuple, list)):
750 if isinstance(n, (tuple, list)):
658 n, args = n
751 n, args = n
659 else:
752 else:
660 args = copy.deepcopy(self.engine_args)
753 args = copy.deepcopy(self.engine_args)
661
754
662 if '@' in host:
755 if '@' in host:
663 user,host = host.split('@',1)
756 user,host = host.split('@',1)
664 else:
757 else:
665 user=None
758 user=None
666 for i in range(n):
759 for i in range(n):
667 if i > 0:
760 if i > 0:
668 time.sleep(self.delay)
761 time.sleep(self.delay)
669 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
762 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
670 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
763 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
671 )
764 )
765 if i > 0:
766 # only send files for the first engine on each host
767 el.to_send = []
672
768
673 # Copy the engine args over to each engine launcher.
769 # Copy the engine args over to each engine launcher.
674 el.engine_cmd = self.engine_cmd
770 el.engine_cmd = self.engine_cmd
675 el.engine_args = args
771 el.engine_args = args
676 el.on_stop(self._notice_engine_stopped)
772 el.on_stop(self._notice_engine_stopped)
677 d = el.start(user=user, hostname=host)
773 d = el.start(user=user, hostname=host)
678 self.launchers[ "%s/%i" % (host,i) ] = el
774 self.launchers[ "%s/%i" % (host,i) ] = el
679 dlist.append(d)
775 dlist.append(d)
680 self.notify_start(dlist)
776 self.notify_start(dlist)
681 return dlist
777 return dlist
682
778
683
779
780 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
781 """Launcher for calling
782 `ipcluster engines` on a remote machine.
783
784 Requires that remote profile is already configured.
785 """
786
787 n = Integer()
788 ipcluster_cmd = List(['ipcluster'], config=True)
789
790 @property
791 def program(self):
792 return self.ipcluster_cmd + ['engines']
793
794 @property
795 def program_args(self):
796 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
797
798 def _to_send_default(self):
799 return [
800 (os.path.join(self.profile_dir, 'security', cf),
801 os.path.join(self.remote_profile_dir, 'security', cf))
802 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
803 ]
804
805 def start(self, n):
806 self.n = n
807 super(SSHProxyEngineSetLauncher, self).start()
808
684
809
685 #-----------------------------------------------------------------------------
810 #-----------------------------------------------------------------------------
686 # Windows HPC Server 2008 scheduler launchers
811 # Windows HPC Server 2008 scheduler launchers
687 #-----------------------------------------------------------------------------
812 #-----------------------------------------------------------------------------
688
813
689
814
690 # This is only used on Windows.
815 # This is only used on Windows.
691 def find_job_cmd():
816 def find_job_cmd():
692 if WINDOWS:
817 if WINDOWS:
693 try:
818 try:
694 return find_cmd('job')
819 return find_cmd('job')
695 except (FindCmdError, ImportError):
820 except (FindCmdError, ImportError):
696 # ImportError will be raised if win32api is not installed
821 # ImportError will be raised if win32api is not installed
697 return 'job'
822 return 'job'
698 else:
823 else:
699 return 'job'
824 return 'job'
700
825
701
826
702 class WindowsHPCLauncher(BaseLauncher):
827 class WindowsHPCLauncher(BaseLauncher):
703
828
704 job_id_regexp = Unicode(r'\d+', config=True,
829 job_id_regexp = Unicode(r'\d+', config=True,
705 help="""A regular expression used to get the job id from the output of the
830 help="""A regular expression used to get the job id from the output of the
706 submit_command. """
831 submit_command. """
707 )
832 )
708 job_file_name = Unicode(u'ipython_job.xml', config=True,
833 job_file_name = Unicode(u'ipython_job.xml', config=True,
709 help="The filename of the instantiated job script.")
834 help="The filename of the instantiated job script.")
710 # The full path to the instantiated job script. This gets made dynamically
835 # The full path to the instantiated job script. This gets made dynamically
711 # by combining the work_dir with the job_file_name.
836 # by combining the work_dir with the job_file_name.
712 job_file = Unicode(u'')
837 job_file = Unicode(u'')
713 scheduler = Unicode('', config=True,
838 scheduler = Unicode('', config=True,
714 help="The hostname of the scheduler to submit the job to.")
839 help="The hostname of the scheduler to submit the job to.")
715 job_cmd = Unicode(find_job_cmd(), config=True,
840 job_cmd = Unicode(find_job_cmd(), config=True,
716 help="The command for submitting jobs.")
841 help="The command for submitting jobs.")
717
842
718 def __init__(self, work_dir=u'.', config=None, **kwargs):
843 def __init__(self, work_dir=u'.', config=None, **kwargs):
719 super(WindowsHPCLauncher, self).__init__(
844 super(WindowsHPCLauncher, self).__init__(
720 work_dir=work_dir, config=config, **kwargs
845 work_dir=work_dir, config=config, **kwargs
721 )
846 )
722
847
723 @property
848 @property
724 def job_file(self):
849 def job_file(self):
725 return os.path.join(self.work_dir, self.job_file_name)
850 return os.path.join(self.work_dir, self.job_file_name)
726
851
727 def write_job_file(self, n):
852 def write_job_file(self, n):
728 raise NotImplementedError("Implement write_job_file in a subclass.")
853 raise NotImplementedError("Implement write_job_file in a subclass.")
729
854
730 def find_args(self):
855 def find_args(self):
731 return [u'job.exe']
856 return [u'job.exe']
732
857
733 def parse_job_id(self, output):
858 def parse_job_id(self, output):
734 """Take the output of the submit command and return the job id."""
859 """Take the output of the submit command and return the job id."""
735 m = re.search(self.job_id_regexp, output)
860 m = re.search(self.job_id_regexp, output)
736 if m is not None:
861 if m is not None:
737 job_id = m.group()
862 job_id = m.group()
738 else:
863 else:
739 raise LauncherError("Job id couldn't be determined: %s" % output)
864 raise LauncherError("Job id couldn't be determined: %s" % output)
740 self.job_id = job_id
865 self.job_id = job_id
741 self.log.info('Job started with id: %r', job_id)
866 self.log.info('Job started with id: %r', job_id)
742 return job_id
867 return job_id
743
868
744 def start(self, n):
869 def start(self, n):
745 """Start n copies of the process using the Win HPC job scheduler."""
870 """Start n copies of the process using the Win HPC job scheduler."""
746 self.write_job_file(n)
871 self.write_job_file(n)
747 args = [
872 args = [
748 'submit',
873 'submit',
749 '/jobfile:%s' % self.job_file,
874 '/jobfile:%s' % self.job_file,
750 '/scheduler:%s' % self.scheduler
875 '/scheduler:%s' % self.scheduler
751 ]
876 ]
752 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
877 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
753
878
754 output = check_output([self.job_cmd]+args,
879 output = check_output([self.job_cmd]+args,
755 env=os.environ,
880 env=os.environ,
756 cwd=self.work_dir,
881 cwd=self.work_dir,
757 stderr=STDOUT
882 stderr=STDOUT
758 )
883 )
759 job_id = self.parse_job_id(output)
884 job_id = self.parse_job_id(output)
760 self.notify_start(job_id)
885 self.notify_start(job_id)
761 return job_id
886 return job_id
762
887
763 def stop(self):
888 def stop(self):
764 args = [
889 args = [
765 'cancel',
890 'cancel',
766 self.job_id,
891 self.job_id,
767 '/scheduler:%s' % self.scheduler
892 '/scheduler:%s' % self.scheduler
768 ]
893 ]
769 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
894 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
770 try:
895 try:
771 output = check_output([self.job_cmd]+args,
896 output = check_output([self.job_cmd]+args,
772 env=os.environ,
897 env=os.environ,
773 cwd=self.work_dir,
898 cwd=self.work_dir,
774 stderr=STDOUT
899 stderr=STDOUT
775 )
900 )
776 except:
901 except:
777 output = 'The job already appears to be stoppped: %r' % self.job_id
902 output = 'The job already appears to be stoppped: %r' % self.job_id
778 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
903 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
779 return output
904 return output
780
905
781
906
782 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
907 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
783
908
784 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
909 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
785 help="WinHPC xml job file.")
910 help="WinHPC xml job file.")
786 controller_args = List([], config=False,
911 controller_args = List([], config=False,
787 help="extra args to pass to ipcontroller")
912 help="extra args to pass to ipcontroller")
788
913
789 def write_job_file(self, n):
914 def write_job_file(self, n):
790 job = IPControllerJob(config=self.config)
915 job = IPControllerJob(config=self.config)
791
916
792 t = IPControllerTask(config=self.config)
917 t = IPControllerTask(config=self.config)
793 # The tasks work directory is *not* the actual work directory of
918 # The tasks work directory is *not* the actual work directory of
794 # the controller. It is used as the base path for the stdout/stderr
919 # the controller. It is used as the base path for the stdout/stderr
795 # files that the scheduler redirects to.
920 # files that the scheduler redirects to.
796 t.work_directory = self.profile_dir
921 t.work_directory = self.profile_dir
797 # Add the profile_dir and from self.start().
922 # Add the profile_dir and from self.start().
798 t.controller_args.extend(self.cluster_args)
923 t.controller_args.extend(self.cluster_args)
799 t.controller_args.extend(self.controller_args)
924 t.controller_args.extend(self.controller_args)
800 job.add_task(t)
925 job.add_task(t)
801
926
802 self.log.debug("Writing job description file: %s", self.job_file)
927 self.log.debug("Writing job description file: %s", self.job_file)
803 job.write(self.job_file)
928 job.write(self.job_file)
804
929
805 @property
930 @property
806 def job_file(self):
931 def job_file(self):
807 return os.path.join(self.profile_dir, self.job_file_name)
932 return os.path.join(self.profile_dir, self.job_file_name)
808
933
809 def start(self):
934 def start(self):
810 """Start the controller by profile_dir."""
935 """Start the controller by profile_dir."""
811 return super(WindowsHPCControllerLauncher, self).start(1)
936 return super(WindowsHPCControllerLauncher, self).start(1)
812
937
813
938
814 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
939 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
815
940
816 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
941 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
817 help="jobfile for ipengines job")
942 help="jobfile for ipengines job")
818 engine_args = List([], config=False,
943 engine_args = List([], config=False,
819 help="extra args to pas to ipengine")
944 help="extra args to pas to ipengine")
820
945
821 def write_job_file(self, n):
946 def write_job_file(self, n):
822 job = IPEngineSetJob(config=self.config)
947 job = IPEngineSetJob(config=self.config)
823
948
824 for i in range(n):
949 for i in range(n):
825 t = IPEngineTask(config=self.config)
950 t = IPEngineTask(config=self.config)
826 # The tasks work directory is *not* the actual work directory of
951 # The tasks work directory is *not* the actual work directory of
827 # the engine. It is used as the base path for the stdout/stderr
952 # the engine. It is used as the base path for the stdout/stderr
828 # files that the scheduler redirects to.
953 # files that the scheduler redirects to.
829 t.work_directory = self.profile_dir
954 t.work_directory = self.profile_dir
830 # Add the profile_dir and from self.start().
955 # Add the profile_dir and from self.start().
831 t.engine_args.extend(self.cluster_args)
956 t.engine_args.extend(self.cluster_args)
832 t.engine_args.extend(self.engine_args)
957 t.engine_args.extend(self.engine_args)
833 job.add_task(t)
958 job.add_task(t)
834
959
835 self.log.debug("Writing job description file: %s", self.job_file)
960 self.log.debug("Writing job description file: %s", self.job_file)
836 job.write(self.job_file)
961 job.write(self.job_file)
837
962
838 @property
963 @property
839 def job_file(self):
964 def job_file(self):
840 return os.path.join(self.profile_dir, self.job_file_name)
965 return os.path.join(self.profile_dir, self.job_file_name)
841
966
842 def start(self, n):
967 def start(self, n):
843 """Start the controller by profile_dir."""
968 """Start the controller by profile_dir."""
844 return super(WindowsHPCEngineSetLauncher, self).start(n)
969 return super(WindowsHPCEngineSetLauncher, self).start(n)
845
970
846
971
847 #-----------------------------------------------------------------------------
972 #-----------------------------------------------------------------------------
848 # Batch (PBS) system launchers
973 # Batch (PBS) system launchers
849 #-----------------------------------------------------------------------------
974 #-----------------------------------------------------------------------------
850
975
851 class BatchClusterAppMixin(ClusterAppMixin):
976 class BatchClusterAppMixin(ClusterAppMixin):
852 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
977 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
853 def _profile_dir_changed(self, name, old, new):
978 def _profile_dir_changed(self, name, old, new):
854 self.context[name] = new
979 self.context[name] = new
855 _cluster_id_changed = _profile_dir_changed
980 _cluster_id_changed = _profile_dir_changed
856
981
857 def _profile_dir_default(self):
982 def _profile_dir_default(self):
858 self.context['profile_dir'] = ''
983 self.context['profile_dir'] = ''
859 return ''
984 return ''
860 def _cluster_id_default(self):
985 def _cluster_id_default(self):
861 self.context['cluster_id'] = ''
986 self.context['cluster_id'] = ''
862 return ''
987 return ''
863
988
864
989
865 class BatchSystemLauncher(BaseLauncher):
990 class BatchSystemLauncher(BaseLauncher):
866 """Launch an external process using a batch system.
991 """Launch an external process using a batch system.
867
992
868 This class is designed to work with UNIX batch systems like PBS, LSF,
993 This class is designed to work with UNIX batch systems like PBS, LSF,
869 GridEngine, etc. The overall model is that there are different commands
994 GridEngine, etc. The overall model is that there are different commands
870 like qsub, qdel, etc. that handle the starting and stopping of the process.
995 like qsub, qdel, etc. that handle the starting and stopping of the process.
871
996
872 This class also has the notion of a batch script. The ``batch_template``
997 This class also has the notion of a batch script. The ``batch_template``
873 attribute can be set to a string that is a template for the batch script.
998 attribute can be set to a string that is a template for the batch script.
874 This template is instantiated using string formatting. Thus the template can
999 This template is instantiated using string formatting. Thus the template can
875 use {n} fot the number of instances. Subclasses can add additional variables
1000 use {n} fot the number of instances. Subclasses can add additional variables
876 to the template dict.
1001 to the template dict.
877 """
1002 """
878
1003
879 # Subclasses must fill these in. See PBSEngineSet
1004 # Subclasses must fill these in. See PBSEngineSet
880 submit_command = List([''], config=True,
1005 submit_command = List([''], config=True,
881 help="The name of the command line program used to submit jobs.")
1006 help="The name of the command line program used to submit jobs.")
882 delete_command = List([''], config=True,
1007 delete_command = List([''], config=True,
883 help="The name of the command line program used to delete jobs.")
1008 help="The name of the command line program used to delete jobs.")
884 job_id_regexp = Unicode('', config=True,
1009 job_id_regexp = Unicode('', config=True,
885 help="""A regular expression used to get the job id from the output of the
1010 help="""A regular expression used to get the job id from the output of the
886 submit_command.""")
1011 submit_command.""")
887 batch_template = Unicode('', config=True,
1012 batch_template = Unicode('', config=True,
888 help="The string that is the batch script template itself.")
1013 help="The string that is the batch script template itself.")
889 batch_template_file = Unicode(u'', config=True,
1014 batch_template_file = Unicode(u'', config=True,
890 help="The file that contains the batch template.")
1015 help="The file that contains the batch template.")
891 batch_file_name = Unicode(u'batch_script', config=True,
1016 batch_file_name = Unicode(u'batch_script', config=True,
892 help="The filename of the instantiated batch script.")
1017 help="The filename of the instantiated batch script.")
893 queue = Unicode(u'', config=True,
1018 queue = Unicode(u'', config=True,
894 help="The PBS Queue.")
1019 help="The PBS Queue.")
895
1020
896 def _queue_changed(self, name, old, new):
1021 def _queue_changed(self, name, old, new):
897 self.context[name] = new
1022 self.context[name] = new
898
1023
899 n = Integer(1)
1024 n = Integer(1)
900 _n_changed = _queue_changed
1025 _n_changed = _queue_changed
901
1026
902 # not configurable, override in subclasses
1027 # not configurable, override in subclasses
903 # PBS Job Array regex
1028 # PBS Job Array regex
904 job_array_regexp = Unicode('')
1029 job_array_regexp = Unicode('')
905 job_array_template = Unicode('')
1030 job_array_template = Unicode('')
906 # PBS Queue regex
1031 # PBS Queue regex
907 queue_regexp = Unicode('')
1032 queue_regexp = Unicode('')
908 queue_template = Unicode('')
1033 queue_template = Unicode('')
909 # The default batch template, override in subclasses
1034 # The default batch template, override in subclasses
910 default_template = Unicode('')
1035 default_template = Unicode('')
911 # The full path to the instantiated batch script.
1036 # The full path to the instantiated batch script.
912 batch_file = Unicode(u'')
1037 batch_file = Unicode(u'')
913 # the format dict used with batch_template:
1038 # the format dict used with batch_template:
914 context = Dict()
1039 context = Dict()
915 def _context_default(self):
1040 def _context_default(self):
916 """load the default context with the default values for the basic keys
1041 """load the default context with the default values for the basic keys
917
1042
918 because the _trait_changed methods only load the context if they
1043 because the _trait_changed methods only load the context if they
919 are set to something other than the default value.
1044 are set to something other than the default value.
920 """
1045 """
921 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1046 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
922
1047
923 # the Formatter instance for rendering the templates:
1048 # the Formatter instance for rendering the templates:
924 formatter = Instance(EvalFormatter, (), {})
1049 formatter = Instance(EvalFormatter, (), {})
925
1050
926
1051
927 def find_args(self):
1052 def find_args(self):
928 return self.submit_command + [self.batch_file]
1053 return self.submit_command + [self.batch_file]
929
1054
930 def __init__(self, work_dir=u'.', config=None, **kwargs):
1055 def __init__(self, work_dir=u'.', config=None, **kwargs):
931 super(BatchSystemLauncher, self).__init__(
1056 super(BatchSystemLauncher, self).__init__(
932 work_dir=work_dir, config=config, **kwargs
1057 work_dir=work_dir, config=config, **kwargs
933 )
1058 )
934 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1059 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
935
1060
936 def parse_job_id(self, output):
1061 def parse_job_id(self, output):
937 """Take the output of the submit command and return the job id."""
1062 """Take the output of the submit command and return the job id."""
938 m = re.search(self.job_id_regexp, output)
1063 m = re.search(self.job_id_regexp, output)
939 if m is not None:
1064 if m is not None:
940 job_id = m.group()
1065 job_id = m.group()
941 else:
1066 else:
942 raise LauncherError("Job id couldn't be determined: %s" % output)
1067 raise LauncherError("Job id couldn't be determined: %s" % output)
943 self.job_id = job_id
1068 self.job_id = job_id
944 self.log.info('Job submitted with job id: %r', job_id)
1069 self.log.info('Job submitted with job id: %r', job_id)
945 return job_id
1070 return job_id
946
1071
947 def write_batch_script(self, n):
1072 def write_batch_script(self, n):
948 """Instantiate and write the batch script to the work_dir."""
1073 """Instantiate and write the batch script to the work_dir."""
949 self.n = n
1074 self.n = n
950 # first priority is batch_template if set
1075 # first priority is batch_template if set
951 if self.batch_template_file and not self.batch_template:
1076 if self.batch_template_file and not self.batch_template:
952 # second priority is batch_template_file
1077 # second priority is batch_template_file
953 with open(self.batch_template_file) as f:
1078 with open(self.batch_template_file) as f:
954 self.batch_template = f.read()
1079 self.batch_template = f.read()
955 if not self.batch_template:
1080 if not self.batch_template:
956 # third (last) priority is default_template
1081 # third (last) priority is default_template
957 self.batch_template = self.default_template
1082 self.batch_template = self.default_template
958
1083
959 # add jobarray or queue lines to user-specified template
1084 # add jobarray or queue lines to user-specified template
960 # note that this is *only* when user did not specify a template.
1085 # note that this is *only* when user did not specify a template.
961 regex = re.compile(self.job_array_regexp)
1086 regex = re.compile(self.job_array_regexp)
962 # print regex.search(self.batch_template)
1087 # print regex.search(self.batch_template)
963 if not regex.search(self.batch_template):
1088 if not regex.search(self.batch_template):
964 self.log.debug("adding job array settings to batch script")
1089 self.log.debug("adding job array settings to batch script")
965 firstline, rest = self.batch_template.split('\n',1)
1090 firstline, rest = self.batch_template.split('\n',1)
966 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1091 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
967
1092
968 regex = re.compile(self.queue_regexp)
1093 regex = re.compile(self.queue_regexp)
969 # print regex.search(self.batch_template)
1094 # print regex.search(self.batch_template)
970 if self.queue and not regex.search(self.batch_template):
1095 if self.queue and not regex.search(self.batch_template):
971 self.log.debug("adding PBS queue settings to batch script")
1096 self.log.debug("adding PBS queue settings to batch script")
972 firstline, rest = self.batch_template.split('\n',1)
1097 firstline, rest = self.batch_template.split('\n',1)
973 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1098 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
974
1099
975 script_as_string = self.formatter.format(self.batch_template, **self.context)
1100 script_as_string = self.formatter.format(self.batch_template, **self.context)
976 self.log.debug('Writing batch script: %s', self.batch_file)
1101 self.log.debug('Writing batch script: %s', self.batch_file)
977
1102
978 with open(self.batch_file, 'w') as f:
1103 with open(self.batch_file, 'w') as f:
979 f.write(script_as_string)
1104 f.write(script_as_string)
980 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1105 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
981
1106
982 def start(self, n):
1107 def start(self, n):
983 """Start n copies of the process using a batch system."""
1108 """Start n copies of the process using a batch system."""
984 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1109 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
985 # Here we save profile_dir in the context so they
1110 # Here we save profile_dir in the context so they
986 # can be used in the batch script template as {profile_dir}
1111 # can be used in the batch script template as {profile_dir}
987 self.write_batch_script(n)
1112 self.write_batch_script(n)
988 output = check_output(self.args, env=os.environ)
1113 output = check_output(self.args, env=os.environ)
989
1114
990 job_id = self.parse_job_id(output)
1115 job_id = self.parse_job_id(output)
991 self.notify_start(job_id)
1116 self.notify_start(job_id)
992 return job_id
1117 return job_id
993
1118
994 def stop(self):
1119 def stop(self):
995 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1120 output = check_output(self.delete_command+[self.job_id], env=os.environ)
996 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1121 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
997 return output
1122 return output
998
1123
999
1124
1000 class PBSLauncher(BatchSystemLauncher):
1125 class PBSLauncher(BatchSystemLauncher):
1001 """A BatchSystemLauncher subclass for PBS."""
1126 """A BatchSystemLauncher subclass for PBS."""
1002
1127
1003 submit_command = List(['qsub'], config=True,
1128 submit_command = List(['qsub'], config=True,
1004 help="The PBS submit command ['qsub']")
1129 help="The PBS submit command ['qsub']")
1005 delete_command = List(['qdel'], config=True,
1130 delete_command = List(['qdel'], config=True,
1006 help="The PBS delete command ['qsub']")
1131 help="The PBS delete command ['qsub']")
1007 job_id_regexp = Unicode(r'\d+', config=True,
1132 job_id_regexp = Unicode(r'\d+', config=True,
1008 help="Regular expresion for identifying the job ID [r'\d+']")
1133 help="Regular expresion for identifying the job ID [r'\d+']")
1009
1134
1010 batch_file = Unicode(u'')
1135 batch_file = Unicode(u'')
1011 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
1136 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
1012 job_array_template = Unicode('#PBS -t 1-{n}')
1137 job_array_template = Unicode('#PBS -t 1-{n}')
1013 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
1138 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
1014 queue_template = Unicode('#PBS -q {queue}')
1139 queue_template = Unicode('#PBS -q {queue}')
1015
1140
1016
1141
1017 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1142 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1018 """Launch a controller using PBS."""
1143 """Launch a controller using PBS."""
1019
1144
1020 batch_file_name = Unicode(u'pbs_controller', config=True,
1145 batch_file_name = Unicode(u'pbs_controller', config=True,
1021 help="batch file name for the controller job.")
1146 help="batch file name for the controller job.")
1022 default_template= Unicode("""#!/bin/sh
1147 default_template= Unicode("""#!/bin/sh
1023 #PBS -V
1148 #PBS -V
1024 #PBS -N ipcontroller
1149 #PBS -N ipcontroller
1025 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1150 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1026 """%(' '.join(ipcontroller_cmd_argv)))
1151 """%(' '.join(ipcontroller_cmd_argv)))
1027
1152
1028
1153
1029 def start(self):
1154 def start(self):
1030 """Start the controller by profile or profile_dir."""
1155 """Start the controller by profile or profile_dir."""
1031 return super(PBSControllerLauncher, self).start(1)
1156 return super(PBSControllerLauncher, self).start(1)
1032
1157
1033
1158
1034 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1159 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1035 """Launch Engines using PBS"""
1160 """Launch Engines using PBS"""
1036 batch_file_name = Unicode(u'pbs_engines', config=True,
1161 batch_file_name = Unicode(u'pbs_engines', config=True,
1037 help="batch file name for the engine(s) job.")
1162 help="batch file name for the engine(s) job.")
1038 default_template= Unicode(u"""#!/bin/sh
1163 default_template= Unicode(u"""#!/bin/sh
1039 #PBS -V
1164 #PBS -V
1040 #PBS -N ipengine
1165 #PBS -N ipengine
1041 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1166 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1042 """%(' '.join(ipengine_cmd_argv)))
1167 """%(' '.join(ipengine_cmd_argv)))
1043
1168
1044 def start(self, n):
1169 def start(self, n):
1045 """Start n engines by profile or profile_dir."""
1170 """Start n engines by profile or profile_dir."""
1046 return super(PBSEngineSetLauncher, self).start(n)
1171 return super(PBSEngineSetLauncher, self).start(n)
1047
1172
1048 #SGE is very similar to PBS
1173 #SGE is very similar to PBS
1049
1174
1050 class SGELauncher(PBSLauncher):
1175 class SGELauncher(PBSLauncher):
1051 """Sun GridEngine is a PBS clone with slightly different syntax"""
1176 """Sun GridEngine is a PBS clone with slightly different syntax"""
1052 job_array_regexp = Unicode('#\$\W+\-t')
1177 job_array_regexp = Unicode('#\$\W+\-t')
1053 job_array_template = Unicode('#$ -t 1-{n}')
1178 job_array_template = Unicode('#$ -t 1-{n}')
1054 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1179 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1055 queue_template = Unicode('#$ -q {queue}')
1180 queue_template = Unicode('#$ -q {queue}')
1056
1181
1057 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1182 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1058 """Launch a controller using SGE."""
1183 """Launch a controller using SGE."""
1059
1184
1060 batch_file_name = Unicode(u'sge_controller', config=True,
1185 batch_file_name = Unicode(u'sge_controller', config=True,
1061 help="batch file name for the ipontroller job.")
1186 help="batch file name for the ipontroller job.")
1062 default_template= Unicode(u"""#$ -V
1187 default_template= Unicode(u"""#$ -V
1063 #$ -S /bin/sh
1188 #$ -S /bin/sh
1064 #$ -N ipcontroller
1189 #$ -N ipcontroller
1065 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1190 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1066 """%(' '.join(ipcontroller_cmd_argv)))
1191 """%(' '.join(ipcontroller_cmd_argv)))
1067
1192
1068 def start(self):
1193 def start(self):
1069 """Start the controller by profile or profile_dir."""
1194 """Start the controller by profile or profile_dir."""
1070 return super(SGEControllerLauncher, self).start(1)
1195 return super(SGEControllerLauncher, self).start(1)
1071
1196
1072 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1197 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1073 """Launch Engines with SGE"""
1198 """Launch Engines with SGE"""
1074 batch_file_name = Unicode(u'sge_engines', config=True,
1199 batch_file_name = Unicode(u'sge_engines', config=True,
1075 help="batch file name for the engine(s) job.")
1200 help="batch file name for the engine(s) job.")
1076 default_template = Unicode("""#$ -V
1201 default_template = Unicode("""#$ -V
1077 #$ -S /bin/sh
1202 #$ -S /bin/sh
1078 #$ -N ipengine
1203 #$ -N ipengine
1079 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1204 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1080 """%(' '.join(ipengine_cmd_argv)))
1205 """%(' '.join(ipengine_cmd_argv)))
1081
1206
1082 def start(self, n):
1207 def start(self, n):
1083 """Start n engines by profile or profile_dir."""
1208 """Start n engines by profile or profile_dir."""
1084 return super(SGEEngineSetLauncher, self).start(n)
1209 return super(SGEEngineSetLauncher, self).start(n)
1085
1210
1086
1211
1087 # LSF launchers
1212 # LSF launchers
1088
1213
1089 class LSFLauncher(BatchSystemLauncher):
1214 class LSFLauncher(BatchSystemLauncher):
1090 """A BatchSystemLauncher subclass for LSF."""
1215 """A BatchSystemLauncher subclass for LSF."""
1091
1216
1092 submit_command = List(['bsub'], config=True,
1217 submit_command = List(['bsub'], config=True,
1093 help="The PBS submit command ['bsub']")
1218 help="The PBS submit command ['bsub']")
1094 delete_command = List(['bkill'], config=True,
1219 delete_command = List(['bkill'], config=True,
1095 help="The PBS delete command ['bkill']")
1220 help="The PBS delete command ['bkill']")
1096 job_id_regexp = Unicode(r'\d+', config=True,
1221 job_id_regexp = Unicode(r'\d+', config=True,
1097 help="Regular expresion for identifying the job ID [r'\d+']")
1222 help="Regular expresion for identifying the job ID [r'\d+']")
1098
1223
1099 batch_file = Unicode(u'')
1224 batch_file = Unicode(u'')
1100 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1225 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1101 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1226 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1102 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1227 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1103 queue_template = Unicode('#BSUB -q {queue}')
1228 queue_template = Unicode('#BSUB -q {queue}')
1104
1229
1105 def start(self, n):
1230 def start(self, n):
1106 """Start n copies of the process using LSF batch system.
1231 """Start n copies of the process using LSF batch system.
1107 This cant inherit from the base class because bsub expects
1232 This cant inherit from the base class because bsub expects
1108 to be piped a shell script in order to honor the #BSUB directives :
1233 to be piped a shell script in order to honor the #BSUB directives :
1109 bsub < script
1234 bsub < script
1110 """
1235 """
1111 # Here we save profile_dir in the context so they
1236 # Here we save profile_dir in the context so they
1112 # can be used in the batch script template as {profile_dir}
1237 # can be used in the batch script template as {profile_dir}
1113 self.write_batch_script(n)
1238 self.write_batch_script(n)
1114 #output = check_output(self.args, env=os.environ)
1239 #output = check_output(self.args, env=os.environ)
1115 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1240 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1116 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1241 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1117 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1242 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1118 output,err = p.communicate()
1243 output,err = p.communicate()
1119 job_id = self.parse_job_id(output)
1244 job_id = self.parse_job_id(output)
1120 self.notify_start(job_id)
1245 self.notify_start(job_id)
1121 return job_id
1246 return job_id
1122
1247
1123
1248
1124 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1249 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1125 """Launch a controller using LSF."""
1250 """Launch a controller using LSF."""
1126
1251
1127 batch_file_name = Unicode(u'lsf_controller', config=True,
1252 batch_file_name = Unicode(u'lsf_controller', config=True,
1128 help="batch file name for the controller job.")
1253 help="batch file name for the controller job.")
1129 default_template= Unicode("""#!/bin/sh
1254 default_template= Unicode("""#!/bin/sh
1130 #BSUB -J ipcontroller
1255 #BSUB -J ipcontroller
1131 #BSUB -oo ipcontroller.o.%%J
1256 #BSUB -oo ipcontroller.o.%%J
1132 #BSUB -eo ipcontroller.e.%%J
1257 #BSUB -eo ipcontroller.e.%%J
1133 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1258 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1134 """%(' '.join(ipcontroller_cmd_argv)))
1259 """%(' '.join(ipcontroller_cmd_argv)))
1135
1260
1136 def start(self):
1261 def start(self):
1137 """Start the controller by profile or profile_dir."""
1262 """Start the controller by profile or profile_dir."""
1138 return super(LSFControllerLauncher, self).start(1)
1263 return super(LSFControllerLauncher, self).start(1)
1139
1264
1140
1265
1141 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1266 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1142 """Launch Engines using LSF"""
1267 """Launch Engines using LSF"""
1143 batch_file_name = Unicode(u'lsf_engines', config=True,
1268 batch_file_name = Unicode(u'lsf_engines', config=True,
1144 help="batch file name for the engine(s) job.")
1269 help="batch file name for the engine(s) job.")
1145 default_template= Unicode(u"""#!/bin/sh
1270 default_template= Unicode(u"""#!/bin/sh
1146 #BSUB -oo ipengine.o.%%J
1271 #BSUB -oo ipengine.o.%%J
1147 #BSUB -eo ipengine.e.%%J
1272 #BSUB -eo ipengine.e.%%J
1148 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1273 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1149 """%(' '.join(ipengine_cmd_argv)))
1274 """%(' '.join(ipengine_cmd_argv)))
1150
1275
1151 def start(self, n):
1276 def start(self, n):
1152 """Start n engines by profile or profile_dir."""
1277 """Start n engines by profile or profile_dir."""
1153 return super(LSFEngineSetLauncher, self).start(n)
1278 return super(LSFEngineSetLauncher, self).start(n)
1154
1279
1155
1280
1156 #-----------------------------------------------------------------------------
1281 #-----------------------------------------------------------------------------
1157 # A launcher for ipcluster itself!
1282 # A launcher for ipcluster itself!
1158 #-----------------------------------------------------------------------------
1283 #-----------------------------------------------------------------------------
1159
1284
1160
1285
1161 class IPClusterLauncher(LocalProcessLauncher):
1286 class IPClusterLauncher(LocalProcessLauncher):
1162 """Launch the ipcluster program in an external process."""
1287 """Launch the ipcluster program in an external process."""
1163
1288
1164 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1289 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1165 help="Popen command for ipcluster")
1290 help="Popen command for ipcluster")
1166 ipcluster_args = List(
1291 ipcluster_args = List(
1167 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1292 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1168 help="Command line arguments to pass to ipcluster.")
1293 help="Command line arguments to pass to ipcluster.")
1169 ipcluster_subcommand = Unicode('start')
1294 ipcluster_subcommand = Unicode('start')
1170 profile = Unicode('default')
1295 profile = Unicode('default')
1171 n = Integer(2)
1296 n = Integer(2)
1172
1297
1173 def find_args(self):
1298 def find_args(self):
1174 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1299 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1175 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1300 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1176 self.ipcluster_args
1301 self.ipcluster_args
1177
1302
1178 def start(self):
1303 def start(self):
1179 return super(IPClusterLauncher, self).start()
1304 return super(IPClusterLauncher, self).start()
1180
1305
1181 #-----------------------------------------------------------------------------
1306 #-----------------------------------------------------------------------------
1182 # Collections of launchers
1307 # Collections of launchers
1183 #-----------------------------------------------------------------------------
1308 #-----------------------------------------------------------------------------
1184
1309
1185 local_launchers = [
1310 local_launchers = [
1186 LocalControllerLauncher,
1311 LocalControllerLauncher,
1187 LocalEngineLauncher,
1312 LocalEngineLauncher,
1188 LocalEngineSetLauncher,
1313 LocalEngineSetLauncher,
1189 ]
1314 ]
1190 mpi_launchers = [
1315 mpi_launchers = [
1191 MPILauncher,
1316 MPILauncher,
1192 MPIControllerLauncher,
1317 MPIControllerLauncher,
1193 MPIEngineSetLauncher,
1318 MPIEngineSetLauncher,
1194 ]
1319 ]
1195 ssh_launchers = [
1320 ssh_launchers = [
1196 SSHLauncher,
1321 SSHLauncher,
1197 SSHControllerLauncher,
1322 SSHControllerLauncher,
1198 SSHEngineLauncher,
1323 SSHEngineLauncher,
1199 SSHEngineSetLauncher,
1324 SSHEngineSetLauncher,
1200 ]
1325 ]
1201 winhpc_launchers = [
1326 winhpc_launchers = [
1202 WindowsHPCLauncher,
1327 WindowsHPCLauncher,
1203 WindowsHPCControllerLauncher,
1328 WindowsHPCControllerLauncher,
1204 WindowsHPCEngineSetLauncher,
1329 WindowsHPCEngineSetLauncher,
1205 ]
1330 ]
1206 pbs_launchers = [
1331 pbs_launchers = [
1207 PBSLauncher,
1332 PBSLauncher,
1208 PBSControllerLauncher,
1333 PBSControllerLauncher,
1209 PBSEngineSetLauncher,
1334 PBSEngineSetLauncher,
1210 ]
1335 ]
1211 sge_launchers = [
1336 sge_launchers = [
1212 SGELauncher,
1337 SGELauncher,
1213 SGEControllerLauncher,
1338 SGEControllerLauncher,
1214 SGEEngineSetLauncher,
1339 SGEEngineSetLauncher,
1215 ]
1340 ]
1216 lsf_launchers = [
1341 lsf_launchers = [
1217 LSFLauncher,
1342 LSFLauncher,
1218 LSFControllerLauncher,
1343 LSFControllerLauncher,
1219 LSFEngineSetLauncher,
1344 LSFEngineSetLauncher,
1220 ]
1345 ]
1221 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1346 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1222 + pbs_launchers + sge_launchers + lsf_launchers
1347 + pbs_launchers + sge_launchers + lsf_launchers
1223
1348
General Comments 0
You need to be logged in to leave comments. Login now