##// END OF EJS Templates
Use CRegExp trait for regular expressions.
Bradley M. Froehle -
Show More
@@ -1,1348 +1,1345 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
26 import stat
25 import stat
27 import time
26 import time
28
27
29 # signal imports, handling various platforms, versions
28 # signal imports, handling various platforms, versions
30
29
31 from signal import SIGINT, SIGTERM
30 from signal import SIGINT, SIGTERM
32 try:
31 try:
33 from signal import SIGKILL
32 from signal import SIGKILL
34 except ImportError:
33 except ImportError:
35 # Windows
34 # Windows
36 SIGKILL=SIGTERM
35 SIGKILL=SIGTERM
37
36
38 try:
37 try:
39 # Windows >= 2.7, 3.2
38 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
39 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
40 except ImportError:
42 pass
41 pass
43
42
44 from subprocess import Popen, PIPE, STDOUT
43 from subprocess import Popen, PIPE, STDOUT
45 try:
44 try:
46 from subprocess import check_output
45 from subprocess import check_output
47 except ImportError:
46 except ImportError:
48 # pre-2.7, define check_output with Popen
47 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
48 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
49 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
50 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
51 out,err = p.communicate()
53 return out
52 return out
54
53
55 from zmq.eventloop import ioloop
54 from zmq.eventloop import ioloop
56
55
57 from IPython.config.application import Application
56 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
57 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
58 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import (
59 from IPython.utils.traitlets import (
61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
60 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 )
61 )
63 from IPython.utils.path import get_ipython_module_path, get_home_dir
62 from IPython.utils.path import get_ipython_module_path, get_home_dir
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65
64
66 from .win32support import forward_read_events
65 from .win32support import forward_read_events
67
66
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69
68
70 WINDOWS = os.name == 'nt'
69 WINDOWS = os.name == 'nt'
71
70
72 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
73 # Paths to the kernel apps
72 # Paths to the kernel apps
74 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
75
74
76
75
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipclusterapp'
77 'IPython.parallel.apps.ipclusterapp'
79 ))
78 ))
80
79
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 'IPython.parallel.apps.ipengineapp'
81 'IPython.parallel.apps.ipengineapp'
83 ))
82 ))
84
83
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 'IPython.parallel.apps.ipcontrollerapp'
85 'IPython.parallel.apps.ipcontrollerapp'
87 ))
86 ))
88
87
89 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
90 # Base launchers and errors
89 # Base launchers and errors
91 #-----------------------------------------------------------------------------
90 #-----------------------------------------------------------------------------
92
91
93
92
94 class LauncherError(Exception):
93 class LauncherError(Exception):
95 pass
94 pass
96
95
97
96
98 class ProcessStateError(LauncherError):
97 class ProcessStateError(LauncherError):
99 pass
98 pass
100
99
101
100
102 class UnknownStatus(LauncherError):
101 class UnknownStatus(LauncherError):
103 pass
102 pass
104
103
105
104
106 class BaseLauncher(LoggingConfigurable):
105 class BaseLauncher(LoggingConfigurable):
107 """An asbtraction for starting, stopping and signaling a process."""
106 """An asbtraction for starting, stopping and signaling a process."""
108
107
109 # In all of the launchers, the work_dir is where child processes will be
108 # In all of the launchers, the work_dir is where child processes will be
110 # run. This will usually be the profile_dir, but may not be. any work_dir
109 # run. This will usually be the profile_dir, but may not be. any work_dir
111 # passed into the __init__ method will override the config value.
110 # passed into the __init__ method will override the config value.
112 # This should not be used to set the work_dir for the actual engine
111 # This should not be used to set the work_dir for the actual engine
113 # and controller. Instead, use their own config files or the
112 # and controller. Instead, use their own config files or the
114 # controller_args, engine_args attributes of the launchers to add
113 # controller_args, engine_args attributes of the launchers to add
115 # the work_dir option.
114 # the work_dir option.
116 work_dir = Unicode(u'.')
115 work_dir = Unicode(u'.')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118
117
119 start_data = Any()
118 start_data = Any()
120 stop_data = Any()
119 stop_data = Any()
121
120
122 def _loop_default(self):
121 def _loop_default(self):
123 return ioloop.IOLoop.instance()
122 return ioloop.IOLoop.instance()
124
123
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 self.state = 'before' # can be before, running, after
126 self.state = 'before' # can be before, running, after
128 self.stop_callbacks = []
127 self.stop_callbacks = []
129 self.start_data = None
128 self.start_data = None
130 self.stop_data = None
129 self.stop_data = None
131
130
132 @property
131 @property
133 def args(self):
132 def args(self):
134 """A list of cmd and args that will be used to start the process.
133 """A list of cmd and args that will be used to start the process.
135
134
136 This is what is passed to :func:`spawnProcess` and the first element
135 This is what is passed to :func:`spawnProcess` and the first element
137 will be the process name.
136 will be the process name.
138 """
137 """
139 return self.find_args()
138 return self.find_args()
140
139
141 def find_args(self):
140 def find_args(self):
142 """The ``.args`` property calls this to find the args list.
141 """The ``.args`` property calls this to find the args list.
143
142
144 Subcommand should implement this to construct the cmd and args.
143 Subcommand should implement this to construct the cmd and args.
145 """
144 """
146 raise NotImplementedError('find_args must be implemented in a subclass')
145 raise NotImplementedError('find_args must be implemented in a subclass')
147
146
148 @property
147 @property
149 def arg_str(self):
148 def arg_str(self):
150 """The string form of the program arguments."""
149 """The string form of the program arguments."""
151 return ' '.join(self.args)
150 return ' '.join(self.args)
152
151
153 @property
152 @property
154 def running(self):
153 def running(self):
155 """Am I running."""
154 """Am I running."""
156 if self.state == 'running':
155 if self.state == 'running':
157 return True
156 return True
158 else:
157 else:
159 return False
158 return False
160
159
161 def start(self):
160 def start(self):
162 """Start the process."""
161 """Start the process."""
163 raise NotImplementedError('start must be implemented in a subclass')
162 raise NotImplementedError('start must be implemented in a subclass')
164
163
165 def stop(self):
164 def stop(self):
166 """Stop the process and notify observers of stopping.
165 """Stop the process and notify observers of stopping.
167
166
168 This method will return None immediately.
167 This method will return None immediately.
169 To observe the actual process stopping, see :meth:`on_stop`.
168 To observe the actual process stopping, see :meth:`on_stop`.
170 """
169 """
171 raise NotImplementedError('stop must be implemented in a subclass')
170 raise NotImplementedError('stop must be implemented in a subclass')
172
171
173 def on_stop(self, f):
172 def on_stop(self, f):
174 """Register a callback to be called with this Launcher's stop_data
173 """Register a callback to be called with this Launcher's stop_data
175 when the process actually finishes.
174 when the process actually finishes.
176 """
175 """
177 if self.state=='after':
176 if self.state=='after':
178 return f(self.stop_data)
177 return f(self.stop_data)
179 else:
178 else:
180 self.stop_callbacks.append(f)
179 self.stop_callbacks.append(f)
181
180
182 def notify_start(self, data):
181 def notify_start(self, data):
183 """Call this to trigger startup actions.
182 """Call this to trigger startup actions.
184
183
185 This logs the process startup and sets the state to 'running'. It is
184 This logs the process startup and sets the state to 'running'. It is
186 a pass-through so it can be used as a callback.
185 a pass-through so it can be used as a callback.
187 """
186 """
188
187
189 self.log.debug('Process %r started: %r', self.args[0], data)
188 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.start_data = data
189 self.start_data = data
191 self.state = 'running'
190 self.state = 'running'
192 return data
191 return data
193
192
194 def notify_stop(self, data):
193 def notify_stop(self, data):
195 """Call this to trigger process stop actions.
194 """Call this to trigger process stop actions.
196
195
197 This logs the process stopping and sets the state to 'after'. Call
196 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger callbacks registered via :meth:`on_stop`."""
197 this to trigger callbacks registered via :meth:`on_stop`."""
199
198
200 self.log.debug('Process %r stopped: %r', self.args[0], data)
199 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.stop_data = data
200 self.stop_data = data
202 self.state = 'after'
201 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
202 for i in range(len(self.stop_callbacks)):
204 d = self.stop_callbacks.pop()
203 d = self.stop_callbacks.pop()
205 d(data)
204 d(data)
206 return data
205 return data
207
206
208 def signal(self, sig):
207 def signal(self, sig):
209 """Signal the process.
208 """Signal the process.
210
209
211 Parameters
210 Parameters
212 ----------
211 ----------
213 sig : str or int
212 sig : str or int
214 'KILL', 'INT', etc., or any signal number
213 'KILL', 'INT', etc., or any signal number
215 """
214 """
216 raise NotImplementedError('signal must be implemented in a subclass')
215 raise NotImplementedError('signal must be implemented in a subclass')
217
216
218 class ClusterAppMixin(HasTraits):
217 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
218 """MixIn for cluster args as traits"""
220 profile_dir=Unicode('')
219 profile_dir=Unicode('')
221 cluster_id=Unicode('')
220 cluster_id=Unicode('')
222
221
223 @property
222 @property
224 def cluster_args(self):
223 def cluster_args(self):
225 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
224 return ['--profile-dir', self.profile_dir, '--cluster-id', self.cluster_id]
226
225
227 class ControllerMixin(ClusterAppMixin):
226 class ControllerMixin(ClusterAppMixin):
228 controller_cmd = List(ipcontroller_cmd_argv, config=True,
227 controller_cmd = List(ipcontroller_cmd_argv, config=True,
229 help="""Popen command to launch ipcontroller.""")
228 help="""Popen command to launch ipcontroller.""")
230 # Command line arguments to ipcontroller.
229 # Command line arguments to ipcontroller.
231 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
230 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
232 help="""command-line args to pass to ipcontroller""")
231 help="""command-line args to pass to ipcontroller""")
233
232
234 class EngineMixin(ClusterAppMixin):
233 class EngineMixin(ClusterAppMixin):
235 engine_cmd = List(ipengine_cmd_argv, config=True,
234 engine_cmd = List(ipengine_cmd_argv, config=True,
236 help="""command to launch the Engine.""")
235 help="""command to launch the Engine.""")
237 # Command line arguments for ipengine.
236 # Command line arguments for ipengine.
238 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
237 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
239 help="command-line arguments to pass to ipengine"
238 help="command-line arguments to pass to ipengine"
240 )
239 )
241
240
242
241
243 #-----------------------------------------------------------------------------
242 #-----------------------------------------------------------------------------
244 # Local process launchers
243 # Local process launchers
245 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
246
245
247
246
248 class LocalProcessLauncher(BaseLauncher):
247 class LocalProcessLauncher(BaseLauncher):
249 """Start and stop an external process in an asynchronous manner.
248 """Start and stop an external process in an asynchronous manner.
250
249
251 This will launch the external process with a working directory of
250 This will launch the external process with a working directory of
252 ``self.work_dir``.
251 ``self.work_dir``.
253 """
252 """
254
253
255 # This is used to to construct self.args, which is passed to
254 # This is used to to construct self.args, which is passed to
256 # spawnProcess.
255 # spawnProcess.
257 cmd_and_args = List([])
256 cmd_and_args = List([])
258 poll_frequency = Integer(100) # in ms
257 poll_frequency = Integer(100) # in ms
259
258
260 def __init__(self, work_dir=u'.', config=None, **kwargs):
259 def __init__(self, work_dir=u'.', config=None, **kwargs):
261 super(LocalProcessLauncher, self).__init__(
260 super(LocalProcessLauncher, self).__init__(
262 work_dir=work_dir, config=config, **kwargs
261 work_dir=work_dir, config=config, **kwargs
263 )
262 )
264 self.process = None
263 self.process = None
265 self.poller = None
264 self.poller = None
266
265
267 def find_args(self):
266 def find_args(self):
268 return self.cmd_and_args
267 return self.cmd_and_args
269
268
270 def start(self):
269 def start(self):
271 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
270 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
272 if self.state == 'before':
271 if self.state == 'before':
273 self.process = Popen(self.args,
272 self.process = Popen(self.args,
274 stdout=PIPE,stderr=PIPE,stdin=PIPE,
273 stdout=PIPE,stderr=PIPE,stdin=PIPE,
275 env=os.environ,
274 env=os.environ,
276 cwd=self.work_dir
275 cwd=self.work_dir
277 )
276 )
278 if WINDOWS:
277 if WINDOWS:
279 self.stdout = forward_read_events(self.process.stdout)
278 self.stdout = forward_read_events(self.process.stdout)
280 self.stderr = forward_read_events(self.process.stderr)
279 self.stderr = forward_read_events(self.process.stderr)
281 else:
280 else:
282 self.stdout = self.process.stdout.fileno()
281 self.stdout = self.process.stdout.fileno()
283 self.stderr = self.process.stderr.fileno()
282 self.stderr = self.process.stderr.fileno()
284 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
283 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
285 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
284 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
286 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
285 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
287 self.poller.start()
286 self.poller.start()
288 self.notify_start(self.process.pid)
287 self.notify_start(self.process.pid)
289 else:
288 else:
290 s = 'The process was already started and has state: %r' % self.state
289 s = 'The process was already started and has state: %r' % self.state
291 raise ProcessStateError(s)
290 raise ProcessStateError(s)
292
291
293 def stop(self):
292 def stop(self):
294 return self.interrupt_then_kill()
293 return self.interrupt_then_kill()
295
294
296 def signal(self, sig):
295 def signal(self, sig):
297 if self.state == 'running':
296 if self.state == 'running':
298 if WINDOWS and sig != SIGINT:
297 if WINDOWS and sig != SIGINT:
299 # use Windows tree-kill for better child cleanup
298 # use Windows tree-kill for better child cleanup
300 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
299 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
301 else:
300 else:
302 self.process.send_signal(sig)
301 self.process.send_signal(sig)
303
302
304 def interrupt_then_kill(self, delay=2.0):
303 def interrupt_then_kill(self, delay=2.0):
305 """Send INT, wait a delay and then send KILL."""
304 """Send INT, wait a delay and then send KILL."""
306 try:
305 try:
307 self.signal(SIGINT)
306 self.signal(SIGINT)
308 except Exception:
307 except Exception:
309 self.log.debug("interrupt failed")
308 self.log.debug("interrupt failed")
310 pass
309 pass
311 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
310 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
312 self.killer.start()
311 self.killer.start()
313
312
314 # callbacks, etc:
313 # callbacks, etc:
315
314
316 def handle_stdout(self, fd, events):
315 def handle_stdout(self, fd, events):
317 if WINDOWS:
316 if WINDOWS:
318 line = self.stdout.recv()
317 line = self.stdout.recv()
319 else:
318 else:
320 line = self.process.stdout.readline()
319 line = self.process.stdout.readline()
321 # a stopped process will be readable but return empty strings
320 # a stopped process will be readable but return empty strings
322 if line:
321 if line:
323 self.log.debug(line[:-1])
322 self.log.debug(line[:-1])
324 else:
323 else:
325 self.poll()
324 self.poll()
326
325
327 def handle_stderr(self, fd, events):
326 def handle_stderr(self, fd, events):
328 if WINDOWS:
327 if WINDOWS:
329 line = self.stderr.recv()
328 line = self.stderr.recv()
330 else:
329 else:
331 line = self.process.stderr.readline()
330 line = self.process.stderr.readline()
332 # a stopped process will be readable but return empty strings
331 # a stopped process will be readable but return empty strings
333 if line:
332 if line:
334 self.log.debug(line[:-1])
333 self.log.debug(line[:-1])
335 else:
334 else:
336 self.poll()
335 self.poll()
337
336
338 def poll(self):
337 def poll(self):
339 status = self.process.poll()
338 status = self.process.poll()
340 if status is not None:
339 if status is not None:
341 self.poller.stop()
340 self.poller.stop()
342 self.loop.remove_handler(self.stdout)
341 self.loop.remove_handler(self.stdout)
343 self.loop.remove_handler(self.stderr)
342 self.loop.remove_handler(self.stderr)
344 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
343 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
345 return status
344 return status
346
345
347 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
346 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
348 """Launch a controller as a regular external process."""
347 """Launch a controller as a regular external process."""
349
348
350 def find_args(self):
349 def find_args(self):
351 return self.controller_cmd + self.cluster_args + self.controller_args
350 return self.controller_cmd + self.cluster_args + self.controller_args
352
351
353 def start(self):
352 def start(self):
354 """Start the controller by profile_dir."""
353 """Start the controller by profile_dir."""
355 return super(LocalControllerLauncher, self).start()
354 return super(LocalControllerLauncher, self).start()
356
355
357
356
358 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
357 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
359 """Launch a single engine as a regular externall process."""
358 """Launch a single engine as a regular externall process."""
360
359
361 def find_args(self):
360 def find_args(self):
362 return self.engine_cmd + self.cluster_args + self.engine_args
361 return self.engine_cmd + self.cluster_args + self.engine_args
363
362
364
363
365 class LocalEngineSetLauncher(LocalEngineLauncher):
364 class LocalEngineSetLauncher(LocalEngineLauncher):
366 """Launch a set of engines as regular external processes."""
365 """Launch a set of engines as regular external processes."""
367
366
368 delay = CFloat(0.1, config=True,
367 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
368 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
369 This can help force the engines to get their ids in order, or limit
371 process flood when starting many engines."""
370 process flood when starting many engines."""
372 )
371 )
373
372
374 # launcher class
373 # launcher class
375 launcher_class = LocalEngineLauncher
374 launcher_class = LocalEngineLauncher
376
375
377 launchers = Dict()
376 launchers = Dict()
378 stop_data = Dict()
377 stop_data = Dict()
379
378
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
379 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 super(LocalEngineSetLauncher, self).__init__(
380 super(LocalEngineSetLauncher, self).__init__(
382 work_dir=work_dir, config=config, **kwargs
381 work_dir=work_dir, config=config, **kwargs
383 )
382 )
384 self.stop_data = {}
383 self.stop_data = {}
385
384
386 def start(self, n):
385 def start(self, n):
387 """Start n engines by profile or profile_dir."""
386 """Start n engines by profile or profile_dir."""
388 dlist = []
387 dlist = []
389 for i in range(n):
388 for i in range(n):
390 if i > 0:
389 if i > 0:
391 time.sleep(self.delay)
390 time.sleep(self.delay)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
391 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
393 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
392 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
394 )
393 )
395
394
396 # Copy the engine args over to each engine launcher.
395 # Copy the engine args over to each engine launcher.
397 el.engine_cmd = copy.deepcopy(self.engine_cmd)
396 el.engine_cmd = copy.deepcopy(self.engine_cmd)
398 el.engine_args = copy.deepcopy(self.engine_args)
397 el.engine_args = copy.deepcopy(self.engine_args)
399 el.on_stop(self._notice_engine_stopped)
398 el.on_stop(self._notice_engine_stopped)
400 d = el.start()
399 d = el.start()
401 self.launchers[i] = el
400 self.launchers[i] = el
402 dlist.append(d)
401 dlist.append(d)
403 self.notify_start(dlist)
402 self.notify_start(dlist)
404 return dlist
403 return dlist
405
404
406 def find_args(self):
405 def find_args(self):
407 return ['engine set']
406 return ['engine set']
408
407
409 def signal(self, sig):
408 def signal(self, sig):
410 dlist = []
409 dlist = []
411 for el in self.launchers.itervalues():
410 for el in self.launchers.itervalues():
412 d = el.signal(sig)
411 d = el.signal(sig)
413 dlist.append(d)
412 dlist.append(d)
414 return dlist
413 return dlist
415
414
416 def interrupt_then_kill(self, delay=1.0):
415 def interrupt_then_kill(self, delay=1.0):
417 dlist = []
416 dlist = []
418 for el in self.launchers.itervalues():
417 for el in self.launchers.itervalues():
419 d = el.interrupt_then_kill(delay)
418 d = el.interrupt_then_kill(delay)
420 dlist.append(d)
419 dlist.append(d)
421 return dlist
420 return dlist
422
421
423 def stop(self):
422 def stop(self):
424 return self.interrupt_then_kill()
423 return self.interrupt_then_kill()
425
424
426 def _notice_engine_stopped(self, data):
425 def _notice_engine_stopped(self, data):
427 pid = data['pid']
426 pid = data['pid']
428 for idx,el in self.launchers.iteritems():
427 for idx,el in self.launchers.iteritems():
429 if el.process.pid == pid:
428 if el.process.pid == pid:
430 break
429 break
431 self.launchers.pop(idx)
430 self.launchers.pop(idx)
432 self.stop_data[idx] = data
431 self.stop_data[idx] = data
433 if not self.launchers:
432 if not self.launchers:
434 self.notify_stop(self.stop_data)
433 self.notify_stop(self.stop_data)
435
434
436
435
437 #-----------------------------------------------------------------------------
436 #-----------------------------------------------------------------------------
438 # MPI launchers
437 # MPI launchers
439 #-----------------------------------------------------------------------------
438 #-----------------------------------------------------------------------------
440
439
441
440
442 class MPILauncher(LocalProcessLauncher):
441 class MPILauncher(LocalProcessLauncher):
443 """Launch an external process using mpiexec."""
442 """Launch an external process using mpiexec."""
444
443
445 mpi_cmd = List(['mpiexec'], config=True,
444 mpi_cmd = List(['mpiexec'], config=True,
446 help="The mpiexec command to use in starting the process."
445 help="The mpiexec command to use in starting the process."
447 )
446 )
448 mpi_args = List([], config=True,
447 mpi_args = List([], config=True,
449 help="The command line arguments to pass to mpiexec."
448 help="The command line arguments to pass to mpiexec."
450 )
449 )
451 program = List(['date'],
450 program = List(['date'],
452 help="The program to start via mpiexec.")
451 help="The program to start via mpiexec.")
453 program_args = List([],
452 program_args = List([],
454 help="The command line argument to the program."
453 help="The command line argument to the program."
455 )
454 )
456 n = Integer(1)
455 n = Integer(1)
457
456
458 def __init__(self, *args, **kwargs):
457 def __init__(self, *args, **kwargs):
459 # deprecation for old MPIExec names:
458 # deprecation for old MPIExec names:
460 config = kwargs.get('config', {})
459 config = kwargs.get('config', {})
461 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
460 for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
462 deprecated = config.get(oldname)
461 deprecated = config.get(oldname)
463 if deprecated:
462 if deprecated:
464 newname = oldname.replace('MPIExec', 'MPI')
463 newname = oldname.replace('MPIExec', 'MPI')
465 config[newname].update(deprecated)
464 config[newname].update(deprecated)
466 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
465 self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)
467
466
468 super(MPILauncher, self).__init__(*args, **kwargs)
467 super(MPILauncher, self).__init__(*args, **kwargs)
469
468
470 def find_args(self):
469 def find_args(self):
471 """Build self.args using all the fields."""
470 """Build self.args using all the fields."""
472 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
471 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
473 self.program + self.program_args
472 self.program + self.program_args
474
473
475 def start(self, n):
474 def start(self, n):
476 """Start n instances of the program using mpiexec."""
475 """Start n instances of the program using mpiexec."""
477 self.n = n
476 self.n = n
478 return super(MPILauncher, self).start()
477 return super(MPILauncher, self).start()
479
478
480
479
481 class MPIControllerLauncher(MPILauncher, ControllerMixin):
480 class MPIControllerLauncher(MPILauncher, ControllerMixin):
482 """Launch a controller using mpiexec."""
481 """Launch a controller using mpiexec."""
483
482
484 # alias back to *non-configurable* program[_args] for use in find_args()
483 # alias back to *non-configurable* program[_args] for use in find_args()
485 # this way all Controller/EngineSetLaunchers have the same form, rather
484 # this way all Controller/EngineSetLaunchers have the same form, rather
486 # than *some* having `program_args` and others `controller_args`
485 # than *some* having `program_args` and others `controller_args`
487 @property
486 @property
488 def program(self):
487 def program(self):
489 return self.controller_cmd
488 return self.controller_cmd
490
489
491 @property
490 @property
492 def program_args(self):
491 def program_args(self):
493 return self.cluster_args + self.controller_args
492 return self.cluster_args + self.controller_args
494
493
495 def start(self):
494 def start(self):
496 """Start the controller by profile_dir."""
495 """Start the controller by profile_dir."""
497 return super(MPIControllerLauncher, self).start(1)
496 return super(MPIControllerLauncher, self).start(1)
498
497
499
498
500 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
499 class MPIEngineSetLauncher(MPILauncher, EngineMixin):
501 """Launch engines using mpiexec"""
500 """Launch engines using mpiexec"""
502
501
503 # alias back to *non-configurable* program[_args] for use in find_args()
502 # alias back to *non-configurable* program[_args] for use in find_args()
504 # this way all Controller/EngineSetLaunchers have the same form, rather
503 # this way all Controller/EngineSetLaunchers have the same form, rather
505 # than *some* having `program_args` and others `controller_args`
504 # than *some* having `program_args` and others `controller_args`
506 @property
505 @property
507 def program(self):
506 def program(self):
508 return self.engine_cmd
507 return self.engine_cmd
509
508
510 @property
509 @property
511 def program_args(self):
510 def program_args(self):
512 return self.cluster_args + self.engine_args
511 return self.cluster_args + self.engine_args
513
512
514 def start(self, n):
513 def start(self, n):
515 """Start n engines by profile or profile_dir."""
514 """Start n engines by profile or profile_dir."""
516 self.n = n
515 self.n = n
517 return super(MPIEngineSetLauncher, self).start(n)
516 return super(MPIEngineSetLauncher, self).start(n)
518
517
519 # deprecated MPIExec names
518 # deprecated MPIExec names
520 class DeprecatedMPILauncher(object):
519 class DeprecatedMPILauncher(object):
521 def warn(self):
520 def warn(self):
522 oldname = self.__class__.__name__
521 oldname = self.__class__.__name__
523 newname = oldname.replace('MPIExec', 'MPI')
522 newname = oldname.replace('MPIExec', 'MPI')
524 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
523 self.log.warn("WARNING: %s name is deprecated, use %s", oldname, newname)
525
524
526 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
525 class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
527 """Deprecated, use MPILauncher"""
526 """Deprecated, use MPILauncher"""
528 def __init__(self, *args, **kwargs):
527 def __init__(self, *args, **kwargs):
529 super(MPIExecLauncher, self).__init__(*args, **kwargs)
528 super(MPIExecLauncher, self).__init__(*args, **kwargs)
530 self.warn()
529 self.warn()
531
530
532 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
531 class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
533 """Deprecated, use MPIControllerLauncher"""
532 """Deprecated, use MPIControllerLauncher"""
534 def __init__(self, *args, **kwargs):
533 def __init__(self, *args, **kwargs):
535 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
534 super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
536 self.warn()
535 self.warn()
537
536
538 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
537 class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
539 """Deprecated, use MPIEngineSetLauncher"""
538 """Deprecated, use MPIEngineSetLauncher"""
540 def __init__(self, *args, **kwargs):
539 def __init__(self, *args, **kwargs):
541 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
540 super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
542 self.warn()
541 self.warn()
543
542
544
543
545 #-----------------------------------------------------------------------------
544 #-----------------------------------------------------------------------------
546 # SSH launchers
545 # SSH launchers
547 #-----------------------------------------------------------------------------
546 #-----------------------------------------------------------------------------
548
547
549 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
548 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
550
549
551 class SSHLauncher(LocalProcessLauncher):
550 class SSHLauncher(LocalProcessLauncher):
552 """A minimal launcher for ssh.
551 """A minimal launcher for ssh.
553
552
554 To be useful this will probably have to be extended to use the ``sshx``
553 To be useful this will probably have to be extended to use the ``sshx``
555 idea for environment variables. There could be other things this needs
554 idea for environment variables. There could be other things this needs
556 as well.
555 as well.
557 """
556 """
558
557
559 ssh_cmd = List(['ssh'], config=True,
558 ssh_cmd = List(['ssh'], config=True,
560 help="command for starting ssh")
559 help="command for starting ssh")
561 ssh_args = List(['-tt'], config=True,
560 ssh_args = List(['-tt'], config=True,
562 help="args to pass to ssh")
561 help="args to pass to ssh")
563 scp_cmd = List(['scp'], config=True,
562 scp_cmd = List(['scp'], config=True,
564 help="command for sending files")
563 help="command for sending files")
565 program = List(['date'],
564 program = List(['date'],
566 help="Program to launch via ssh")
565 help="Program to launch via ssh")
567 program_args = List([],
566 program_args = List([],
568 help="args to pass to remote program")
567 help="args to pass to remote program")
569 hostname = Unicode('', config=True,
568 hostname = Unicode('', config=True,
570 help="hostname on which to launch the program")
569 help="hostname on which to launch the program")
571 user = Unicode('', config=True,
570 user = Unicode('', config=True,
572 help="username for ssh")
571 help="username for ssh")
573 location = Unicode('', config=True,
572 location = Unicode('', config=True,
574 help="user@hostname location for ssh in one setting")
573 help="user@hostname location for ssh in one setting")
575 to_fetch = List([], config=True,
574 to_fetch = List([], config=True,
576 help="List of (remote, local) files to fetch after starting")
575 help="List of (remote, local) files to fetch after starting")
577 to_send = List([], config=True,
576 to_send = List([], config=True,
578 help="List of (local, remote) files to send before starting")
577 help="List of (local, remote) files to send before starting")
579
578
580 def _hostname_changed(self, name, old, new):
579 def _hostname_changed(self, name, old, new):
581 if self.user:
580 if self.user:
582 self.location = u'%s@%s' % (self.user, new)
581 self.location = u'%s@%s' % (self.user, new)
583 else:
582 else:
584 self.location = new
583 self.location = new
585
584
586 def _user_changed(self, name, old, new):
585 def _user_changed(self, name, old, new):
587 self.location = u'%s@%s' % (new, self.hostname)
586 self.location = u'%s@%s' % (new, self.hostname)
588
587
589 def find_args(self):
588 def find_args(self):
590 return self.ssh_cmd + self.ssh_args + [self.location] + \
589 return self.ssh_cmd + self.ssh_args + [self.location] + \
591 self.program + self.program_args
590 self.program + self.program_args
592
591
593 def _send_file(self, local, remote):
592 def _send_file(self, local, remote):
594 """send a single file"""
593 """send a single file"""
595 remote = "%s:%s" % (self.location, remote)
594 remote = "%s:%s" % (self.location, remote)
596 for i in range(10):
595 for i in range(10):
597 if not os.path.exists(local):
596 if not os.path.exists(local):
598 self.log.debug("waiting for %s" % local)
597 self.log.debug("waiting for %s" % local)
599 time.sleep(1)
598 time.sleep(1)
600 else:
599 else:
601 break
600 break
602 self.log.info("sending %s to %s", local, remote)
601 self.log.info("sending %s to %s", local, remote)
603 check_output(self.scp_cmd + [local, remote])
602 check_output(self.scp_cmd + [local, remote])
604
603
605 def send_files(self):
604 def send_files(self):
606 """send our files (called before start)"""
605 """send our files (called before start)"""
607 if not self.to_send:
606 if not self.to_send:
608 return
607 return
609 for local_file, remote_file in self.to_send:
608 for local_file, remote_file in self.to_send:
610 self._send_file(local_file, remote_file)
609 self._send_file(local_file, remote_file)
611
610
612 def _fetch_file(self, remote, local):
611 def _fetch_file(self, remote, local):
613 """fetch a single file"""
612 """fetch a single file"""
614 full_remote = "%s:%s" % (self.location, remote)
613 full_remote = "%s:%s" % (self.location, remote)
615 self.log.info("fetching %s from %s", local, full_remote)
614 self.log.info("fetching %s from %s", local, full_remote)
616 for i in range(10):
615 for i in range(10):
617 # wait up to 10s for remote file to exist
616 # wait up to 10s for remote file to exist
618 check = check_output(self.ssh_cmd + self.ssh_args + \
617 check = check_output(self.ssh_cmd + self.ssh_args + \
619 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
618 [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
620 check = check.strip()
619 check = check.strip()
621 if check == 'no':
620 if check == 'no':
622 time.sleep(1)
621 time.sleep(1)
623 elif check == 'yes':
622 elif check == 'yes':
624 break
623 break
625 check_output(self.scp_cmd + [full_remote, local])
624 check_output(self.scp_cmd + [full_remote, local])
626
625
627 def fetch_files(self):
626 def fetch_files(self):
628 """fetch remote files (called after start)"""
627 """fetch remote files (called after start)"""
629 if not self.to_fetch:
628 if not self.to_fetch:
630 return
629 return
631 for remote_file, local_file in self.to_fetch:
630 for remote_file, local_file in self.to_fetch:
632 self._fetch_file(remote_file, local_file)
631 self._fetch_file(remote_file, local_file)
633
632
634 def start(self, hostname=None, user=None):
633 def start(self, hostname=None, user=None):
635 if hostname is not None:
634 if hostname is not None:
636 self.hostname = hostname
635 self.hostname = hostname
637 if user is not None:
636 if user is not None:
638 self.user = user
637 self.user = user
639
638
640 self.send_files()
639 self.send_files()
641 super(SSHLauncher, self).start()
640 super(SSHLauncher, self).start()
642 self.fetch_files()
641 self.fetch_files()
643
642
644 def signal(self, sig):
643 def signal(self, sig):
645 if self.state == 'running':
644 if self.state == 'running':
646 # send escaped ssh connection-closer
645 # send escaped ssh connection-closer
647 self.process.stdin.write('~.')
646 self.process.stdin.write('~.')
648 self.process.stdin.flush()
647 self.process.stdin.flush()
649
648
650 class SSHClusterLauncher(SSHLauncher):
649 class SSHClusterLauncher(SSHLauncher):
651
650
652 remote_profile_dir = Unicode('', config=True,
651 remote_profile_dir = Unicode('', config=True,
653 help="""The remote profile_dir to use.
652 help="""The remote profile_dir to use.
654
653
655 If not specified, use calling profile, stripping out possible leading homedir.
654 If not specified, use calling profile, stripping out possible leading homedir.
656 """)
655 """)
657
656
658 def _remote_profie_dir_default(self):
657 def _remote_profie_dir_default(self):
659 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
658 """turns /home/you/.ipython/profile_foo into .ipython/profile_foo
660 """
659 """
661 home = get_home_dir()
660 home = get_home_dir()
662 if not home.endswith('/'):
661 if not home.endswith('/'):
663 home = home+'/'
662 home = home+'/'
664
663
665 if self.profile_dir.startswith(home):
664 if self.profile_dir.startswith(home):
666 return self.profile_dir[len(home):]
665 return self.profile_dir[len(home):]
667 else:
666 else:
668 return self.profile_dir
667 return self.profile_dir
669
668
670 def _cluster_id_changed(self, name, old, new):
669 def _cluster_id_changed(self, name, old, new):
671 if new:
670 if new:
672 raise ValueError("cluster id not supported by SSH launchers")
671 raise ValueError("cluster id not supported by SSH launchers")
673
672
674 @property
673 @property
675 def cluster_args(self):
674 def cluster_args(self):
676 return ['--profile-dir', self.remote_profile_dir]
675 return ['--profile-dir', self.remote_profile_dir]
677
676
678 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
677 class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
679
678
680 # alias back to *non-configurable* program[_args] for use in find_args()
679 # alias back to *non-configurable* program[_args] for use in find_args()
681 # this way all Controller/EngineSetLaunchers have the same form, rather
680 # this way all Controller/EngineSetLaunchers have the same form, rather
682 # than *some* having `program_args` and others `controller_args`
681 # than *some* having `program_args` and others `controller_args`
683
682
684 def _controller_cmd_default(self):
683 def _controller_cmd_default(self):
685 return ['ipcontroller']
684 return ['ipcontroller']
686
685
687 @property
686 @property
688 def program(self):
687 def program(self):
689 return self.controller_cmd
688 return self.controller_cmd
690
689
691 @property
690 @property
692 def program_args(self):
691 def program_args(self):
693 return self.cluster_args + self.controller_args
692 return self.cluster_args + self.controller_args
694
693
695 def _to_fetch_default(self):
694 def _to_fetch_default(self):
696 return [
695 return [
697 (os.path.join(self.remote_profile_dir, 'security', cf),
696 (os.path.join(self.remote_profile_dir, 'security', cf),
698 os.path.join(self.profile_dir, 'security', cf),)
697 os.path.join(self.profile_dir, 'security', cf),)
699 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
698 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
700 ]
699 ]
701
700
702 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
701 class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
703
702
704 # alias back to *non-configurable* program[_args] for use in find_args()
703 # alias back to *non-configurable* program[_args] for use in find_args()
705 # this way all Controller/EngineSetLaunchers have the same form, rather
704 # this way all Controller/EngineSetLaunchers have the same form, rather
706 # than *some* having `program_args` and others `controller_args`
705 # than *some* having `program_args` and others `controller_args`
707
706
708 def _engine_cmd_default(self):
707 def _engine_cmd_default(self):
709 return ['ipengine']
708 return ['ipengine']
710
709
711 @property
710 @property
712 def program(self):
711 def program(self):
713 return self.engine_cmd
712 return self.engine_cmd
714
713
715 @property
714 @property
716 def program_args(self):
715 def program_args(self):
717 return self.cluster_args + self.engine_args
716 return self.cluster_args + self.engine_args
718
717
719 def _to_send_default(self):
718 def _to_send_default(self):
720 return [
719 return [
721 (os.path.join(self.profile_dir, 'security', cf),
720 (os.path.join(self.profile_dir, 'security', cf),
722 os.path.join(self.remote_profile_dir, 'security', cf))
721 os.path.join(self.remote_profile_dir, 'security', cf))
723 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
722 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
724 ]
723 ]
725
724
726
725
727 class SSHEngineSetLauncher(LocalEngineSetLauncher):
726 class SSHEngineSetLauncher(LocalEngineSetLauncher):
728 launcher_class = SSHEngineLauncher
727 launcher_class = SSHEngineLauncher
729 engines = Dict(config=True,
728 engines = Dict(config=True,
730 help="""dict of engines to launch. This is a dict by hostname of ints,
729 help="""dict of engines to launch. This is a dict by hostname of ints,
731 corresponding to the number of engines to start on that host.""")
730 corresponding to the number of engines to start on that host.""")
732
731
733 @property
732 @property
734 def engine_count(self):
733 def engine_count(self):
735 """determine engine count from `engines` dict"""
734 """determine engine count from `engines` dict"""
736 count = 0
735 count = 0
737 for n in self.engines.itervalues():
736 for n in self.engines.itervalues():
738 if isinstance(n, (tuple,list)):
737 if isinstance(n, (tuple,list)):
739 n,args = n
738 n,args = n
740 count += n
739 count += n
741 return count
740 return count
742
741
743 def start(self, n):
742 def start(self, n):
744 """Start engines by profile or profile_dir.
743 """Start engines by profile or profile_dir.
745 `n` is ignored, and the `engines` config property is used instead.
744 `n` is ignored, and the `engines` config property is used instead.
746 """
745 """
747
746
748 dlist = []
747 dlist = []
749 for host, n in self.engines.iteritems():
748 for host, n in self.engines.iteritems():
750 if isinstance(n, (tuple, list)):
749 if isinstance(n, (tuple, list)):
751 n, args = n
750 n, args = n
752 else:
751 else:
753 args = copy.deepcopy(self.engine_args)
752 args = copy.deepcopy(self.engine_args)
754
753
755 if '@' in host:
754 if '@' in host:
756 user,host = host.split('@',1)
755 user,host = host.split('@',1)
757 else:
756 else:
758 user=None
757 user=None
759 for i in range(n):
758 for i in range(n):
760 if i > 0:
759 if i > 0:
761 time.sleep(self.delay)
760 time.sleep(self.delay)
762 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
761 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
763 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
762 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
764 )
763 )
765 if i > 0:
764 if i > 0:
766 # only send files for the first engine on each host
765 # only send files for the first engine on each host
767 el.to_send = []
766 el.to_send = []
768
767
769 # Copy the engine args over to each engine launcher.
768 # Copy the engine args over to each engine launcher.
770 el.engine_cmd = self.engine_cmd
769 el.engine_cmd = self.engine_cmd
771 el.engine_args = args
770 el.engine_args = args
772 el.on_stop(self._notice_engine_stopped)
771 el.on_stop(self._notice_engine_stopped)
773 d = el.start(user=user, hostname=host)
772 d = el.start(user=user, hostname=host)
774 self.launchers[ "%s/%i" % (host,i) ] = el
773 self.launchers[ "%s/%i" % (host,i) ] = el
775 dlist.append(d)
774 dlist.append(d)
776 self.notify_start(dlist)
775 self.notify_start(dlist)
777 return dlist
776 return dlist
778
777
779
778
780 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
779 class SSHProxyEngineSetLauncher(SSHClusterLauncher):
781 """Launcher for calling
780 """Launcher for calling
782 `ipcluster engines` on a remote machine.
781 `ipcluster engines` on a remote machine.
783
782
784 Requires that remote profile is already configured.
783 Requires that remote profile is already configured.
785 """
784 """
786
785
787 n = Integer()
786 n = Integer()
788 ipcluster_cmd = List(['ipcluster'], config=True)
787 ipcluster_cmd = List(['ipcluster'], config=True)
789
788
790 @property
789 @property
791 def program(self):
790 def program(self):
792 return self.ipcluster_cmd + ['engines']
791 return self.ipcluster_cmd + ['engines']
793
792
794 @property
793 @property
795 def program_args(self):
794 def program_args(self):
796 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
795 return ['-n', str(self.n), '--profile-dir', self.remote_profile_dir]
797
796
798 def _to_send_default(self):
797 def _to_send_default(self):
799 return [
798 return [
800 (os.path.join(self.profile_dir, 'security', cf),
799 (os.path.join(self.profile_dir, 'security', cf),
801 os.path.join(self.remote_profile_dir, 'security', cf))
800 os.path.join(self.remote_profile_dir, 'security', cf))
802 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
801 for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json')
803 ]
802 ]
804
803
805 def start(self, n):
804 def start(self, n):
806 self.n = n
805 self.n = n
807 super(SSHProxyEngineSetLauncher, self).start()
806 super(SSHProxyEngineSetLauncher, self).start()
808
807
809
808
810 #-----------------------------------------------------------------------------
809 #-----------------------------------------------------------------------------
811 # Windows HPC Server 2008 scheduler launchers
810 # Windows HPC Server 2008 scheduler launchers
812 #-----------------------------------------------------------------------------
811 #-----------------------------------------------------------------------------
813
812
814
813
815 # This is only used on Windows.
814 # This is only used on Windows.
816 def find_job_cmd():
815 def find_job_cmd():
817 if WINDOWS:
816 if WINDOWS:
818 try:
817 try:
819 return find_cmd('job')
818 return find_cmd('job')
820 except (FindCmdError, ImportError):
819 except (FindCmdError, ImportError):
821 # ImportError will be raised if win32api is not installed
820 # ImportError will be raised if win32api is not installed
822 return 'job'
821 return 'job'
823 else:
822 else:
824 return 'job'
823 return 'job'
825
824
826
825
827 class WindowsHPCLauncher(BaseLauncher):
826 class WindowsHPCLauncher(BaseLauncher):
828
827
829 job_id_regexp = Unicode(r'\d+', config=True,
828 job_id_regexp = CRegExp(r'\d+', config=True,
830 help="""A regular expression used to get the job id from the output of the
829 help="""A regular expression used to get the job id from the output of the
831 submit_command. """
830 submit_command. """
832 )
831 )
833 job_file_name = Unicode(u'ipython_job.xml', config=True,
832 job_file_name = Unicode(u'ipython_job.xml', config=True,
834 help="The filename of the instantiated job script.")
833 help="The filename of the instantiated job script.")
835 # The full path to the instantiated job script. This gets made dynamically
834 # The full path to the instantiated job script. This gets made dynamically
836 # by combining the work_dir with the job_file_name.
835 # by combining the work_dir with the job_file_name.
837 job_file = Unicode(u'')
836 job_file = Unicode(u'')
838 scheduler = Unicode('', config=True,
837 scheduler = Unicode('', config=True,
839 help="The hostname of the scheduler to submit the job to.")
838 help="The hostname of the scheduler to submit the job to.")
840 job_cmd = Unicode(find_job_cmd(), config=True,
839 job_cmd = Unicode(find_job_cmd(), config=True,
841 help="The command for submitting jobs.")
840 help="The command for submitting jobs.")
842
841
843 def __init__(self, work_dir=u'.', config=None, **kwargs):
842 def __init__(self, work_dir=u'.', config=None, **kwargs):
844 super(WindowsHPCLauncher, self).__init__(
843 super(WindowsHPCLauncher, self).__init__(
845 work_dir=work_dir, config=config, **kwargs
844 work_dir=work_dir, config=config, **kwargs
846 )
845 )
847
846
848 @property
847 @property
849 def job_file(self):
848 def job_file(self):
850 return os.path.join(self.work_dir, self.job_file_name)
849 return os.path.join(self.work_dir, self.job_file_name)
851
850
852 def write_job_file(self, n):
851 def write_job_file(self, n):
853 raise NotImplementedError("Implement write_job_file in a subclass.")
852 raise NotImplementedError("Implement write_job_file in a subclass.")
854
853
855 def find_args(self):
854 def find_args(self):
856 return [u'job.exe']
855 return [u'job.exe']
857
856
858 def parse_job_id(self, output):
857 def parse_job_id(self, output):
859 """Take the output of the submit command and return the job id."""
858 """Take the output of the submit command and return the job id."""
860 m = re.search(self.job_id_regexp, output)
859 m = self.job_id_regexp.search(output)
861 if m is not None:
860 if m is not None:
862 job_id = m.group()
861 job_id = m.group()
863 else:
862 else:
864 raise LauncherError("Job id couldn't be determined: %s" % output)
863 raise LauncherError("Job id couldn't be determined: %s" % output)
865 self.job_id = job_id
864 self.job_id = job_id
866 self.log.info('Job started with id: %r', job_id)
865 self.log.info('Job started with id: %r', job_id)
867 return job_id
866 return job_id
868
867
869 def start(self, n):
868 def start(self, n):
870 """Start n copies of the process using the Win HPC job scheduler."""
869 """Start n copies of the process using the Win HPC job scheduler."""
871 self.write_job_file(n)
870 self.write_job_file(n)
872 args = [
871 args = [
873 'submit',
872 'submit',
874 '/jobfile:%s' % self.job_file,
873 '/jobfile:%s' % self.job_file,
875 '/scheduler:%s' % self.scheduler
874 '/scheduler:%s' % self.scheduler
876 ]
875 ]
877 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
876 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
878
877
879 output = check_output([self.job_cmd]+args,
878 output = check_output([self.job_cmd]+args,
880 env=os.environ,
879 env=os.environ,
881 cwd=self.work_dir,
880 cwd=self.work_dir,
882 stderr=STDOUT
881 stderr=STDOUT
883 )
882 )
884 job_id = self.parse_job_id(output)
883 job_id = self.parse_job_id(output)
885 self.notify_start(job_id)
884 self.notify_start(job_id)
886 return job_id
885 return job_id
887
886
888 def stop(self):
887 def stop(self):
889 args = [
888 args = [
890 'cancel',
889 'cancel',
891 self.job_id,
890 self.job_id,
892 '/scheduler:%s' % self.scheduler
891 '/scheduler:%s' % self.scheduler
893 ]
892 ]
894 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
893 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
895 try:
894 try:
896 output = check_output([self.job_cmd]+args,
895 output = check_output([self.job_cmd]+args,
897 env=os.environ,
896 env=os.environ,
898 cwd=self.work_dir,
897 cwd=self.work_dir,
899 stderr=STDOUT
898 stderr=STDOUT
900 )
899 )
901 except:
900 except:
902 output = 'The job already appears to be stoppped: %r' % self.job_id
901 output = 'The job already appears to be stoppped: %r' % self.job_id
903 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
902 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
904 return output
903 return output
905
904
906
905
907 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
906 class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
908
907
909 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
908 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
910 help="WinHPC xml job file.")
909 help="WinHPC xml job file.")
911 controller_args = List([], config=False,
910 controller_args = List([], config=False,
912 help="extra args to pass to ipcontroller")
911 help="extra args to pass to ipcontroller")
913
912
914 def write_job_file(self, n):
913 def write_job_file(self, n):
915 job = IPControllerJob(config=self.config)
914 job = IPControllerJob(config=self.config)
916
915
917 t = IPControllerTask(config=self.config)
916 t = IPControllerTask(config=self.config)
918 # The tasks work directory is *not* the actual work directory of
917 # The tasks work directory is *not* the actual work directory of
919 # the controller. It is used as the base path for the stdout/stderr
918 # the controller. It is used as the base path for the stdout/stderr
920 # files that the scheduler redirects to.
919 # files that the scheduler redirects to.
921 t.work_directory = self.profile_dir
920 t.work_directory = self.profile_dir
922 # Add the profile_dir and from self.start().
921 # Add the profile_dir and from self.start().
923 t.controller_args.extend(self.cluster_args)
922 t.controller_args.extend(self.cluster_args)
924 t.controller_args.extend(self.controller_args)
923 t.controller_args.extend(self.controller_args)
925 job.add_task(t)
924 job.add_task(t)
926
925
927 self.log.debug("Writing job description file: %s", self.job_file)
926 self.log.debug("Writing job description file: %s", self.job_file)
928 job.write(self.job_file)
927 job.write(self.job_file)
929
928
930 @property
929 @property
931 def job_file(self):
930 def job_file(self):
932 return os.path.join(self.profile_dir, self.job_file_name)
931 return os.path.join(self.profile_dir, self.job_file_name)
933
932
934 def start(self):
933 def start(self):
935 """Start the controller by profile_dir."""
934 """Start the controller by profile_dir."""
936 return super(WindowsHPCControllerLauncher, self).start(1)
935 return super(WindowsHPCControllerLauncher, self).start(1)
937
936
938
937
939 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
938 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
940
939
941 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
940 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
942 help="jobfile for ipengines job")
941 help="jobfile for ipengines job")
943 engine_args = List([], config=False,
942 engine_args = List([], config=False,
944 help="extra args to pas to ipengine")
943 help="extra args to pas to ipengine")
945
944
946 def write_job_file(self, n):
945 def write_job_file(self, n):
947 job = IPEngineSetJob(config=self.config)
946 job = IPEngineSetJob(config=self.config)
948
947
949 for i in range(n):
948 for i in range(n):
950 t = IPEngineTask(config=self.config)
949 t = IPEngineTask(config=self.config)
951 # The tasks work directory is *not* the actual work directory of
950 # The tasks work directory is *not* the actual work directory of
952 # the engine. It is used as the base path for the stdout/stderr
951 # the engine. It is used as the base path for the stdout/stderr
953 # files that the scheduler redirects to.
952 # files that the scheduler redirects to.
954 t.work_directory = self.profile_dir
953 t.work_directory = self.profile_dir
955 # Add the profile_dir and from self.start().
954 # Add the profile_dir and from self.start().
956 t.engine_args.extend(self.cluster_args)
955 t.engine_args.extend(self.cluster_args)
957 t.engine_args.extend(self.engine_args)
956 t.engine_args.extend(self.engine_args)
958 job.add_task(t)
957 job.add_task(t)
959
958
960 self.log.debug("Writing job description file: %s", self.job_file)
959 self.log.debug("Writing job description file: %s", self.job_file)
961 job.write(self.job_file)
960 job.write(self.job_file)
962
961
963 @property
962 @property
964 def job_file(self):
963 def job_file(self):
965 return os.path.join(self.profile_dir, self.job_file_name)
964 return os.path.join(self.profile_dir, self.job_file_name)
966
965
967 def start(self, n):
966 def start(self, n):
968 """Start the controller by profile_dir."""
967 """Start the controller by profile_dir."""
969 return super(WindowsHPCEngineSetLauncher, self).start(n)
968 return super(WindowsHPCEngineSetLauncher, self).start(n)
970
969
971
970
972 #-----------------------------------------------------------------------------
971 #-----------------------------------------------------------------------------
973 # Batch (PBS) system launchers
972 # Batch (PBS) system launchers
974 #-----------------------------------------------------------------------------
973 #-----------------------------------------------------------------------------
975
974
976 class BatchClusterAppMixin(ClusterAppMixin):
975 class BatchClusterAppMixin(ClusterAppMixin):
977 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
976 """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
978 def _profile_dir_changed(self, name, old, new):
977 def _profile_dir_changed(self, name, old, new):
979 self.context[name] = new
978 self.context[name] = new
980 _cluster_id_changed = _profile_dir_changed
979 _cluster_id_changed = _profile_dir_changed
981
980
982 def _profile_dir_default(self):
981 def _profile_dir_default(self):
983 self.context['profile_dir'] = ''
982 self.context['profile_dir'] = ''
984 return ''
983 return ''
985 def _cluster_id_default(self):
984 def _cluster_id_default(self):
986 self.context['cluster_id'] = ''
985 self.context['cluster_id'] = ''
987 return ''
986 return ''
988
987
989
988
990 class BatchSystemLauncher(BaseLauncher):
989 class BatchSystemLauncher(BaseLauncher):
991 """Launch an external process using a batch system.
990 """Launch an external process using a batch system.
992
991
993 This class is designed to work with UNIX batch systems like PBS, LSF,
992 This class is designed to work with UNIX batch systems like PBS, LSF,
994 GridEngine, etc. The overall model is that there are different commands
993 GridEngine, etc. The overall model is that there are different commands
995 like qsub, qdel, etc. that handle the starting and stopping of the process.
994 like qsub, qdel, etc. that handle the starting and stopping of the process.
996
995
997 This class also has the notion of a batch script. The ``batch_template``
996 This class also has the notion of a batch script. The ``batch_template``
998 attribute can be set to a string that is a template for the batch script.
997 attribute can be set to a string that is a template for the batch script.
999 This template is instantiated using string formatting. Thus the template can
998 This template is instantiated using string formatting. Thus the template can
1000 use {n} fot the number of instances. Subclasses can add additional variables
999 use {n} fot the number of instances. Subclasses can add additional variables
1001 to the template dict.
1000 to the template dict.
1002 """
1001 """
1003
1002
1004 # Subclasses must fill these in. See PBSEngineSet
1003 # Subclasses must fill these in. See PBSEngineSet
1005 submit_command = List([''], config=True,
1004 submit_command = List([''], config=True,
1006 help="The name of the command line program used to submit jobs.")
1005 help="The name of the command line program used to submit jobs.")
1007 delete_command = List([''], config=True,
1006 delete_command = List([''], config=True,
1008 help="The name of the command line program used to delete jobs.")
1007 help="The name of the command line program used to delete jobs.")
1009 job_id_regexp = Unicode('', config=True,
1008 job_id_regexp = CRegExp('', config=True,
1010 help="""A regular expression used to get the job id from the output of the
1009 help="""A regular expression used to get the job id from the output of the
1011 submit_command.""")
1010 submit_command.""")
1012 batch_template = Unicode('', config=True,
1011 batch_template = Unicode('', config=True,
1013 help="The string that is the batch script template itself.")
1012 help="The string that is the batch script template itself.")
1014 batch_template_file = Unicode(u'', config=True,
1013 batch_template_file = Unicode(u'', config=True,
1015 help="The file that contains the batch template.")
1014 help="The file that contains the batch template.")
1016 batch_file_name = Unicode(u'batch_script', config=True,
1015 batch_file_name = Unicode(u'batch_script', config=True,
1017 help="The filename of the instantiated batch script.")
1016 help="The filename of the instantiated batch script.")
1018 queue = Unicode(u'', config=True,
1017 queue = Unicode(u'', config=True,
1019 help="The PBS Queue.")
1018 help="The PBS Queue.")
1020
1019
1021 def _queue_changed(self, name, old, new):
1020 def _queue_changed(self, name, old, new):
1022 self.context[name] = new
1021 self.context[name] = new
1023
1022
1024 n = Integer(1)
1023 n = Integer(1)
1025 _n_changed = _queue_changed
1024 _n_changed = _queue_changed
1026
1025
1027 # not configurable, override in subclasses
1026 # not configurable, override in subclasses
1028 # PBS Job Array regex
1027 # PBS Job Array regex
1029 job_array_regexp = Unicode('')
1028 job_array_regexp = CRegExp('')
1030 job_array_template = Unicode('')
1029 job_array_template = Unicode('')
1031 # PBS Queue regex
1030 # PBS Queue regex
1032 queue_regexp = Unicode('')
1031 queue_regexp = CRegExp('')
1033 queue_template = Unicode('')
1032 queue_template = Unicode('')
1034 # The default batch template, override in subclasses
1033 # The default batch template, override in subclasses
1035 default_template = Unicode('')
1034 default_template = Unicode('')
1036 # The full path to the instantiated batch script.
1035 # The full path to the instantiated batch script.
1037 batch_file = Unicode(u'')
1036 batch_file = Unicode(u'')
1038 # the format dict used with batch_template:
1037 # the format dict used with batch_template:
1039 context = Dict()
1038 context = Dict()
1040 def _context_default(self):
1039 def _context_default(self):
1041 """load the default context with the default values for the basic keys
1040 """load the default context with the default values for the basic keys
1042
1041
1043 because the _trait_changed methods only load the context if they
1042 because the _trait_changed methods only load the context if they
1044 are set to something other than the default value.
1043 are set to something other than the default value.
1045 """
1044 """
1046 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1045 return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')
1047
1046
1048 # the Formatter instance for rendering the templates:
1047 # the Formatter instance for rendering the templates:
1049 formatter = Instance(EvalFormatter, (), {})
1048 formatter = Instance(EvalFormatter, (), {})
1050
1049
1051
1050
1052 def find_args(self):
1051 def find_args(self):
1053 return self.submit_command + [self.batch_file]
1052 return self.submit_command + [self.batch_file]
1054
1053
1055 def __init__(self, work_dir=u'.', config=None, **kwargs):
1054 def __init__(self, work_dir=u'.', config=None, **kwargs):
1056 super(BatchSystemLauncher, self).__init__(
1055 super(BatchSystemLauncher, self).__init__(
1057 work_dir=work_dir, config=config, **kwargs
1056 work_dir=work_dir, config=config, **kwargs
1058 )
1057 )
1059 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1058 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
1060
1059
1061 def parse_job_id(self, output):
1060 def parse_job_id(self, output):
1062 """Take the output of the submit command and return the job id."""
1061 """Take the output of the submit command and return the job id."""
1063 m = re.search(self.job_id_regexp, output)
1062 m = self.job_id_regexp.search(output)
1064 if m is not None:
1063 if m is not None:
1065 job_id = m.group()
1064 job_id = m.group()
1066 else:
1065 else:
1067 raise LauncherError("Job id couldn't be determined: %s" % output)
1066 raise LauncherError("Job id couldn't be determined: %s" % output)
1068 self.job_id = job_id
1067 self.job_id = job_id
1069 self.log.info('Job submitted with job id: %r', job_id)
1068 self.log.info('Job submitted with job id: %r', job_id)
1070 return job_id
1069 return job_id
1071
1070
1072 def write_batch_script(self, n):
1071 def write_batch_script(self, n):
1073 """Instantiate and write the batch script to the work_dir."""
1072 """Instantiate and write the batch script to the work_dir."""
1074 self.n = n
1073 self.n = n
1075 # first priority is batch_template if set
1074 # first priority is batch_template if set
1076 if self.batch_template_file and not self.batch_template:
1075 if self.batch_template_file and not self.batch_template:
1077 # second priority is batch_template_file
1076 # second priority is batch_template_file
1078 with open(self.batch_template_file) as f:
1077 with open(self.batch_template_file) as f:
1079 self.batch_template = f.read()
1078 self.batch_template = f.read()
1080 if not self.batch_template:
1079 if not self.batch_template:
1081 # third (last) priority is default_template
1080 # third (last) priority is default_template
1082 self.batch_template = self.default_template
1081 self.batch_template = self.default_template
1083
1082
1084 # add jobarray or queue lines to user-specified template
1083 # add jobarray or queue lines to user-specified template
1085 # note that this is *only* when user did not specify a template.
1084 # note that this is *only* when user did not specify a template.
1086 regex = re.compile(self.job_array_regexp)
1085 # print self.job_array_regexp.search(self.batch_template)
1087 # print regex.search(self.batch_template)
1086 if not self.job_array_regexp.search(self.batch_template):
1088 if not regex.search(self.batch_template):
1089 self.log.debug("adding job array settings to batch script")
1087 self.log.debug("adding job array settings to batch script")
1090 firstline, rest = self.batch_template.split('\n',1)
1088 firstline, rest = self.batch_template.split('\n',1)
1091 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1089 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
1092
1090
1093 regex = re.compile(self.queue_regexp)
1091 # print self.queue_regexp.search(self.batch_template)
1094 # print regex.search(self.batch_template)
1092 if self.queue and not self.queue_regexp.search(self.batch_template):
1095 if self.queue and not regex.search(self.batch_template):
1096 self.log.debug("adding PBS queue settings to batch script")
1093 self.log.debug("adding PBS queue settings to batch script")
1097 firstline, rest = self.batch_template.split('\n',1)
1094 firstline, rest = self.batch_template.split('\n',1)
1098 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1095 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
1099
1096
1100 script_as_string = self.formatter.format(self.batch_template, **self.context)
1097 script_as_string = self.formatter.format(self.batch_template, **self.context)
1101 self.log.debug('Writing batch script: %s', self.batch_file)
1098 self.log.debug('Writing batch script: %s', self.batch_file)
1102
1099
1103 with open(self.batch_file, 'w') as f:
1100 with open(self.batch_file, 'w') as f:
1104 f.write(script_as_string)
1101 f.write(script_as_string)
1105 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1102 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
1106
1103
1107 def start(self, n):
1104 def start(self, n):
1108 """Start n copies of the process using a batch system."""
1105 """Start n copies of the process using a batch system."""
1109 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1106 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
1110 # Here we save profile_dir in the context so they
1107 # Here we save profile_dir in the context so they
1111 # can be used in the batch script template as {profile_dir}
1108 # can be used in the batch script template as {profile_dir}
1112 self.write_batch_script(n)
1109 self.write_batch_script(n)
1113 output = check_output(self.args, env=os.environ)
1110 output = check_output(self.args, env=os.environ)
1114
1111
1115 job_id = self.parse_job_id(output)
1112 job_id = self.parse_job_id(output)
1116 self.notify_start(job_id)
1113 self.notify_start(job_id)
1117 return job_id
1114 return job_id
1118
1115
1119 def stop(self):
1116 def stop(self):
1120 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1117 output = check_output(self.delete_command+[self.job_id], env=os.environ)
1121 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1118 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
1122 return output
1119 return output
1123
1120
1124
1121
1125 class PBSLauncher(BatchSystemLauncher):
1122 class PBSLauncher(BatchSystemLauncher):
1126 """A BatchSystemLauncher subclass for PBS."""
1123 """A BatchSystemLauncher subclass for PBS."""
1127
1124
1128 submit_command = List(['qsub'], config=True,
1125 submit_command = List(['qsub'], config=True,
1129 help="The PBS submit command ['qsub']")
1126 help="The PBS submit command ['qsub']")
1130 delete_command = List(['qdel'], config=True,
1127 delete_command = List(['qdel'], config=True,
1131 help="The PBS delete command ['qsub']")
1128 help="The PBS delete command ['qsub']")
1132 job_id_regexp = Unicode(r'\d+', config=True,
1129 job_id_regexp = CRegExp(r'\d+', config=True,
1133 help="Regular expresion for identifying the job ID [r'\d+']")
1130 help="Regular expresion for identifying the job ID [r'\d+']")
1134
1131
1135 batch_file = Unicode(u'')
1132 batch_file = Unicode(u'')
1136 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
1133 job_array_regexp = CRegExp('#PBS\W+-t\W+[\w\d\-\$]+')
1137 job_array_template = Unicode('#PBS -t 1-{n}')
1134 job_array_template = Unicode('#PBS -t 1-{n}')
1138 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
1135 queue_regexp = CRegExp('#PBS\W+-q\W+\$?\w+')
1139 queue_template = Unicode('#PBS -q {queue}')
1136 queue_template = Unicode('#PBS -q {queue}')
1140
1137
1141
1138
1142 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1139 class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
1143 """Launch a controller using PBS."""
1140 """Launch a controller using PBS."""
1144
1141
1145 batch_file_name = Unicode(u'pbs_controller', config=True,
1142 batch_file_name = Unicode(u'pbs_controller', config=True,
1146 help="batch file name for the controller job.")
1143 help="batch file name for the controller job.")
1147 default_template= Unicode("""#!/bin/sh
1144 default_template= Unicode("""#!/bin/sh
1148 #PBS -V
1145 #PBS -V
1149 #PBS -N ipcontroller
1146 #PBS -N ipcontroller
1150 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1147 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1151 """%(' '.join(ipcontroller_cmd_argv)))
1148 """%(' '.join(ipcontroller_cmd_argv)))
1152
1149
1153
1150
1154 def start(self):
1151 def start(self):
1155 """Start the controller by profile or profile_dir."""
1152 """Start the controller by profile or profile_dir."""
1156 return super(PBSControllerLauncher, self).start(1)
1153 return super(PBSControllerLauncher, self).start(1)
1157
1154
1158
1155
1159 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1156 class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
1160 """Launch Engines using PBS"""
1157 """Launch Engines using PBS"""
1161 batch_file_name = Unicode(u'pbs_engines', config=True,
1158 batch_file_name = Unicode(u'pbs_engines', config=True,
1162 help="batch file name for the engine(s) job.")
1159 help="batch file name for the engine(s) job.")
1163 default_template= Unicode(u"""#!/bin/sh
1160 default_template= Unicode(u"""#!/bin/sh
1164 #PBS -V
1161 #PBS -V
1165 #PBS -N ipengine
1162 #PBS -N ipengine
1166 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1163 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1167 """%(' '.join(ipengine_cmd_argv)))
1164 """%(' '.join(ipengine_cmd_argv)))
1168
1165
1169 def start(self, n):
1166 def start(self, n):
1170 """Start n engines by profile or profile_dir."""
1167 """Start n engines by profile or profile_dir."""
1171 return super(PBSEngineSetLauncher, self).start(n)
1168 return super(PBSEngineSetLauncher, self).start(n)
1172
1169
1173 #SGE is very similar to PBS
1170 #SGE is very similar to PBS
1174
1171
1175 class SGELauncher(PBSLauncher):
1172 class SGELauncher(PBSLauncher):
1176 """Sun GridEngine is a PBS clone with slightly different syntax"""
1173 """Sun GridEngine is a PBS clone with slightly different syntax"""
1177 job_array_regexp = Unicode('#\$\W+\-t')
1174 job_array_regexp = CRegExp('#\$\W+\-t')
1178 job_array_template = Unicode('#$ -t 1-{n}')
1175 job_array_template = Unicode('#$ -t 1-{n}')
1179 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
1176 queue_regexp = CRegExp('#\$\W+-q\W+\$?\w+')
1180 queue_template = Unicode('#$ -q {queue}')
1177 queue_template = Unicode('#$ -q {queue}')
1181
1178
1182 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1179 class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
1183 """Launch a controller using SGE."""
1180 """Launch a controller using SGE."""
1184
1181
1185 batch_file_name = Unicode(u'sge_controller', config=True,
1182 batch_file_name = Unicode(u'sge_controller', config=True,
1186 help="batch file name for the ipontroller job.")
1183 help="batch file name for the ipontroller job.")
1187 default_template= Unicode(u"""#$ -V
1184 default_template= Unicode(u"""#$ -V
1188 #$ -S /bin/sh
1185 #$ -S /bin/sh
1189 #$ -N ipcontroller
1186 #$ -N ipcontroller
1190 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1187 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1191 """%(' '.join(ipcontroller_cmd_argv)))
1188 """%(' '.join(ipcontroller_cmd_argv)))
1192
1189
1193 def start(self):
1190 def start(self):
1194 """Start the controller by profile or profile_dir."""
1191 """Start the controller by profile or profile_dir."""
1195 return super(SGEControllerLauncher, self).start(1)
1192 return super(SGEControllerLauncher, self).start(1)
1196
1193
1197 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1194 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1198 """Launch Engines with SGE"""
1195 """Launch Engines with SGE"""
1199 batch_file_name = Unicode(u'sge_engines', config=True,
1196 batch_file_name = Unicode(u'sge_engines', config=True,
1200 help="batch file name for the engine(s) job.")
1197 help="batch file name for the engine(s) job.")
1201 default_template = Unicode("""#$ -V
1198 default_template = Unicode("""#$ -V
1202 #$ -S /bin/sh
1199 #$ -S /bin/sh
1203 #$ -N ipengine
1200 #$ -N ipengine
1204 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1201 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1205 """%(' '.join(ipengine_cmd_argv)))
1202 """%(' '.join(ipengine_cmd_argv)))
1206
1203
1207 def start(self, n):
1204 def start(self, n):
1208 """Start n engines by profile or profile_dir."""
1205 """Start n engines by profile or profile_dir."""
1209 return super(SGEEngineSetLauncher, self).start(n)
1206 return super(SGEEngineSetLauncher, self).start(n)
1210
1207
1211
1208
1212 # LSF launchers
1209 # LSF launchers
1213
1210
1214 class LSFLauncher(BatchSystemLauncher):
1211 class LSFLauncher(BatchSystemLauncher):
1215 """A BatchSystemLauncher subclass for LSF."""
1212 """A BatchSystemLauncher subclass for LSF."""
1216
1213
1217 submit_command = List(['bsub'], config=True,
1214 submit_command = List(['bsub'], config=True,
1218 help="The PBS submit command ['bsub']")
1215 help="The PBS submit command ['bsub']")
1219 delete_command = List(['bkill'], config=True,
1216 delete_command = List(['bkill'], config=True,
1220 help="The PBS delete command ['bkill']")
1217 help="The PBS delete command ['bkill']")
1221 job_id_regexp = Unicode(r'\d+', config=True,
1218 job_id_regexp = CRegExp(r'\d+', config=True,
1222 help="Regular expresion for identifying the job ID [r'\d+']")
1219 help="Regular expresion for identifying the job ID [r'\d+']")
1223
1220
1224 batch_file = Unicode(u'')
1221 batch_file = Unicode(u'')
1225 job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1222 job_array_regexp = CRegExp('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
1226 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1223 job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
1227 queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
1224 queue_regexp = CRegExp('#BSUB[ \t]+-q[ \t]+\w+')
1228 queue_template = Unicode('#BSUB -q {queue}')
1225 queue_template = Unicode('#BSUB -q {queue}')
1229
1226
1230 def start(self, n):
1227 def start(self, n):
1231 """Start n copies of the process using LSF batch system.
1228 """Start n copies of the process using LSF batch system.
1232 This cant inherit from the base class because bsub expects
1229 This cant inherit from the base class because bsub expects
1233 to be piped a shell script in order to honor the #BSUB directives :
1230 to be piped a shell script in order to honor the #BSUB directives :
1234 bsub < script
1231 bsub < script
1235 """
1232 """
1236 # Here we save profile_dir in the context so they
1233 # Here we save profile_dir in the context so they
1237 # can be used in the batch script template as {profile_dir}
1234 # can be used in the batch script template as {profile_dir}
1238 self.write_batch_script(n)
1235 self.write_batch_script(n)
1239 #output = check_output(self.args, env=os.environ)
1236 #output = check_output(self.args, env=os.environ)
1240 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1237 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1241 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1238 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1242 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1239 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1243 output,err = p.communicate()
1240 output,err = p.communicate()
1244 job_id = self.parse_job_id(output)
1241 job_id = self.parse_job_id(output)
1245 self.notify_start(job_id)
1242 self.notify_start(job_id)
1246 return job_id
1243 return job_id
1247
1244
1248
1245
1249 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1246 class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
1250 """Launch a controller using LSF."""
1247 """Launch a controller using LSF."""
1251
1248
1252 batch_file_name = Unicode(u'lsf_controller', config=True,
1249 batch_file_name = Unicode(u'lsf_controller', config=True,
1253 help="batch file name for the controller job.")
1250 help="batch file name for the controller job.")
1254 default_template= Unicode("""#!/bin/sh
1251 default_template= Unicode("""#!/bin/sh
1255 #BSUB -J ipcontroller
1252 #BSUB -J ipcontroller
1256 #BSUB -oo ipcontroller.o.%%J
1253 #BSUB -oo ipcontroller.o.%%J
1257 #BSUB -eo ipcontroller.e.%%J
1254 #BSUB -eo ipcontroller.e.%%J
1258 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1255 %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1259 """%(' '.join(ipcontroller_cmd_argv)))
1256 """%(' '.join(ipcontroller_cmd_argv)))
1260
1257
1261 def start(self):
1258 def start(self):
1262 """Start the controller by profile or profile_dir."""
1259 """Start the controller by profile or profile_dir."""
1263 return super(LSFControllerLauncher, self).start(1)
1260 return super(LSFControllerLauncher, self).start(1)
1264
1261
1265
1262
1266 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1263 class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
1267 """Launch Engines using LSF"""
1264 """Launch Engines using LSF"""
1268 batch_file_name = Unicode(u'lsf_engines', config=True,
1265 batch_file_name = Unicode(u'lsf_engines', config=True,
1269 help="batch file name for the engine(s) job.")
1266 help="batch file name for the engine(s) job.")
1270 default_template= Unicode(u"""#!/bin/sh
1267 default_template= Unicode(u"""#!/bin/sh
1271 #BSUB -oo ipengine.o.%%J
1268 #BSUB -oo ipengine.o.%%J
1272 #BSUB -eo ipengine.e.%%J
1269 #BSUB -eo ipengine.e.%%J
1273 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1270 %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
1274 """%(' '.join(ipengine_cmd_argv)))
1271 """%(' '.join(ipengine_cmd_argv)))
1275
1272
1276 def start(self, n):
1273 def start(self, n):
1277 """Start n engines by profile or profile_dir."""
1274 """Start n engines by profile or profile_dir."""
1278 return super(LSFEngineSetLauncher, self).start(n)
1275 return super(LSFEngineSetLauncher, self).start(n)
1279
1276
1280
1277
1281 #-----------------------------------------------------------------------------
1278 #-----------------------------------------------------------------------------
1282 # A launcher for ipcluster itself!
1279 # A launcher for ipcluster itself!
1283 #-----------------------------------------------------------------------------
1280 #-----------------------------------------------------------------------------
1284
1281
1285
1282
1286 class IPClusterLauncher(LocalProcessLauncher):
1283 class IPClusterLauncher(LocalProcessLauncher):
1287 """Launch the ipcluster program in an external process."""
1284 """Launch the ipcluster program in an external process."""
1288
1285
1289 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1286 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1290 help="Popen command for ipcluster")
1287 help="Popen command for ipcluster")
1291 ipcluster_args = List(
1288 ipcluster_args = List(
1292 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1289 ['--clean-logs=True', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
1293 help="Command line arguments to pass to ipcluster.")
1290 help="Command line arguments to pass to ipcluster.")
1294 ipcluster_subcommand = Unicode('start')
1291 ipcluster_subcommand = Unicode('start')
1295 profile = Unicode('default')
1292 profile = Unicode('default')
1296 n = Integer(2)
1293 n = Integer(2)
1297
1294
1298 def find_args(self):
1295 def find_args(self):
1299 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1296 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
1300 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1297 ['--n=%i'%self.n, '--profile=%s'%self.profile] + \
1301 self.ipcluster_args
1298 self.ipcluster_args
1302
1299
1303 def start(self):
1300 def start(self):
1304 return super(IPClusterLauncher, self).start()
1301 return super(IPClusterLauncher, self).start()
1305
1302
1306 #-----------------------------------------------------------------------------
1303 #-----------------------------------------------------------------------------
1307 # Collections of launchers
1304 # Collections of launchers
1308 #-----------------------------------------------------------------------------
1305 #-----------------------------------------------------------------------------
1309
1306
1310 local_launchers = [
1307 local_launchers = [
1311 LocalControllerLauncher,
1308 LocalControllerLauncher,
1312 LocalEngineLauncher,
1309 LocalEngineLauncher,
1313 LocalEngineSetLauncher,
1310 LocalEngineSetLauncher,
1314 ]
1311 ]
1315 mpi_launchers = [
1312 mpi_launchers = [
1316 MPILauncher,
1313 MPILauncher,
1317 MPIControllerLauncher,
1314 MPIControllerLauncher,
1318 MPIEngineSetLauncher,
1315 MPIEngineSetLauncher,
1319 ]
1316 ]
1320 ssh_launchers = [
1317 ssh_launchers = [
1321 SSHLauncher,
1318 SSHLauncher,
1322 SSHControllerLauncher,
1319 SSHControllerLauncher,
1323 SSHEngineLauncher,
1320 SSHEngineLauncher,
1324 SSHEngineSetLauncher,
1321 SSHEngineSetLauncher,
1325 ]
1322 ]
1326 winhpc_launchers = [
1323 winhpc_launchers = [
1327 WindowsHPCLauncher,
1324 WindowsHPCLauncher,
1328 WindowsHPCControllerLauncher,
1325 WindowsHPCControllerLauncher,
1329 WindowsHPCEngineSetLauncher,
1326 WindowsHPCEngineSetLauncher,
1330 ]
1327 ]
1331 pbs_launchers = [
1328 pbs_launchers = [
1332 PBSLauncher,
1329 PBSLauncher,
1333 PBSControllerLauncher,
1330 PBSControllerLauncher,
1334 PBSEngineSetLauncher,
1331 PBSEngineSetLauncher,
1335 ]
1332 ]
1336 sge_launchers = [
1333 sge_launchers = [
1337 SGELauncher,
1334 SGELauncher,
1338 SGEControllerLauncher,
1335 SGEControllerLauncher,
1339 SGEEngineSetLauncher,
1336 SGEEngineSetLauncher,
1340 ]
1337 ]
1341 lsf_launchers = [
1338 lsf_launchers = [
1342 LSFLauncher,
1339 LSFLauncher,
1343 LSFControllerLauncher,
1340 LSFControllerLauncher,
1344 LSFEngineSetLauncher,
1341 LSFEngineSetLauncher,
1345 ]
1342 ]
1346 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1343 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1347 + pbs_launchers + sge_launchers + lsf_launchers
1344 + pbs_launchers + sge_launchers + lsf_launchers
1348
1345
General Comments 0
You need to be logged in to leave comments. Login now