##// END OF EJS Templates
use `-m` entry points in parallel launchers
MinRK -
Show More
@@ -1,1451 +1,1449 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import pipes
25 import pipes
26 import stat
26 import stat
27 import sys
27 import sys
28 import time
28 import time
29
29
30 # signal imports, handling various platforms, versions
30 # signal imports, handling various platforms, versions
31
31
32 from signal import SIGINT, SIGTERM
32 from signal import SIGINT, SIGTERM
33 try:
33 try:
34 from signal import SIGKILL
34 from signal import SIGKILL
35 except ImportError:
35 except ImportError:
36 # Windows
36 # Windows
37 SIGKILL=SIGTERM
37 SIGKILL=SIGTERM
38
38
39 try:
39 try:
40 # Windows >= 2.7, 3.2
40 # Windows >= 2.7, 3.2
41 from signal import CTRL_C_EVENT as SIGINT
41 from signal import CTRL_C_EVENT as SIGINT
42 except ImportError:
42 except ImportError:
43 pass
43 pass
44
44
45 from subprocess import Popen, PIPE, STDOUT
45 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7 has no subprocess.check_output; emulate it with Popen.
    def check_output(*args, **kwargs):
        """Run a command and return what it wrote to stdout.

        All arguments are forwarded to :class:`Popen`, with ``stdout``
        forced to a pipe.  The child's exit status is not inspected here.
        """
        kwargs['stdout'] = PIPE
        captured, _ = Popen(*args, **kwargs).communicate()
        return captured
55
55
56 from zmq.eventloop import ioloop
56 from zmq.eventloop import ioloop
57
57
58 from IPython.config.application import Application
58 from IPython.config.application import Application
59 from IPython.config.configurable import LoggingConfigurable
59 from IPython.config.configurable import LoggingConfigurable
60 from IPython.utils.text import EvalFormatter
60 from IPython.utils.text import EvalFormatter
61 from IPython.utils.traitlets import (
61 from IPython.utils.traitlets import (
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
62 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits, CRegExp
63 )
63 )
64 from IPython.utils.encoding import DEFAULT_ENCODING
64 from IPython.utils.encoding import DEFAULT_ENCODING
65 from IPython.utils.path import get_home_dir
65 from IPython.utils.path import get_home_dir
66 from IPython.utils.process import find_cmd, FindCmdError
66 from IPython.utils.process import find_cmd, FindCmdError
67 from IPython.utils.py3compat import iteritems, itervalues
67 from IPython.utils.py3compat import iteritems, itervalues
68
68
69 from .win32support import forward_read_events
69 from .win32support import forward_read_events
70
70
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
71 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
72
72
73 WINDOWS = os.name == 'nt'
73 WINDOWS = os.name == 'nt'
74
74
75 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
76 # Paths to the kernel apps
76 # Paths to the kernel apps
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78
78
# Each application is launched through its ``-m`` entry point, i.e.
# ``<python> -m IPython.parallel.<app>``, where ``<python>`` is
# ``sys.executable`` (the interpreter running this module).
ipcluster_cmd_argv = [sys.executable, "-m", "IPython.parallel.cluster"]

ipengine_cmd_argv = [sys.executable, "-m", "IPython.parallel.engine"]

ipcontroller_cmd_argv = [sys.executable, "-m", "IPython.parallel.controller"]
86
84
87 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
88 # Base launchers and errors
86 # Base launchers and errors
89 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
90
88
class LauncherError(Exception):
    """Base class for the launcher-specific exceptions in this module."""
93
91
94
92
class ProcessStateError(LauncherError):
    """Raised when a launcher is asked to act in the wrong state.

    For example, starting a process launcher that was already started.
    """
97
95
98
96
class UnknownStatus(LauncherError):
    """Launcher error for an unknown status.

    NOTE(review): no raise site is visible in this part of the file;
    confirm the intended use against the callers.
    """
101
99
102
100
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process.

    The launcher tracks its lifecycle in ``self.state``, which is one of
    ``'before'``, ``'running'`` or ``'after'``.  Subclasses implement
    :meth:`find_args`, :meth:`start`, :meth:`stop` and :meth:`signal`, and
    call :meth:`notify_start` / :meth:`notify_stop` to move between states.
    """

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # data recorded by notify_start / notify_stop
    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        """Default for the ``loop`` trait: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the launcher in the ``'before'`` state.

        Parameters
        ----------
        work_dir : unicode
            Directory in which child processes will be run.
        config : optional
            Configuration object forwarded to the parent constructor.
        **kwargs
            Forwarded unchanged to the parent constructor.
        """
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        self.state = 'before' # can be before, running, after
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        The first element will be the process name.  The value is whatever
        :meth:`find_args` returns.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process."""
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This method will return None immediately.
        To observe the actual process stopping, see :meth:`on_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be called with this Launcher's stop_data
        when the process actually finishes.

        If the process has already stopped, ``f`` is called immediately and
        its result is returned.
        """
        if self.state == 'after':
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'.  It is
        a pass-through so it can be used as a callback.
        """
        self.log.debug('Process %r started: %r', self.args[0], data)
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger callbacks registered via :meth:`on_stop`.
        """
        self.log.debug('Process %r stopped: %r', self.args[0], data)
        self.stop_data = data
        self.state = 'after'
        # Drain the callbacks, most recently registered first.
        while self.stop_callbacks:
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
214
212
class ClusterAppMixin(HasTraits):
    """Mixin that carries the cluster-identifying options as traits."""

    profile_dir = Unicode('')
    cluster_id = Unicode('')

    @property
    def cluster_args(self):
        """Command-line options naming the profile dir and the cluster id."""
        options = ['--profile-dir', self.profile_dir]
        options.extend(['--cluster-id', self.cluster_id])
        return options
223
221
class ControllerMixin(ClusterAppMixin):
    """Mixin holding the configurable ipcontroller command and arguments."""

    # argv prefix used to start the controller
    controller_cmd = List(
        ipcontroller_cmd_argv,
        config=True,
        help="""Popen command to launch ipcontroller.""",
    )
    # Command line arguments to ipcontroller.
    controller_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="""command-line args to pass to ipcontroller""",
    )
230
228
class EngineMixin(ClusterAppMixin):
    """Mixin holding the configurable ipengine command and arguments."""

    # argv prefix used to start an engine
    engine_cmd = List(
        ipengine_cmd_argv,
        config=True,
        help="""command to launch the Engine.""",
    )
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="command-line arguments to pass to ipengine",
    )
238
236
239
237
240 #-----------------------------------------------------------------------------
238 #-----------------------------------------------------------------------------
241 # Local process launchers
239 # Local process launchers
242 #-----------------------------------------------------------------------------
240 #-----------------------------------------------------------------------------
243
241
244
242
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.  The child's stdout and stderr are registered with
    the IOLoop and logged at debug level, and a periodic callback polls the
    child for its exit status.
    """

    # This is used to construct self.args, which is passed to Popen.
    cmd_and_args = List([])
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise with no child process and no poller yet."""
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        """Return the configured command and arguments."""
        return self.cmd_and_args

    def start(self):
        """Spawn the child process and begin watching it.

        Raises
        ------
        ProcessStateError
            If the launcher is not in the ``'before'`` state.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE, stderr=PIPE, stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # On Windows the pipes are wrapped by forward_read_events and
                # the wrappers are what gets registered with the loop.
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        """Stop the process: interrupt it, then kill it after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the child; does nothing unless it is running."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL.

        Parameters
        ----------
        delay : float
            Seconds to wait between the interrupt and the kill.
        """
        try:
            self.signal(SIGINT)
        except Exception:
            # Best effort: the delayed KILL below is scheduled regardless.
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def _log_output_line(self, stream_name):
        """Read one line from the named child stream and log it.

        ``stream_name`` is ``'stdout'`` or ``'stderr'``.  An empty read
        triggers :meth:`poll` to check whether the child has exited.
        """
        if WINDOWS:
            line = getattr(self, stream_name).recv()
        else:
            line = getattr(self.process, stream_name).readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stdout(self, fd, events):
        """IOLoop handler for the child's stdout."""
        self._log_output_line('stdout')

    def handle_stderr(self, fd, events):
        """IOLoop handler for the child's stderr."""
        self._log_output_line('stderr')

    def poll(self):
        """Check the child's exit status, cleaning up once it has exited.

        Returns the exit status, or None while the child is still running.
        """
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
343
341
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Return the controller command followed by cluster and controller args."""
        options = self.cluster_args + self.controller_args
        return self.controller_cmd + options

    def start(self):
        """Start the controller by profile_dir."""
        started = super(LocalControllerLauncher, self).start()
        return started
353
351
354
352
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        """Return the engine command followed by cluster and engine args."""
        options = self.cluster_args + self.engine_args
        return self.engine_cmd + options
360
358
361
359
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes.

    One ``launcher_class`` instance is created per engine and kept in
    ``self.launchers`` (keyed by start index) until that engine stops.
    The set as a whole is reported stopped once every engine has stopped.
    """

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class
    launcher_class = LocalEngineLauncher

    # index -> still-running engine launcher
    launchers = Dict()
    # index -> stop data of an engine that has finished
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the set with empty stop data."""
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir.

        Returns a list with the result of each engine launcher's ``start()``.
        """
        dlist = []
        for i in range(n):
            if i > 0:
                # space out the launches (see the ``delay`` trait)
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, parent=self, log=self.log,
                profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        """Return a placeholder name; the set has no command of its own."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine; returns the list of results."""
        return [el.signal(sig) for el in itervalues(self.launchers)]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill after ``delay`` seconds, every engine."""
        return [el.interrupt_then_kill(delay) for el in itervalues(self.launchers)]

    def stop(self):
        """Stop all engines via :meth:`interrupt_then_kill`."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Record one engine's stop data; notify when the last one stops.

        ``data`` is the stop data of a single engine and must contain its
        ``'pid'``.
        """
        pid = data['pid']
        for idx, el in iteritems(self.launchers):
            if el.process.pid == pid:
                break
        else:
            # No launcher owns this pid.  Without this guard the code below
            # would drop an unrelated launcher (or fail on an undefined
            # index when the dict is empty).
            self.log.warning("Ignoring stop notice for unknown engine pid %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
432
430
433
431
434 #-----------------------------------------------------------------------------
432 #-----------------------------------------------------------------------------
435 # MPI launchers
433 # MPI launchers
436 #-----------------------------------------------------------------------------
434 #-----------------------------------------------------------------------------
437
435
438
436
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of instances, passed to mpiexec as ``-n``
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        """Migrate config stored under the old MPIExec* names, then initialise.

        If the ``config`` keyword holds a section named after one of the old
        ``MPIExec*`` classes, its contents are merged into the matching
        ``MPI*`` section and a deprecation warning is logged.
        """
        # deprecation for old MPIExec names:
        config = kwargs.get('config')
        if config is None:
            # ``config=None`` may be passed explicitly; nothing to migrate.
            config = {}
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                self.log.warning("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        mpi_part = self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args
        return mpi_part + self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
476
474
477
475
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # ``program`` and ``program_args`` are read-only aliases onto the
    # controller traits, so find_args() sees the same attribute names on
    # every Controller/EngineSet launcher instead of some exposing
    # `program_args` and others `controller_args`.
    @property
    def program(self):
        """The controller command."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Cluster args followed by the controller args."""
        cluster_part = self.cluster_args
        return cluster_part + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        # a controller is always launched as a single instance
        return super(MPIControllerLauncher, self).start(1)
495
493
496
494
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # ``program`` and ``program_args`` are read-only aliases onto the engine
    # traits, so find_args() sees the same attribute names on every
    # Controller/EngineSet launcher instead of some exposing `program_args`
    # and others `engine_args`.
    @property
    def program(self):
        """The engine command."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Cluster args followed by the engine args."""
        cluster_part = self.cluster_args
        return cluster_part + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        started = super(MPIEngineSetLauncher, self).start(n)
        return started
515
513
516 # deprecated MPIExec names
514 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin for the old ``MPIExec*`` class names that logs a deprecation notice.

    Expects the concrete class to provide a ``log`` attribute with a
    ``warning`` method (e.g. a ``logging.Logger``).
    """

    def warn(self):
        """Log a warning that this class name is deprecated.

        The suggested replacement is the instance's class name with
        ``MPIExec`` replaced by ``MPI``.
        """
        oldname = self.__class__.__name__
        newname = oldname.replace('MPIExec', 'MPI')
        # Logger.warn is a deprecated alias of Logger.warning and no longer
        # exists on recent Python versions, so call warning() directly.
        self.log.warning("WARNING: %s name is deprecated, use %s", oldname, newname)
522
520
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""

    def __init__(self, *a, **kw):
        """Initialise as the parent does, then log the deprecation warning."""
        parent = super(MPIExecLauncher, self)
        parent.__init__(*a, **kw)
        # warn only after construction has finished
        self.warn()
528
526
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""

    def __init__(self, *a, **kw):
        """Initialise as the parent does, then log the deprecation warning."""
        parent = super(MPIExecControllerLauncher, self)
        parent.__init__(*a, **kw)
        # warn only after construction has finished
        self.warn()
534
532
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""

    def __init__(self, *a, **kw):
        """Initialise as the parent does, then log the deprecation warning."""
        parent = super(MPIExecEngineSetLauncher, self)
        parent.__init__(*a, **kw)
        # warn only after construction has finished
        self.warn()
540
538
541
539
542 #-----------------------------------------------------------------------------
540 #-----------------------------------------------------------------------------
543 # SSH launchers
541 # SSH launchers
544 #-----------------------------------------------------------------------------
542 #-----------------------------------------------------------------------------
545
543
546 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
544 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
547
545
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    Builds an ``ssh`` command line that runs ``program`` with
    ``program_args`` at ``location``, optionally copying files with ``scp``
    before (``to_send``) and after (``to_fetch``) the process is started.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    scp_cmd = List(['scp'], config=True,
        help="command for sending files")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")
    to_fetch = List([], config=True,
        help="List of (remote, local) files to fetch after starting")
    to_send = List([], config=True,
        help="List of (local, remote) files to send before starting")

    def _hostname_changed(self, name, old, new):
        """Rebuild ``location`` from the current user and the new hostname."""
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        """Rebuild ``location`` from the new user and the current hostname."""
        if new:
            self.location = u'%s@%s' % (new, self.hostname)
        else:
            # Mirror _hostname_changed: an empty user must not produce a
            # bare '@hostname' location.
            self.location = self.hostname

    def find_args(self):
        """Return the full ssh command line as a list of arguments.

        The remote program and its arguments are shell-quoted individually.
        """
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               list(map(pipes.quote, self.program + self.program_args))

    def _send_file(self, local, remote):
        """send a single file

        Waits up to ten one-second intervals for ``local`` to exist, then
        copies it to ``remote`` at ``self.location`` using ``scp_cmd``.
        The copy is attempted even if the file never appeared.
        """
        remote = "%s:%s" % (self.location, remote)
        for i in range(10):
            if not os.path.exists(local):
                self.log.debug("waiting for %s", local)
                time.sleep(1)
            else:
                break
        self.log.info("sending %s to %s", local, remote)
        check_output(self.scp_cmd + [local, remote])

    def send_files(self):
        """send our files (called before start)"""
        if not self.to_send:
            return
        for local_file, remote_file in self.to_send:
            self._send_file(local_file, remote_file)

    def _fetch_file(self, remote, local):
        """fetch a single file

        Polls the remote side (up to ten attempts) until ``remote`` exists,
        then copies it to ``local`` using ``scp_cmd``.  The copy is
        attempted even if the file was never reported as present.
        """
        full_remote = "%s:%s" % (self.location, remote)
        self.log.info("fetching %s from %s", local, full_remote)
        for i in range(10):
            # wait up to 10s for remote file to exist
            check = check_output(self.ssh_cmd + self.ssh_args + \
                [self.location, 'test -e', remote, "&& echo 'yes' || echo 'no'"])
            check = check.decode(DEFAULT_ENCODING, 'replace').strip()
            if check == u'no':
                time.sleep(1)
            elif check == u'yes':
                break
        check_output(self.scp_cmd + [full_remote, local])

    def fetch_files(self):
        """fetch remote files (called after start)"""
        if not self.to_fetch:
            return
        for remote_file, local_file in self.to_fetch:
            self._fetch_file(remote_file, local_file)

    def start(self, hostname=None, user=None):
        """Send files, start the ssh process, then fetch files.

        Parameters
        ----------
        hostname : str, optional
            If given, replaces ``self.hostname`` before starting.
        user : str, optional
            If given, replaces ``self.user`` before starting.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        self.send_files()
        super(SSHLauncher, self).start()
        self.fetch_files()

    def signal(self, sig):
        """Close the ssh connection if the process is running.

        ``sig`` is not used: whatever signal is requested, the ssh escape
        sequence ``~.`` is written to the process's stdin.
        """
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
646
644
class SSHClusterLauncher(SSHLauncher, ClusterAppMixin):
    """SSH launcher that passes a remote ``--profile-dir`` to the program."""

    remote_profile_dir = Unicode('', config=True,
        help="""The remote profile_dir to use.

        If not specified, use calling profile, stripping out possible leading homedir.
        """)

    def _profile_dir_changed(self, name, old, new):
        """Derive ``remote_profile_dir`` from a new ``profile_dir`` if unset."""
        if self.remote_profile_dir:
            # an explicit remote profile dir always wins
            return
        # re-run the remote_profile_dir default logic, in case it was
        # already evaluated before profile_dir had been set
        self.remote_profile_dir = self._strip_home(new)

    @staticmethod
    def _strip_home(path):
        """turns /home/you/.ipython/profile_foo into .ipython/profile_foo"""
        home = get_home_dir()
        prefix = home if home.endswith('/') else home + '/'
        return path[len(prefix):] if path.startswith(prefix) else path

    def _remote_profile_dir_default(self):
        """Default: the local ``profile_dir`` with the home prefix removed."""
        local_dir = self.profile_dir
        return self._strip_home(local_dir)

    def _cluster_id_changed(self, name, old, new):
        """Reject any non-empty cluster id."""
        if not new:
            return
        raise ValueError("cluster id not supported by SSH launchers")

    @property
    def cluster_args(self):
        """Command-line arguments selecting the remote profile directory."""
        remote_dir = self.remote_profile_dir
        return ['--profile-dir', remote_dir]
683
681
class SSHControllerLauncher(SSHClusterLauncher, ControllerMixin):
    """Launch a controller over ssh and fetch its connection files."""

    # `program` and `program_args` are read-only aliases of the
    # controller-specific settings.  find_args() only looks at
    # program[_args], so every Controller/EngineSet launcher presents the
    # same attribute names.

    def _controller_cmd_default(self):
        """Default remote command: the bare ``ipcontroller`` executable name."""
        return ['ipcontroller']

    @property
    def program(self):
        """The command to run: the current value of ``controller_cmd``."""
        command = self.controller_cmd
        return command

    @property
    def program_args(self):
        """``cluster_args`` followed by ``controller_args``."""
        cluster_part = self.cluster_args
        controller_part = self.controller_args
        return cluster_part + controller_part

    def _to_fetch_default(self):
        """Default (remote, local) pairs for the two connection files.

        Both files live in the ``security`` subdirectory of the respective
        profile directory.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            local_path = os.path.join(self.profile_dir, 'security', cf)
            pairs.append((remote_path, local_path))
        return pairs
707
705
class SSHEngineLauncher(SSHClusterLauncher, EngineMixin):
    """Launch a single engine over ssh, sending the connection files first."""

    # `program` and `program_args` are read-only aliases of the
    # engine-specific settings.  find_args() only looks at program[_args],
    # so every Controller/EngineSet launcher presents the same attribute
    # names.

    def _engine_cmd_default(self):
        """Default remote command: the bare ``ipengine`` executable name."""
        return ['ipengine']

    @property
    def program(self):
        """The command to run: the current value of ``engine_cmd``."""
        command = self.engine_cmd
        return command

    @property
    def program_args(self):
        """``cluster_args`` followed by ``engine_args``."""
        cluster_part = self.cluster_args
        engine_part = self.engine_args
        return cluster_part + engine_part

    def _to_send_default(self):
        """Default (local, remote) pairs for the two connection files.

        Both files live in the ``security`` subdirectory of the respective
        profile directory.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs
731
729
732
730
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines over ssh, as described by the ``engines`` dict."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def _engine_cmd_default(self):
        """Default remote command: the bare ``ipengine`` executable name."""
        return ['ipengine']

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        total = 0
        for spec in itervalues(self.engines):
            if isinstance(spec, (tuple, list)):
                # (count, args) form: only the count matters here
                num, _args = spec
            else:
                num = spec
            total += num
        return total

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Each value in ``engines`` is either an engine count or a
        ``(count, args)`` pair; a key may be ``host`` or ``user@host``.
        Returns the list of values returned by each launcher's ``start``.
        """
        dlist = []
        for location, spec in iteritems(self.engines):
            if isinstance(spec, (tuple, list)):
                count, args = spec
            else:
                count = spec
                args = copy.deepcopy(self.engine_args)

            # split an optional "user@" prefix off the host name
            user, at_sign, host = location.partition('@')
            if not at_sign:
                user = None
                host = location

            for i in range(count):
                is_first = (i == 0)
                if not is_first:
                    time.sleep(self.delay)
                el = self.launcher_class(
                    work_dir=self.work_dir,
                    parent=self,
                    log=self.log,
                    profile_dir=self.profile_dir,
                    cluster_id=self.cluster_id,
                )
                if not is_first:
                    # only send files for the first engine on each host
                    el.to_send = []

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers["%s/%i" % (host, i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
787
785
788
786
class SSHProxyEngineSetLauncher(SSHClusterLauncher):
    """Launcher for calling
    `ipcluster engines` on a remote machine.

    Requires that remote profile is already configured.
    """

    n = Integer()
    ipcluster_cmd = List(['ipcluster'], config=True)

    @property
    def program(self):
        """The remote command: ``ipcluster_cmd`` plus the ``engines`` subcommand."""
        subcommand = ['engines']
        return self.ipcluster_cmd + subcommand

    @property
    def program_args(self):
        """Arguments giving the engine count and the remote profile directory."""
        count_args = ['-n', str(self.n)]
        profile_args = ['--profile-dir', self.remote_profile_dir]
        return count_args + profile_args

    def _to_send_default(self):
        """Default (local, remote) pairs for the two connection files.

        Both files live in the ``security`` subdirectory of the respective
        profile directory.
        """
        pairs = []
        for cf in ('ipcontroller-client.json', 'ipcontroller-engine.json'):
            local_path = os.path.join(self.profile_dir, 'security', cf)
            remote_path = os.path.join(self.remote_profile_dir, 'security', cf)
            pairs.append((local_path, remote_path))
        return pairs

    def start(self, n):
        """Record the engine count ``n`` and start the remote command."""
        self.n = n
        parent = super(SSHProxyEngineSetLauncher, self)
        parent.start()
817
815
818
816
819 #-----------------------------------------------------------------------------
817 #-----------------------------------------------------------------------------
820 # Windows HPC Server 2008 scheduler launchers
818 # Windows HPC Server 2008 scheduler launchers
821 #-----------------------------------------------------------------------------
819 #-----------------------------------------------------------------------------
822
820
823
821
824 # This is only used on Windows.
822 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to submit jobs.

    On Windows the result of ``find_cmd('job')`` is returned; if that lookup
    fails, or on any other platform, the plain name ``'job'`` is returned.
    """
    fallback = 'job'
    if not WINDOWS:
        return fallback
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return fallback
834
832
835
833
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through the ``job`` command.

    Subclasses must implement ``write_job_file`` to produce the job
    description file that ``start`` submits.
    """

    job_id_regexp = CRegExp(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
        )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): the `job_file` property defined further down rebinds this
    # name in the class body, so this trait appears to be shadowed.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Forward ``work_dir``, ``config`` and any extra keywords to the parent."""
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``work_dir``."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description for ``n`` processes (subclass hook)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the fixed argument list ``[u'job.exe']``."""
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is the first match of ``job_id_regexp`` in ``output``; it is
        also stored on ``self.job_id``.

        Raises
        ------
        LauncherError
            If ``job_id_regexp`` does not match ``output``.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it with ``job_cmd`` and returns the job
        id parsed from the command's output.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s", ' '.join([self.job_cmd] + args))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job recorded in ``self.job_id``.

        Returns the decoded output of the cancel command, or a fallback
        message if the command fails.  The same text is passed to
        ``notify_stop`` together with the job id.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s", ' '.join([self.job_cmd] + args))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
            output = output.decode(DEFAULT_ENCODING, 'replace')
        except Exception:
            # Best effort: a failing cancel command is treated as "already
            # stopped".  KeyboardInterrupt/SystemExit are deliberately not
            # caught so they still propagate.
            output = u'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
916
914
917
915
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Submit a controller as a Windows HPC job."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job description holding one controller task.

        ``n`` is not used; exactly one task is added to the job.
        """
        job = IPControllerJob(parent=self)
        task = IPControllerTask(parent=self)

        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.profile_dir

        # cluster-level args first, then the controller-specific extras
        task.controller_args.extend(self.cluster_args)
        task.controller_args.extend(self.controller_args)
        job.add_task(task)

        target = self.job_file
        self.log.debug("Writing job description file: %s", target)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``profile_dir``."""
        directory = self.profile_dir
        return os.path.join(directory, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
948
946
949
947
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Submit a set of engines as a single Windows HPC job."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a job description holding ``n`` engine tasks."""
        job = IPEngineSetJob(parent=self)

        for i in range(n):
            t = IPEngineTask(parent=self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # cluster-level args first, then the engine-specific extras
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        """Path of the job file: ``job_file_name`` inside ``profile_dir``."""
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir.

        Delegates to the parent ``start`` and returns its result.
        """
        return super(WindowsHPCEngineSetLauncher, self).start(n)
981
979
982
980
983 #-----------------------------------------------------------------------------
981 #-----------------------------------------------------------------------------
984 # Batch (PBS) system launchers
982 # Batch (PBS) system launchers
985 #-----------------------------------------------------------------------------
983 #-----------------------------------------------------------------------------
986
984
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        """Store the new value in ``self.context`` under the changed name."""
        context = self.context
        context[name] = new

    # cluster_id changes are handled by the very same function
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        """Default to an empty string and record it in ``self.context``."""
        default = ''
        self.context['profile_dir'] = default
        return default

    def _cluster_id_default(self):
        """Default to an empty string and record it in ``self.context``."""
        default = ''
        self.context['cluster_id'] = default
        return default
999
997
1000
998
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = CRegExp('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    job_id_regexp_group = Integer(0, config=True,
        help="""The group we wish to match in job_id_regexp (0 to match all)""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        """Mirror a changed trait value into the template context."""
        self.context[name] = new

    n = Integer(1)
    # ``n`` is mirrored into the context the same way as ``queue``
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CRegExp('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = CRegExp('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})

    def find_args(self):
        """Return the submit command followed by the batch script path."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialise the launcher and place the batch file inside ``work_dir``."""
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is group ``job_id_regexp_group`` of the first match of
        ``job_id_regexp``; it is also stored on ``self.job_id``.

        Raises
        ------
        LauncherError
            If ``job_id_regexp`` does not match ``output``.
        """
        m = self.job_id_regexp.search(output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group(self.job_id_regexp_group)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template
            # add jobarray or queue lines to the default template
            # note that this is *only* when user did not specify a template.
            self._insert_queue_in_script()
            self._insert_job_array_in_script()
        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # the script is made readable, writable and executable by its owner only
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def _insert_after_first_line(self, line):
        """Insert ``line`` directly after the first line of ``batch_template``.

        ``str.partition`` is used rather than ``split('\\n', 1)`` so that a
        template consisting of a single line (no newline) does not raise
        ``ValueError`` on unpacking.
        """
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, line, rest])

    def _insert_queue_in_script(self):
        """Inserts a queue if required into the batch script.

        Nothing happens unless ``queue`` is set and ``queue_regexp`` finds no
        match in the current template.
        """
        if self.queue and not self.queue_regexp.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            self._insert_after_first_line(self.queue_template)

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.

        Nothing happens when ``job_array_regexp`` already matches the template.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            self._insert_after_first_line(self.job_array_template)

    def start(self, n):
        """Start n copies of the process using a batch system.

        Writes the batch script, runs the submit command on it, and returns
        the job id parsed from the command's output.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # write_batch_script stores ``n`` on the launcher before rendering
        # the template with the current context.
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)
        output = output.decode(DEFAULT_ENCODING, 'replace')

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run the delete command on the current job and return its output.

        Failures are logged rather than raised (best effort); in that case the
        returned output is an empty string.
        """
        delete_cmd = self.delete_command + [self.job_id]
        try:
            p = Popen(delete_cmd, env=os.environ, stdout=PIPE, stderr=PIPE)
            out, err = p.communicate()
            output = out + err
        except Exception:
            self.log.exception("Problem stopping cluster with command: %s",
                               delete_cmd)
            # must be bytes: it is decoded below just like real process output
            output = b""
        output = output.decode(DEFAULT_ENCODING, 'replace')
        # Pass the output of the kill cmd
        self.notify_stop(dict(job_id=self.job_id, output=output))
        return output
1148
1146
1149
1147
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Regex literals are raw strings so that ``\W``, ``\d`` and ``\$`` reach
    # the regex engine without relying on invalid string escape sequences.
    # Matches an existing ``#PBS -t ...`` job-array directive.
    job_array_regexp = CRegExp(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # Matches an existing ``#PBS -q ...`` queue directive.
    queue_regexp = CRegExp(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1165
1163
1166
1164
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(
        u'pbs_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # The controller command line is shell-quoted and spliced in at %s;
    # the {placeholders} are filled in later from the launcher context.
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Submit the controller job; exactly one instance is requested."""
        parent = super(PBSControllerLauncher, self)
        return parent.start(1)
1181
1179
1182
1180
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(
        u'pbs_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # The engine command line is shell-quoted and spliced in at %s;
    # the {placeholders} are filled in later from the launcher context.
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1192
1190
1193
1191
1194 #SGE is very similar to PBS
1192 #SGE is very similar to PBS
1195
1193
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""

    # Regex literals are raw strings so that ``\$``, ``\W`` and ``\-`` reach
    # the regex engine without relying on invalid string escape sequences.
    # Matches an existing ``#$ -t`` job-array directive.
    job_array_regexp = CRegExp(r'#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    # Matches an existing ``#$ -q ...`` queue directive.
    queue_regexp = CRegExp(r'#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1202
1200
1203
1201
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # The controller command line is shell-quoted and spliced in at %s;
    # the {placeholders} are filled in later from the launcher context.
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(map(pipes.quote, ipcontroller_cmd_argv))))

    def start(self):
        """Submit the controller job; exactly one instance is requested."""
        return super(SGEControllerLauncher, self).start(1)
1218
1216
1219
1217
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""

    batch_file_name = Unicode(
        u'sge_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # The engine command line is shell-quoted and spliced in at %s;
    # the {placeholders} are filled in later from the launcher context.
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1229
1227
1230
1228
1231 # LSF launchers
1229 # LSF launchers
1232
1230
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = CRegExp(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Matches an existing ``#BSUB -J name[a-b]`` job-array directive.  The
    # bounds may be digits or a ``{placeholder}``, since the pattern is
    # applied to the unrendered template; whitespace after ``-J`` is
    # optional so both ``-Jname[..]`` and ``-J name[..]`` are recognised.
    job_array_regexp = CRegExp(
        r'#BSUB[ \t]+-J[ \t]*\w+\[(?:\d+|\{\w+\})-(?:\d+|\{\w+\})\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    # Matches an existing ``#BSUB -q ...`` queue directive.
    queue_regexp = CRegExp(r'#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using LSF batch system.

        This cant inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives :
        bsub < script

        Returns the job id parsed from the submit command's output.
        """
        self.write_batch_script(n)
        args = self.args
        # find_args() yields the submit command followed by the batch file,
        # so the script is always the last element, however many items
        # submit_command holds.
        submit_cmd, batch_file = args[:-1], args[-1]
        # The program itself is passed through unquoted (as before); any
        # extra arguments and the script path are shell-quoted.
        words = submit_cmd[:1] + [pipes.quote(arg) for arg in submit_cmd[1:]]
        piped_cmd = ' '.join(words) + ' < ' + pipes.quote(batch_file)
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        output = output.decode(DEFAULT_ENCODING, 'replace')
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1266
1264
1267
1265
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(
        u'lsf_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # The controller command line is shell-quoted and spliced in at %s
    # (hence the doubled %% elsewhere); the {placeholders} are filled in
    # later from the launcher context.
    default_template= Unicode("""#!/bin/sh
    #BSUB -J ipcontroller
    #BSUB -oo ipcontroller.o.%%J
    #BSUB -eo ipcontroller.e.%%J
    %s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
    """ % ' '.join(pipes.quote(arg) for arg in ipcontroller_cmd_argv))

    def start(self):
        """Submit the controller job; exactly one instance is requested."""
        parent = super(LSFControllerLauncher, self)
        return parent.start(1)
1283
1281
1284
1282
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""

    batch_file_name = Unicode(
        u'lsf_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # The engine command line is shell-quoted and spliced in at %s
    # (hence the doubled %% elsewhere); the {placeholders} are filled in
    # later from the launcher context.
    default_template= Unicode(u"""#!/bin/sh
    #BSUB -oo ipengine.o.%%J
    #BSUB -eo ipengine.e.%%J
    %s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
    """ % ' '.join(pipes.quote(arg) for arg in ipengine_cmd_argv))
1294
1292
1295
1293
1296
1294
class HTCondorLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for HTCondor.

    HTCondor requires that we launch the ipengine/ipcontroller scripts rather
    than the python instance but otherwise is very similar to PBS.  This is because
    HTCondor destroys sys.executable when launching remote processes - a launched
    python process depends on sys.executable to effectively evaluate its
    module search paths. Without it, regardless of which python interpreter you launch
    you will get only the built-in module search paths.

    We use the ip{cluster, engine, controller} scripts as our executable to circumvent
    this - the mechanism of shebanged scripts means that the python binary will be
    launched with argv[0] set to the *location of the ip{cluster, engine, controller}
    scripts on the remote node*. This means you need to take care that:

    a. Your remote nodes have their paths configured correctly, with the ipengine and ipcontroller
       of the python environment you wish to execute code in having top precedence.
    b. This functionality is untested on Windows.

    If you need different behavior, consider making your own template.
    """

    submit_command = List(['condor_submit'], config=True,
        help="The HTCondor submit command ['condor_submit']")
    delete_command = List(['condor_rm'], config=True,
        help="The HTCondor delete command ['condor_rm']")
    # Raw strings keep ``\d``, ``\.`` and ``\W`` as regex escapes instead of
    # relying on invalid string escape sequences.
    job_id_regexp = CRegExp(r'(\d+)\.$', config=True,
        help=r"Regular expression for identifying the job ID [r'(\d+)\.$']")
    job_id_regexp_group = Integer(1, config=True,
        help="""The group we wish to match in job_id_regexp [1]""")

    job_array_regexp = CRegExp(r'queue\W+\$')
    job_array_template = Unicode('queue {n}')

    def _insert_job_array_in_script(self):
        """Inserts a job array if required into the batch script.

        Unlike the base class, the line is appended to the end of the
        template instead of being inserted after the first line.
        """
        if not self.job_array_regexp.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            #HTCondor requires that the job array goes at the bottom of the script
            self.batch_template = '\n'.join([self.batch_template,
                self.job_array_template])

    def _insert_queue_in_script(self):
        """AFAIK, HTCondor doesn't have a concept of multiple queues that can be
        specified in the script.
        """
        pass
1346
1344
1347
1345
class HTCondorControllerLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch a controller using HTCondor."""

    batch_file_name = Unicode(
        u'htcondor_controller',
        config=True,
        help="batch file name for the controller job.",
    )
    # Submit description for the controller; the {placeholders} are filled
    # in later from the launcher context.
    default_template = Unicode(r"""
universe        = vanilla
executable      = ipcontroller
# by default we expect a shared file system
transfer_executable = False
arguments       = --log-to-file '--profile-dir={profile_dir}' --cluster-id='{cluster_id}'
""")

    def start(self):
        """Submit the controller job; exactly one instance is requested."""
        parent = super(HTCondorControllerLauncher, self)
        return parent.start(1)
1364
1362
1365
1363
class HTCondorEngineSetLauncher(HTCondorLauncher, BatchClusterAppMixin):
    """Launch Engines using HTCondor"""

    batch_file_name = Unicode(
        u'htcondor_engines',
        config=True,
        help="batch file name for the engine(s) job.",
    )
    # Submit description for the engines; the {placeholders} are filled
    # in later from the launcher context.
    default_template = Unicode("""
universe        = vanilla
executable      = ipengine
# by default we expect a shared file system
transfer_executable = False
arguments       = "--log-to-file '--profile-dir={profile_dir}' '--cluster-id={cluster_id}'"
""")
1377
1375
1378
1376
1379 #-----------------------------------------------------------------------------
1377 #-----------------------------------------------------------------------------
1380 # A launcher for ipcluster itself!
1378 # A launcher for ipcluster itself!
1381 #-----------------------------------------------------------------------------
1379 #-----------------------------------------------------------------------------
1382
1380
1383
1381
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(
        ipcluster_cmd_argv,
        config=True,
        help="Popen command for ipcluster",
    )
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.",
    )
    ipcluster_subcommand = Unicode('start')
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full ipcluster argv.

        Order: command, subcommand, ``--n``/``--profile`` options, then the
        extra ``ipcluster_args``.
        """
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['--n=%i' % self.n, '--profile=%s' % self.profile])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Start ipcluster by delegating to the parent launcher's ``start``."""
        parent = super(IPClusterLauncher, self)
        return parent.start()
1403
1401
1404 #-----------------------------------------------------------------------------
1402 #-----------------------------------------------------------------------------
1405 # Collections of launchers
1403 # Collections of launchers
1406 #-----------------------------------------------------------------------------
1404 #-----------------------------------------------------------------------------
1407
1405
# Launcher classes grouped by backend; each group is a plain list so the
# groups can be concatenated into ``all_launchers`` below.
local_launchers = [
    LocalControllerLauncher, LocalEngineLauncher, LocalEngineSetLauncher,
]
mpi_launchers = [
    MPILauncher, MPIControllerLauncher, MPIEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher, SSHControllerLauncher, SSHEngineLauncher,
    SSHEngineSetLauncher, SSHProxyEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher, WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher,
]
htcondor_launchers = [
    HTCondorLauncher, HTCondorControllerLauncher, HTCondorEngineSetLauncher,
]

# Every launcher class, in backend order.
all_launchers = (
    local_launchers
    + mpi_launchers
    + ssh_launchers
    + winhpc_launchers
    + pbs_launchers
    + sge_launchers
    + lsf_launchers
    + htcondor_launchers
)
General Comments 0
You need to be logged in to leave comments. Login now