##// END OF EJS Templates
add EvalFormatter for batch system (PBS) launcher templates...
MinRK -
Show More
@@ -1,1067 +1,1070 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 # from IPython.config.configurable import Configurable
52 # from IPython.config.configurable import Configurable
53 from IPython.utils.text import EvalFormatter
53 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
54 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.path import get_ipython_module_path
55 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
56
57
57 from IPython.parallel.factory import LoggingFactory
58 from IPython.parallel.factory import LoggingFactory
58
59
59 from .win32support import forward_read_events
60 from .win32support import forward_read_events
60
61
61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62
63
63 WINDOWS = os.name == 'nt'
64 WINDOWS = os.name == 'nt'
64
65
65 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
66 # Paths to the kernel apps
67 # Paths to the kernel apps
67 #-----------------------------------------------------------------------------
68 #-----------------------------------------------------------------------------
68
69
69
70
70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 'IPython.parallel.apps.ipclusterapp'
72 'IPython.parallel.apps.ipclusterapp'
72 ))
73 ))
73
74
74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipengineapp'
76 'IPython.parallel.apps.ipengineapp'
76 ))
77 ))
77
78
78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 'IPython.parallel.apps.ipcontrollerapp'
80 'IPython.parallel.apps.ipcontrollerapp'
80 ))
81 ))
81
82
82 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
83 # Base launchers and errors
84 # Base launchers and errors
84 #-----------------------------------------------------------------------------
85 #-----------------------------------------------------------------------------
85
86
86
87
87 class LauncherError(Exception):
88 class LauncherError(Exception):
88 pass
89 pass
89
90
90
91
91 class ProcessStateError(LauncherError):
92 class ProcessStateError(LauncherError):
92 pass
93 pass
93
94
94
95
95 class UnknownStatus(LauncherError):
96 class UnknownStatus(LauncherError):
96 pass
97 pass
97
98
98
99
99 class BaseLauncher(LoggingFactory):
100 class BaseLauncher(LoggingFactory):
100 """An asbtraction for starting, stopping and signaling a process."""
101 """An asbtraction for starting, stopping and signaling a process."""
101
102
102 # In all of the launchers, the work_dir is where child processes will be
103 # In all of the launchers, the work_dir is where child processes will be
103 # run. This will usually be the profile_dir, but may not be. any work_dir
104 # run. This will usually be the profile_dir, but may not be. any work_dir
104 # passed into the __init__ method will override the config value.
105 # passed into the __init__ method will override the config value.
105 # This should not be used to set the work_dir for the actual engine
106 # This should not be used to set the work_dir for the actual engine
106 # and controller. Instead, use their own config files or the
107 # and controller. Instead, use their own config files or the
107 # controller_args, engine_args attributes of the launchers to add
108 # controller_args, engine_args attributes of the launchers to add
108 # the work_dir option.
109 # the work_dir option.
109 work_dir = Unicode(u'.')
110 work_dir = Unicode(u'.')
110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111
112
112 start_data = Any()
113 start_data = Any()
113 stop_data = Any()
114 stop_data = Any()
114
115
115 def _loop_default(self):
116 def _loop_default(self):
116 return ioloop.IOLoop.instance()
117 return ioloop.IOLoop.instance()
117
118
118 def __init__(self, work_dir=u'.', config=None, **kwargs):
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
119 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
120 self.state = 'before' # can be before, running, after
121 self.state = 'before' # can be before, running, after
121 self.stop_callbacks = []
122 self.stop_callbacks = []
122 self.start_data = None
123 self.start_data = None
123 self.stop_data = None
124 self.stop_data = None
124
125
125 @property
126 @property
126 def args(self):
127 def args(self):
127 """A list of cmd and args that will be used to start the process.
128 """A list of cmd and args that will be used to start the process.
128
129
129 This is what is passed to :func:`spawnProcess` and the first element
130 This is what is passed to :func:`spawnProcess` and the first element
130 will be the process name.
131 will be the process name.
131 """
132 """
132 return self.find_args()
133 return self.find_args()
133
134
134 def find_args(self):
135 def find_args(self):
135 """The ``.args`` property calls this to find the args list.
136 """The ``.args`` property calls this to find the args list.
136
137
137 Subcommand should implement this to construct the cmd and args.
138 Subcommand should implement this to construct the cmd and args.
138 """
139 """
139 raise NotImplementedError('find_args must be implemented in a subclass')
140 raise NotImplementedError('find_args must be implemented in a subclass')
140
141
141 @property
142 @property
142 def arg_str(self):
143 def arg_str(self):
143 """The string form of the program arguments."""
144 """The string form of the program arguments."""
144 return ' '.join(self.args)
145 return ' '.join(self.args)
145
146
146 @property
147 @property
147 def running(self):
148 def running(self):
148 """Am I running."""
149 """Am I running."""
149 if self.state == 'running':
150 if self.state == 'running':
150 return True
151 return True
151 else:
152 else:
152 return False
153 return False
153
154
154 def start(self):
155 def start(self):
155 """Start the process.
156 """Start the process.
156
157
157 This must return a deferred that fires with information about the
158 This must return a deferred that fires with information about the
158 process starting (like a pid, job id, etc.).
159 process starting (like a pid, job id, etc.).
159 """
160 """
160 raise NotImplementedError('start must be implemented in a subclass')
161 raise NotImplementedError('start must be implemented in a subclass')
161
162
162 def stop(self):
163 def stop(self):
163 """Stop the process and notify observers of stopping.
164 """Stop the process and notify observers of stopping.
164
165
165 This must return a deferred that fires with information about the
166 This must return a deferred that fires with information about the
166 processing stopping, like errors that occur while the process is
167 processing stopping, like errors that occur while the process is
167 attempting to be shut down. This deferred won't fire when the process
168 attempting to be shut down. This deferred won't fire when the process
168 actually stops. To observe the actual process stopping, see
169 actually stops. To observe the actual process stopping, see
169 :func:`observe_stop`.
170 :func:`observe_stop`.
170 """
171 """
171 raise NotImplementedError('stop must be implemented in a subclass')
172 raise NotImplementedError('stop must be implemented in a subclass')
172
173
173 def on_stop(self, f):
174 def on_stop(self, f):
174 """Get a deferred that will fire when the process stops.
175 """Get a deferred that will fire when the process stops.
175
176
176 The deferred will fire with data that contains information about
177 The deferred will fire with data that contains information about
177 the exit status of the process.
178 the exit status of the process.
178 """
179 """
179 if self.state=='after':
180 if self.state=='after':
180 return f(self.stop_data)
181 return f(self.stop_data)
181 else:
182 else:
182 self.stop_callbacks.append(f)
183 self.stop_callbacks.append(f)
183
184
184 def notify_start(self, data):
185 def notify_start(self, data):
185 """Call this to trigger startup actions.
186 """Call this to trigger startup actions.
186
187
187 This logs the process startup and sets the state to 'running'. It is
188 This logs the process startup and sets the state to 'running'. It is
188 a pass-through so it can be used as a callback.
189 a pass-through so it can be used as a callback.
189 """
190 """
190
191
191 self.log.info('Process %r started: %r' % (self.args[0], data))
192 self.log.info('Process %r started: %r' % (self.args[0], data))
192 self.start_data = data
193 self.start_data = data
193 self.state = 'running'
194 self.state = 'running'
194 return data
195 return data
195
196
196 def notify_stop(self, data):
197 def notify_stop(self, data):
197 """Call this to trigger process stop actions.
198 """Call this to trigger process stop actions.
198
199
199 This logs the process stopping and sets the state to 'after'. Call
200 This logs the process stopping and sets the state to 'after'. Call
200 this to trigger all the deferreds from :func:`observe_stop`."""
201 this to trigger all the deferreds from :func:`observe_stop`."""
201
202
202 self.log.info('Process %r stopped: %r' % (self.args[0], data))
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
203 self.stop_data = data
204 self.stop_data = data
204 self.state = 'after'
205 self.state = 'after'
205 for i in range(len(self.stop_callbacks)):
206 for i in range(len(self.stop_callbacks)):
206 d = self.stop_callbacks.pop()
207 d = self.stop_callbacks.pop()
207 d(data)
208 d(data)
208 return data
209 return data
209
210
210 def signal(self, sig):
211 def signal(self, sig):
211 """Signal the process.
212 """Signal the process.
212
213
213 Return a semi-meaningless deferred after signaling the process.
214 Return a semi-meaningless deferred after signaling the process.
214
215
215 Parameters
216 Parameters
216 ----------
217 ----------
217 sig : str or int
218 sig : str or int
218 'KILL', 'INT', etc., or any signal number
219 'KILL', 'INT', etc., or any signal number
219 """
220 """
220 raise NotImplementedError('signal must be implemented in a subclass')
221 raise NotImplementedError('signal must be implemented in a subclass')
221
222
222
223
223 #-----------------------------------------------------------------------------
224 #-----------------------------------------------------------------------------
224 # Local process launchers
225 # Local process launchers
225 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
226
227
227
228
228 class LocalProcessLauncher(BaseLauncher):
229 class LocalProcessLauncher(BaseLauncher):
229 """Start and stop an external process in an asynchronous manner.
230 """Start and stop an external process in an asynchronous manner.
230
231
231 This will launch the external process with a working directory of
232 This will launch the external process with a working directory of
232 ``self.work_dir``.
233 ``self.work_dir``.
233 """
234 """
234
235
235 # This is used to to construct self.args, which is passed to
236 # This is used to to construct self.args, which is passed to
236 # spawnProcess.
237 # spawnProcess.
237 cmd_and_args = List([])
238 cmd_and_args = List([])
238 poll_frequency = Int(100) # in ms
239 poll_frequency = Int(100) # in ms
239
240
240 def __init__(self, work_dir=u'.', config=None, **kwargs):
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
241 super(LocalProcessLauncher, self).__init__(
242 super(LocalProcessLauncher, self).__init__(
242 work_dir=work_dir, config=config, **kwargs
243 work_dir=work_dir, config=config, **kwargs
243 )
244 )
244 self.process = None
245 self.process = None
245 self.start_deferred = None
246 self.start_deferred = None
246 self.poller = None
247 self.poller = None
247
248
248 def find_args(self):
249 def find_args(self):
249 return self.cmd_and_args
250 return self.cmd_and_args
250
251
251 def start(self):
252 def start(self):
252 if self.state == 'before':
253 if self.state == 'before':
253 self.process = Popen(self.args,
254 self.process = Popen(self.args,
254 stdout=PIPE,stderr=PIPE,stdin=PIPE,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
255 env=os.environ,
256 env=os.environ,
256 cwd=self.work_dir
257 cwd=self.work_dir
257 )
258 )
258 if WINDOWS:
259 if WINDOWS:
259 self.stdout = forward_read_events(self.process.stdout)
260 self.stdout = forward_read_events(self.process.stdout)
260 self.stderr = forward_read_events(self.process.stderr)
261 self.stderr = forward_read_events(self.process.stderr)
261 else:
262 else:
262 self.stdout = self.process.stdout.fileno()
263 self.stdout = self.process.stdout.fileno()
263 self.stderr = self.process.stderr.fileno()
264 self.stderr = self.process.stderr.fileno()
264 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
265 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
266 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
267 self.poller.start()
268 self.poller.start()
268 self.notify_start(self.process.pid)
269 self.notify_start(self.process.pid)
269 else:
270 else:
270 s = 'The process was already started and has state: %r' % self.state
271 s = 'The process was already started and has state: %r' % self.state
271 raise ProcessStateError(s)
272 raise ProcessStateError(s)
272
273
273 def stop(self):
274 def stop(self):
274 return self.interrupt_then_kill()
275 return self.interrupt_then_kill()
275
276
276 def signal(self, sig):
277 def signal(self, sig):
277 if self.state == 'running':
278 if self.state == 'running':
278 if WINDOWS and sig != SIGINT:
279 if WINDOWS and sig != SIGINT:
279 # use Windows tree-kill for better child cleanup
280 # use Windows tree-kill for better child cleanup
280 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
281 else:
282 else:
282 self.process.send_signal(sig)
283 self.process.send_signal(sig)
283
284
284 def interrupt_then_kill(self, delay=2.0):
285 def interrupt_then_kill(self, delay=2.0):
285 """Send INT, wait a delay and then send KILL."""
286 """Send INT, wait a delay and then send KILL."""
286 try:
287 try:
287 self.signal(SIGINT)
288 self.signal(SIGINT)
288 except Exception:
289 except Exception:
289 self.log.debug("interrupt failed")
290 self.log.debug("interrupt failed")
290 pass
291 pass
291 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer.start()
293 self.killer.start()
293
294
294 # callbacks, etc:
295 # callbacks, etc:
295
296
296 def handle_stdout(self, fd, events):
297 def handle_stdout(self, fd, events):
297 if WINDOWS:
298 if WINDOWS:
298 line = self.stdout.recv()
299 line = self.stdout.recv()
299 else:
300 else:
300 line = self.process.stdout.readline()
301 line = self.process.stdout.readline()
301 # a stopped process will be readable but return empty strings
302 # a stopped process will be readable but return empty strings
302 if line:
303 if line:
303 self.log.info(line[:-1])
304 self.log.info(line[:-1])
304 else:
305 else:
305 self.poll()
306 self.poll()
306
307
307 def handle_stderr(self, fd, events):
308 def handle_stderr(self, fd, events):
308 if WINDOWS:
309 if WINDOWS:
309 line = self.stderr.recv()
310 line = self.stderr.recv()
310 else:
311 else:
311 line = self.process.stderr.readline()
312 line = self.process.stderr.readline()
312 # a stopped process will be readable but return empty strings
313 # a stopped process will be readable but return empty strings
313 if line:
314 if line:
314 self.log.error(line[:-1])
315 self.log.error(line[:-1])
315 else:
316 else:
316 self.poll()
317 self.poll()
317
318
318 def poll(self):
319 def poll(self):
319 status = self.process.poll()
320 status = self.process.poll()
320 if status is not None:
321 if status is not None:
321 self.poller.stop()
322 self.poller.stop()
322 self.loop.remove_handler(self.stdout)
323 self.loop.remove_handler(self.stdout)
323 self.loop.remove_handler(self.stderr)
324 self.loop.remove_handler(self.stderr)
324 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
325 return status
326 return status
326
327
327 class LocalControllerLauncher(LocalProcessLauncher):
328 class LocalControllerLauncher(LocalProcessLauncher):
328 """Launch a controller as a regular external process."""
329 """Launch a controller as a regular external process."""
329
330
330 controller_cmd = List(ipcontroller_cmd_argv, config=True,
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
331 help="""Popen command to launch ipcontroller.""")
332 help="""Popen command to launch ipcontroller.""")
332 # Command line arguments to ipcontroller.
333 # Command line arguments to ipcontroller.
333 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
334 help="""command-line args to pass to ipcontroller""")
335 help="""command-line args to pass to ipcontroller""")
335
336
336 def find_args(self):
337 def find_args(self):
337 return self.controller_cmd + self.controller_args
338 return self.controller_cmd + self.controller_args
338
339
339 def start(self, profile_dir):
340 def start(self, profile_dir):
340 """Start the controller by profile_dir."""
341 """Start the controller by profile_dir."""
341 self.controller_args.extend(['profile_dir=%s'%profile_dir])
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
342 self.profile_dir = unicode(profile_dir)
343 self.profile_dir = unicode(profile_dir)
343 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 return super(LocalControllerLauncher, self).start()
345 return super(LocalControllerLauncher, self).start()
345
346
346
347
347 class LocalEngineLauncher(LocalProcessLauncher):
348 class LocalEngineLauncher(LocalProcessLauncher):
348 """Launch a single engine as a regular externall process."""
349 """Launch a single engine as a regular externall process."""
349
350
350 engine_cmd = List(ipengine_cmd_argv, config=True,
351 engine_cmd = List(ipengine_cmd_argv, config=True,
351 help="""command to launch the Engine.""")
352 help="""command to launch the Engine.""")
352 # Command line arguments for ipengine.
353 # Command line arguments for ipengine.
353 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
354 help="command-line arguments to pass to ipengine"
355 help="command-line arguments to pass to ipengine"
355 )
356 )
356
357
357 def find_args(self):
358 def find_args(self):
358 return self.engine_cmd + self.engine_args
359 return self.engine_cmd + self.engine_args
359
360
360 def start(self, profile_dir):
361 def start(self, profile_dir):
361 """Start the engine by profile_dir."""
362 """Start the engine by profile_dir."""
362 self.engine_args.extend(['profile_dir=%s'%profile_dir])
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
363 self.profile_dir = unicode(profile_dir)
364 self.profile_dir = unicode(profile_dir)
364 return super(LocalEngineLauncher, self).start()
365 return super(LocalEngineLauncher, self).start()
365
366
366
367
367 class LocalEngineSetLauncher(BaseLauncher):
368 class LocalEngineSetLauncher(BaseLauncher):
368 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
369
370
370 # Command line arguments for ipengine.
371 # Command line arguments for ipengine.
371 engine_args = List(
372 engine_args = List(
372 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
373 help="command-line arguments to pass to ipengine"
374 help="command-line arguments to pass to ipengine"
374 )
375 )
375 # launcher class
376 # launcher class
376 launcher_class = LocalEngineLauncher
377 launcher_class = LocalEngineLauncher
377
378
378 launchers = Dict()
379 launchers = Dict()
379 stop_data = Dict()
380 stop_data = Dict()
380
381
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 super(LocalEngineSetLauncher, self).__init__(
383 super(LocalEngineSetLauncher, self).__init__(
383 work_dir=work_dir, config=config, **kwargs
384 work_dir=work_dir, config=config, **kwargs
384 )
385 )
385 self.stop_data = {}
386 self.stop_data = {}
386
387
387 def start(self, n, profile_dir):
388 def start(self, n, profile_dir):
388 """Start n engines by profile or profile_dir."""
389 """Start n engines by profile or profile_dir."""
389 self.profile_dir = unicode(profile_dir)
390 self.profile_dir = unicode(profile_dir)
390 dlist = []
391 dlist = []
391 for i in range(n):
392 for i in range(n):
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
393 # Copy the engine args over to each engine launcher.
394 # Copy the engine args over to each engine launcher.
394 el.engine_args = copy.deepcopy(self.engine_args)
395 el.engine_args = copy.deepcopy(self.engine_args)
395 el.on_stop(self._notice_engine_stopped)
396 el.on_stop(self._notice_engine_stopped)
396 d = el.start(profile_dir)
397 d = el.start(profile_dir)
397 if i==0:
398 if i==0:
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.launchers[i] = el
400 self.launchers[i] = el
400 dlist.append(d)
401 dlist.append(d)
401 self.notify_start(dlist)
402 self.notify_start(dlist)
402 # The consumeErrors here could be dangerous
403 # The consumeErrors here could be dangerous
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal.addCallback(self.notify_start)
405 # dfinal.addCallback(self.notify_start)
405 return dlist
406 return dlist
406
407
407 def find_args(self):
408 def find_args(self):
408 return ['engine set']
409 return ['engine set']
409
410
410 def signal(self, sig):
411 def signal(self, sig):
411 dlist = []
412 dlist = []
412 for el in self.launchers.itervalues():
413 for el in self.launchers.itervalues():
413 d = el.signal(sig)
414 d = el.signal(sig)
414 dlist.append(d)
415 dlist.append(d)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 return dlist
417 return dlist
417
418
418 def interrupt_then_kill(self, delay=1.0):
419 def interrupt_then_kill(self, delay=1.0):
419 dlist = []
420 dlist = []
420 for el in self.launchers.itervalues():
421 for el in self.launchers.itervalues():
421 d = el.interrupt_then_kill(delay)
422 d = el.interrupt_then_kill(delay)
422 dlist.append(d)
423 dlist.append(d)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 return dlist
425 return dlist
425
426
426 def stop(self):
427 def stop(self):
427 return self.interrupt_then_kill()
428 return self.interrupt_then_kill()
428
429
429 def _notice_engine_stopped(self, data):
430 def _notice_engine_stopped(self, data):
430 pid = data['pid']
431 pid = data['pid']
431 for idx,el in self.launchers.iteritems():
432 for idx,el in self.launchers.iteritems():
432 if el.process.pid == pid:
433 if el.process.pid == pid:
433 break
434 break
434 self.launchers.pop(idx)
435 self.launchers.pop(idx)
435 self.stop_data[idx] = data
436 self.stop_data[idx] = data
436 if not self.launchers:
437 if not self.launchers:
437 self.notify_stop(self.stop_data)
438 self.notify_stop(self.stop_data)
438
439
439
440
440 #-----------------------------------------------------------------------------
441 #-----------------------------------------------------------------------------
441 # MPIExec launchers
442 # MPIExec launchers
442 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
443
444
444
445
445 class MPIExecLauncher(LocalProcessLauncher):
446 class MPIExecLauncher(LocalProcessLauncher):
446 """Launch an external process using mpiexec."""
447 """Launch an external process using mpiexec."""
447
448
448 mpi_cmd = List(['mpiexec'], config=True,
449 mpi_cmd = List(['mpiexec'], config=True,
449 help="The mpiexec command to use in starting the process."
450 help="The mpiexec command to use in starting the process."
450 )
451 )
451 mpi_args = List([], config=True,
452 mpi_args = List([], config=True,
452 help="The command line arguments to pass to mpiexec."
453 help="The command line arguments to pass to mpiexec."
453 )
454 )
454 program = List(['date'], config=True,
455 program = List(['date'], config=True,
455 help="The program to start via mpiexec.")
456 help="The program to start via mpiexec.")
456 program_args = List([], config=True,
457 program_args = List([], config=True,
457 help="The command line argument to the program."
458 help="The command line argument to the program."
458 )
459 )
459 n = Int(1)
460 n = Int(1)
460
461
461 def find_args(self):
462 def find_args(self):
462 """Build self.args using all the fields."""
463 """Build self.args using all the fields."""
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 self.program + self.program_args
465 self.program + self.program_args
465
466
466 def start(self, n):
467 def start(self, n):
467 """Start n instances of the program using mpiexec."""
468 """Start n instances of the program using mpiexec."""
468 self.n = n
469 self.n = n
469 return super(MPIExecLauncher, self).start()
470 return super(MPIExecLauncher, self).start()
470
471
471
472
472 class MPIExecControllerLauncher(MPIExecLauncher):
473 class MPIExecControllerLauncher(MPIExecLauncher):
473 """Launch a controller using mpiexec."""
474 """Launch a controller using mpiexec."""
474
475
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 help="Popen command to launch the Contropper"
477 help="Popen command to launch the Contropper"
477 )
478 )
478 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
479 help="Command line arguments to pass to ipcontroller."
480 help="Command line arguments to pass to ipcontroller."
480 )
481 )
481 n = Int(1)
482 n = Int(1)
482
483
483 def start(self, profile_dir):
484 def start(self, profile_dir):
484 """Start the controller by profile_dir."""
485 """Start the controller by profile_dir."""
485 self.controller_args.extend(['profile_dir=%s'%profile_dir])
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
486 self.profile_dir = unicode(profile_dir)
487 self.profile_dir = unicode(profile_dir)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 return super(MPIExecControllerLauncher, self).start(1)
489 return super(MPIExecControllerLauncher, self).start(1)
489
490
490 def find_args(self):
491 def find_args(self):
491 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
492 self.controller_cmd + self.controller_args
493 self.controller_cmd + self.controller_args
493
494
494
495
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
496
497
497 program = List(ipengine_cmd_argv, config=True,
498 program = List(ipengine_cmd_argv, config=True,
498 help="Popen command for ipengine"
499 help="Popen command for ipengine"
499 )
500 )
500 program_args = List(
501 program_args = List(
501 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
502 help="Command line arguments for ipengine."
503 help="Command line arguments for ipengine."
503 )
504 )
504 n = Int(1)
505 n = Int(1)
505
506
506 def start(self, n, profile_dir):
507 def start(self, n, profile_dir):
507 """Start n engines by profile or profile_dir."""
508 """Start n engines by profile or profile_dir."""
508 self.program_args.extend(['profile_dir=%s'%profile_dir])
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
509 self.profile_dir = unicode(profile_dir)
510 self.profile_dir = unicode(profile_dir)
510 self.n = n
511 self.n = n
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 return super(MPIExecEngineSetLauncher, self).start(n)
513 return super(MPIExecEngineSetLauncher, self).start(n)
513
514
514 #-----------------------------------------------------------------------------
515 #-----------------------------------------------------------------------------
515 # SSH launchers
516 # SSH launchers
516 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
517
518
518 # TODO: Get SSH Launcher working again.
519 # TODO: Get SSH Launcher working again.
519
520
520 class SSHLauncher(LocalProcessLauncher):
521 class SSHLauncher(LocalProcessLauncher):
521 """A minimal launcher for ssh.
522 """A minimal launcher for ssh.
522
523
523 To be useful this will probably have to be extended to use the ``sshx``
524 To be useful this will probably have to be extended to use the ``sshx``
524 idea for environment variables. There could be other things this needs
525 idea for environment variables. There could be other things this needs
525 as well.
526 as well.
526 """
527 """
527
528
528 ssh_cmd = List(['ssh'], config=True,
529 ssh_cmd = List(['ssh'], config=True,
529 help="command for starting ssh")
530 help="command for starting ssh")
530 ssh_args = List(['-tt'], config=True,
531 ssh_args = List(['-tt'], config=True,
531 help="args to pass to ssh")
532 help="args to pass to ssh")
532 program = List(['date'], config=True,
533 program = List(['date'], config=True,
533 help="Program to launch via ssh")
534 help="Program to launch via ssh")
534 program_args = List([], config=True,
535 program_args = List([], config=True,
535 help="args to pass to remote program")
536 help="args to pass to remote program")
536 hostname = Unicode('', config=True,
537 hostname = Unicode('', config=True,
537 help="hostname on which to launch the program")
538 help="hostname on which to launch the program")
538 user = Unicode('', config=True,
539 user = Unicode('', config=True,
539 help="username for ssh")
540 help="username for ssh")
540 location = Unicode('', config=True,
541 location = Unicode('', config=True,
541 help="user@hostname location for ssh in one setting")
542 help="user@hostname location for ssh in one setting")
542
543
543 def _hostname_changed(self, name, old, new):
544 def _hostname_changed(self, name, old, new):
544 if self.user:
545 if self.user:
545 self.location = u'%s@%s' % (self.user, new)
546 self.location = u'%s@%s' % (self.user, new)
546 else:
547 else:
547 self.location = new
548 self.location = new
548
549
549 def _user_changed(self, name, old, new):
550 def _user_changed(self, name, old, new):
550 self.location = u'%s@%s' % (new, self.hostname)
551 self.location = u'%s@%s' % (new, self.hostname)
551
552
552 def find_args(self):
553 def find_args(self):
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 self.program + self.program_args
555 self.program + self.program_args
555
556
556 def start(self, profile_dir, hostname=None, user=None):
557 def start(self, profile_dir, hostname=None, user=None):
557 self.profile_dir = unicode(profile_dir)
558 self.profile_dir = unicode(profile_dir)
558 if hostname is not None:
559 if hostname is not None:
559 self.hostname = hostname
560 self.hostname = hostname
560 if user is not None:
561 if user is not None:
561 self.user = user
562 self.user = user
562
563
563 return super(SSHLauncher, self).start()
564 return super(SSHLauncher, self).start()
564
565
565 def signal(self, sig):
566 def signal(self, sig):
566 if self.state == 'running':
567 if self.state == 'running':
567 # send escaped ssh connection-closer
568 # send escaped ssh connection-closer
568 self.process.stdin.write('~.')
569 self.process.stdin.write('~.')
569 self.process.stdin.flush()
570 self.process.stdin.flush()
570
571
571
572
572
573
573 class SSHControllerLauncher(SSHLauncher):
574 class SSHControllerLauncher(SSHLauncher):
574
575
575 program = List(ipcontroller_cmd_argv, config=True,
576 program = List(ipcontroller_cmd_argv, config=True,
576 help="remote ipcontroller command.")
577 help="remote ipcontroller command.")
577 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
578 help="Command line arguments to ipcontroller.")
579 help="Command line arguments to ipcontroller.")
579
580
580
581
581 class SSHEngineLauncher(SSHLauncher):
582 class SSHEngineLauncher(SSHLauncher):
582 program = List(ipengine_cmd_argv, config=True,
583 program = List(ipengine_cmd_argv, config=True,
583 help="remote ipengine command.")
584 help="remote ipengine command.")
584 # Command line arguments for ipengine.
585 # Command line arguments for ipengine.
585 program_args = List(
586 program_args = List(
586 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
587 help="Command line arguments to ipengine."
588 help="Command line arguments to ipengine."
588 )
589 )
589
590
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 launcher_class = SSHEngineLauncher
592 launcher_class = SSHEngineLauncher
592 engines = Dict(config=True,
593 engines = Dict(config=True,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
594 corresponding to the number of engines to start on that host.""")
595 corresponding to the number of engines to start on that host.""")
595
596
596 def start(self, n, profile_dir):
597 def start(self, n, profile_dir):
597 """Start engines by profile or profile_dir.
598 """Start engines by profile or profile_dir.
598 `n` is ignored, and the `engines` config property is used instead.
599 `n` is ignored, and the `engines` config property is used instead.
599 """
600 """
600
601
601 self.profile_dir = unicode(profile_dir)
602 self.profile_dir = unicode(profile_dir)
602 dlist = []
603 dlist = []
603 for host, n in self.engines.iteritems():
604 for host, n in self.engines.iteritems():
604 if isinstance(n, (tuple, list)):
605 if isinstance(n, (tuple, list)):
605 n, args = n
606 n, args = n
606 else:
607 else:
607 args = copy.deepcopy(self.engine_args)
608 args = copy.deepcopy(self.engine_args)
608
609
609 if '@' in host:
610 if '@' in host:
610 user,host = host.split('@',1)
611 user,host = host.split('@',1)
611 else:
612 else:
612 user=None
613 user=None
613 for i in range(n):
614 for i in range(n):
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
615
616
616 # Copy the engine args over to each engine launcher.
617 # Copy the engine args over to each engine launcher.
617 i
618 i
618 el.program_args = args
619 el.program_args = args
619 el.on_stop(self._notice_engine_stopped)
620 el.on_stop(self._notice_engine_stopped)
620 d = el.start(profile_dir, user=user, hostname=host)
621 d = el.start(profile_dir, user=user, hostname=host)
621 if i==0:
622 if i==0:
622 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
623 self.launchers[host+str(i)] = el
624 self.launchers[host+str(i)] = el
624 dlist.append(d)
625 dlist.append(d)
625 self.notify_start(dlist)
626 self.notify_start(dlist)
626 return dlist
627 return dlist
627
628
628
629
629
630
630 #-----------------------------------------------------------------------------
631 #-----------------------------------------------------------------------------
631 # Windows HPC Server 2008 scheduler launchers
632 # Windows HPC Server 2008 scheduler launchers
632 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
633
634
634
635
635 # This is only used on Windows.
636 # This is only used on Windows.
636 def find_job_cmd():
637 def find_job_cmd():
637 if WINDOWS:
638 if WINDOWS:
638 try:
639 try:
639 return find_cmd('job')
640 return find_cmd('job')
640 except (FindCmdError, ImportError):
641 except (FindCmdError, ImportError):
641 # ImportError will be raised if win32api is not installed
642 # ImportError will be raised if win32api is not installed
642 return 'job'
643 return 'job'
643 else:
644 else:
644 return 'job'
645 return 'job'
645
646
646
647
647 class WindowsHPCLauncher(BaseLauncher):
648 class WindowsHPCLauncher(BaseLauncher):
648
649
649 job_id_regexp = Unicode(r'\d+', config=True,
650 job_id_regexp = Unicode(r'\d+', config=True,
650 help="""A regular expression used to get the job id from the output of the
651 help="""A regular expression used to get the job id from the output of the
651 submit_command. """
652 submit_command. """
652 )
653 )
653 job_file_name = Unicode(u'ipython_job.xml', config=True,
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
654 help="The filename of the instantiated job script.")
655 help="The filename of the instantiated job script.")
655 # The full path to the instantiated job script. This gets made dynamically
656 # The full path to the instantiated job script. This gets made dynamically
656 # by combining the work_dir with the job_file_name.
657 # by combining the work_dir with the job_file_name.
657 job_file = Unicode(u'')
658 job_file = Unicode(u'')
658 scheduler = Unicode('', config=True,
659 scheduler = Unicode('', config=True,
659 help="The hostname of the scheduler to submit the job to.")
660 help="The hostname of the scheduler to submit the job to.")
660 job_cmd = Unicode(find_job_cmd(), config=True,
661 job_cmd = Unicode(find_job_cmd(), config=True,
661 help="The command for submitting jobs.")
662 help="The command for submitting jobs.")
662
663
663 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 super(WindowsHPCLauncher, self).__init__(
665 super(WindowsHPCLauncher, self).__init__(
665 work_dir=work_dir, config=config, **kwargs
666 work_dir=work_dir, config=config, **kwargs
666 )
667 )
667
668
668 @property
669 @property
669 def job_file(self):
670 def job_file(self):
670 return os.path.join(self.work_dir, self.job_file_name)
671 return os.path.join(self.work_dir, self.job_file_name)
671
672
672 def write_job_file(self, n):
673 def write_job_file(self, n):
673 raise NotImplementedError("Implement write_job_file in a subclass.")
674 raise NotImplementedError("Implement write_job_file in a subclass.")
674
675
675 def find_args(self):
676 def find_args(self):
676 return [u'job.exe']
677 return [u'job.exe']
677
678
678 def parse_job_id(self, output):
679 def parse_job_id(self, output):
679 """Take the output of the submit command and return the job id."""
680 """Take the output of the submit command and return the job id."""
680 m = re.search(self.job_id_regexp, output)
681 m = re.search(self.job_id_regexp, output)
681 if m is not None:
682 if m is not None:
682 job_id = m.group()
683 job_id = m.group()
683 else:
684 else:
684 raise LauncherError("Job id couldn't be determined: %s" % output)
685 raise LauncherError("Job id couldn't be determined: %s" % output)
685 self.job_id = job_id
686 self.job_id = job_id
686 self.log.info('Job started with job id: %r' % job_id)
687 self.log.info('Job started with job id: %r' % job_id)
687 return job_id
688 return job_id
688
689
689 def start(self, n):
690 def start(self, n):
690 """Start n copies of the process using the Win HPC job scheduler."""
691 """Start n copies of the process using the Win HPC job scheduler."""
691 self.write_job_file(n)
692 self.write_job_file(n)
692 args = [
693 args = [
693 'submit',
694 'submit',
694 '/jobfile:%s' % self.job_file,
695 '/jobfile:%s' % self.job_file,
695 '/scheduler:%s' % self.scheduler
696 '/scheduler:%s' % self.scheduler
696 ]
697 ]
697 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
698 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
699 output = check_output([self.job_cmd]+args,
700 output = check_output([self.job_cmd]+args,
700 env=os.environ,
701 env=os.environ,
701 cwd=self.work_dir,
702 cwd=self.work_dir,
702 stderr=STDOUT
703 stderr=STDOUT
703 )
704 )
704 job_id = self.parse_job_id(output)
705 job_id = self.parse_job_id(output)
705 self.notify_start(job_id)
706 self.notify_start(job_id)
706 return job_id
707 return job_id
707
708
708 def stop(self):
709 def stop(self):
709 args = [
710 args = [
710 'cancel',
711 'cancel',
711 self.job_id,
712 self.job_id,
712 '/scheduler:%s' % self.scheduler
713 '/scheduler:%s' % self.scheduler
713 ]
714 ]
714 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
715 try:
716 try:
716 output = check_output([self.job_cmd]+args,
717 output = check_output([self.job_cmd]+args,
717 env=os.environ,
718 env=os.environ,
718 cwd=self.work_dir,
719 cwd=self.work_dir,
719 stderr=STDOUT
720 stderr=STDOUT
720 )
721 )
721 except:
722 except:
722 output = 'The job already appears to be stoppped: %r' % self.job_id
723 output = 'The job already appears to be stoppped: %r' % self.job_id
723 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
724 return output
725 return output
725
726
726
727
727 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728
729
729 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
730 help="WinHPC xml job file.")
731 help="WinHPC xml job file.")
731 extra_args = List([], config=False,
732 extra_args = List([], config=False,
732 help="extra args to pass to ipcontroller")
733 help="extra args to pass to ipcontroller")
733
734
734 def write_job_file(self, n):
735 def write_job_file(self, n):
735 job = IPControllerJob(config=self.config)
736 job = IPControllerJob(config=self.config)
736
737
737 t = IPControllerTask(config=self.config)
738 t = IPControllerTask(config=self.config)
738 # The tasks work directory is *not* the actual work directory of
739 # The tasks work directory is *not* the actual work directory of
739 # the controller. It is used as the base path for the stdout/stderr
740 # the controller. It is used as the base path for the stdout/stderr
740 # files that the scheduler redirects to.
741 # files that the scheduler redirects to.
741 t.work_directory = self.profile_dir
742 t.work_directory = self.profile_dir
742 # Add the profile_dir and from self.start().
743 # Add the profile_dir and from self.start().
743 t.controller_args.extend(self.extra_args)
744 t.controller_args.extend(self.extra_args)
744 job.add_task(t)
745 job.add_task(t)
745
746
746 self.log.info("Writing job description file: %s" % self.job_file)
747 self.log.info("Writing job description file: %s" % self.job_file)
747 job.write(self.job_file)
748 job.write(self.job_file)
748
749
749 @property
750 @property
750 def job_file(self):
751 def job_file(self):
751 return os.path.join(self.profile_dir, self.job_file_name)
752 return os.path.join(self.profile_dir, self.job_file_name)
752
753
753 def start(self, profile_dir):
754 def start(self, profile_dir):
754 """Start the controller by profile_dir."""
755 """Start the controller by profile_dir."""
755 self.extra_args = ['profile_dir=%s'%profile_dir]
756 self.extra_args = ['profile_dir=%s'%profile_dir]
756 self.profile_dir = unicode(profile_dir)
757 self.profile_dir = unicode(profile_dir)
757 return super(WindowsHPCControllerLauncher, self).start(1)
758 return super(WindowsHPCControllerLauncher, self).start(1)
758
759
759
760
760 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761
762
762 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
763 help="jobfile for ipengines job")
764 help="jobfile for ipengines job")
764 extra_args = List([], config=False,
765 extra_args = List([], config=False,
765 help="extra args to pas to ipengine")
766 help="extra args to pas to ipengine")
766
767
767 def write_job_file(self, n):
768 def write_job_file(self, n):
768 job = IPEngineSetJob(config=self.config)
769 job = IPEngineSetJob(config=self.config)
769
770
770 for i in range(n):
771 for i in range(n):
771 t = IPEngineTask(config=self.config)
772 t = IPEngineTask(config=self.config)
772 # The tasks work directory is *not* the actual work directory of
773 # The tasks work directory is *not* the actual work directory of
773 # the engine. It is used as the base path for the stdout/stderr
774 # the engine. It is used as the base path for the stdout/stderr
774 # files that the scheduler redirects to.
775 # files that the scheduler redirects to.
775 t.work_directory = self.profile_dir
776 t.work_directory = self.profile_dir
776 # Add the profile_dir and from self.start().
777 # Add the profile_dir and from self.start().
777 t.engine_args.extend(self.extra_args)
778 t.engine_args.extend(self.extra_args)
778 job.add_task(t)
779 job.add_task(t)
779
780
780 self.log.info("Writing job description file: %s" % self.job_file)
781 self.log.info("Writing job description file: %s" % self.job_file)
781 job.write(self.job_file)
782 job.write(self.job_file)
782
783
783 @property
784 @property
784 def job_file(self):
785 def job_file(self):
785 return os.path.join(self.profile_dir, self.job_file_name)
786 return os.path.join(self.profile_dir, self.job_file_name)
786
787
787 def start(self, n, profile_dir):
788 def start(self, n, profile_dir):
788 """Start the controller by profile_dir."""
789 """Start the controller by profile_dir."""
789 self.extra_args = ['profile_dir=%s'%profile_dir]
790 self.extra_args = ['profile_dir=%s'%profile_dir]
790 self.profile_dir = unicode(profile_dir)
791 self.profile_dir = unicode(profile_dir)
791 return super(WindowsHPCEngineSetLauncher, self).start(n)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
792
793
793
794
794 #-----------------------------------------------------------------------------
795 #-----------------------------------------------------------------------------
795 # Batch (PBS) system launchers
796 # Batch (PBS) system launchers
796 #-----------------------------------------------------------------------------
797 #-----------------------------------------------------------------------------
797
798
798 class BatchSystemLauncher(BaseLauncher):
799 class BatchSystemLauncher(BaseLauncher):
799 """Launch an external process using a batch system.
800 """Launch an external process using a batch system.
800
801
801 This class is designed to work with UNIX batch systems like PBS, LSF,
802 This class is designed to work with UNIX batch systems like PBS, LSF,
802 GridEngine, etc. The overall model is that there are different commands
803 GridEngine, etc. The overall model is that there are different commands
803 like qsub, qdel, etc. that handle the starting and stopping of the process.
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
804
805
805 This class also has the notion of a batch script. The ``batch_template``
806 This class also has the notion of a batch script. The ``batch_template``
806 attribute can be set to a string that is a template for the batch script.
807 attribute can be set to a string that is a template for the batch script.
807 This template is instantiated using string formatting. Thus the template can
808 This template is instantiated using string formatting. Thus the template can
808 use {n} fot the number of instances. Subclasses can add additional variables
809 use {n} fot the number of instances. Subclasses can add additional variables
809 to the template dict.
810 to the template dict.
810 """
811 """
811
812
812 # Subclasses must fill these in. See PBSEngineSet
813 # Subclasses must fill these in. See PBSEngineSet
813 submit_command = List([''], config=True,
814 submit_command = List([''], config=True,
814 help="The name of the command line program used to submit jobs.")
815 help="The name of the command line program used to submit jobs.")
815 delete_command = List([''], config=True,
816 delete_command = List([''], config=True,
816 help="The name of the command line program used to delete jobs.")
817 help="The name of the command line program used to delete jobs.")
817 job_id_regexp = Unicode('', config=True,
818 job_id_regexp = Unicode('', config=True,
818 help="""A regular expression used to get the job id from the output of the
819 help="""A regular expression used to get the job id from the output of the
819 submit_command.""")
820 submit_command.""")
820 batch_template = Unicode('', config=True,
821 batch_template = Unicode('', config=True,
821 help="The string that is the batch script template itself.")
822 help="The string that is the batch script template itself.")
822 batch_template_file = Unicode(u'', config=True,
823 batch_template_file = Unicode(u'', config=True,
823 help="The file that contains the batch template.")
824 help="The file that contains the batch template.")
824 batch_file_name = Unicode(u'batch_script', config=True,
825 batch_file_name = Unicode(u'batch_script', config=True,
825 help="The filename of the instantiated batch script.")
826 help="The filename of the instantiated batch script.")
826 queue = Unicode(u'', config=True,
827 queue = Unicode(u'', config=True,
827 help="The PBS Queue.")
828 help="The PBS Queue.")
828
829
829 # not configurable, override in subclasses
830 # not configurable, override in subclasses
830 # PBS Job Array regex
831 # PBS Job Array regex
831 job_array_regexp = Unicode('')
832 job_array_regexp = Unicode('')
832 job_array_template = Unicode('')
833 job_array_template = Unicode('')
833 # PBS Queue regex
834 # PBS Queue regex
834 queue_regexp = Unicode('')
835 queue_regexp = Unicode('')
835 queue_template = Unicode('')
836 queue_template = Unicode('')
836 # The default batch template, override in subclasses
837 # The default batch template, override in subclasses
837 default_template = Unicode('')
838 default_template = Unicode('')
838 # The full path to the instantiated batch script.
839 # The full path to the instantiated batch script.
839 batch_file = Unicode(u'')
840 batch_file = Unicode(u'')
840 # the format dict used with batch_template:
841 # the format dict used with batch_template:
841 context = Dict()
842 context = Dict()
843 # the Formatter instance for rendering the templates:
844 formatter = Instance(EvalFormatter, (), {})
842
845
843
846
844 def find_args(self):
847 def find_args(self):
845 return self.submit_command + [self.batch_file]
848 return self.submit_command + [self.batch_file]
846
849
847 def __init__(self, work_dir=u'.', config=None, **kwargs):
850 def __init__(self, work_dir=u'.', config=None, **kwargs):
848 super(BatchSystemLauncher, self).__init__(
851 super(BatchSystemLauncher, self).__init__(
849 work_dir=work_dir, config=config, **kwargs
852 work_dir=work_dir, config=config, **kwargs
850 )
853 )
851 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
854 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
852
855
853 def parse_job_id(self, output):
856 def parse_job_id(self, output):
854 """Take the output of the submit command and return the job id."""
857 """Take the output of the submit command and return the job id."""
855 m = re.search(self.job_id_regexp, output)
858 m = re.search(self.job_id_regexp, output)
856 if m is not None:
859 if m is not None:
857 job_id = m.group()
860 job_id = m.group()
858 else:
861 else:
859 raise LauncherError("Job id couldn't be determined: %s" % output)
862 raise LauncherError("Job id couldn't be determined: %s" % output)
860 self.job_id = job_id
863 self.job_id = job_id
861 self.log.info('Job submitted with job id: %r' % job_id)
864 self.log.info('Job submitted with job id: %r' % job_id)
862 return job_id
865 return job_id
863
866
864 def write_batch_script(self, n):
867 def write_batch_script(self, n):
865 """Instantiate and write the batch script to the work_dir."""
868 """Instantiate and write the batch script to the work_dir."""
866 self.context['n'] = n
869 self.context['n'] = n
867 self.context['queue'] = self.queue
870 self.context['queue'] = self.queue
868 # first priority is batch_template if set
871 # first priority is batch_template if set
869 if self.batch_template_file and not self.batch_template:
872 if self.batch_template_file and not self.batch_template:
870 # second priority is batch_template_file
873 # second priority is batch_template_file
871 with open(self.batch_template_file) as f:
874 with open(self.batch_template_file) as f:
872 self.batch_template = f.read()
875 self.batch_template = f.read()
873 if not self.batch_template:
876 if not self.batch_template:
874 # third (last) priority is default_template
877 # third (last) priority is default_template
875 self.batch_template = self.default_template
878 self.batch_template = self.default_template
876
879
877 regex = re.compile(self.job_array_regexp)
880 regex = re.compile(self.job_array_regexp)
878 # print regex.search(self.batch_template)
881 # print regex.search(self.batch_template)
879 if not regex.search(self.batch_template):
882 if not regex.search(self.batch_template):
880 self.log.info("adding job array settings to batch script")
883 self.log.info("adding job array settings to batch script")
881 firstline, rest = self.batch_template.split('\n',1)
884 firstline, rest = self.batch_template.split('\n',1)
882 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
883
886
884 regex = re.compile(self.queue_regexp)
887 regex = re.compile(self.queue_regexp)
885 # print regex.search(self.batch_template)
888 # print regex.search(self.batch_template)
886 if self.queue and not regex.search(self.batch_template):
889 if self.queue and not regex.search(self.batch_template):
887 self.log.info("adding PBS queue settings to batch script")
890 self.log.info("adding PBS queue settings to batch script")
888 firstline, rest = self.batch_template.split('\n',1)
891 firstline, rest = self.batch_template.split('\n',1)
889 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
890
893
891 script_as_string = self.batch_template.format(**self.context)
894 script_as_string = self.formatter.format(self.batch_template, **self.context)
892 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
893
896
894 with open(self.batch_file, 'w') as f:
897 with open(self.batch_file, 'w') as f:
895 f.write(script_as_string)
898 f.write(script_as_string)
896 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
897
900
898 def start(self, n, profile_dir):
901 def start(self, n, profile_dir):
899 """Start n copies of the process using a batch system."""
902 """Start n copies of the process using a batch system."""
900 # Here we save profile_dir in the context so they
903 # Here we save profile_dir in the context so they
901 # can be used in the batch script template as {profile_dir}
904 # can be used in the batch script template as {profile_dir}
902 self.context['profile_dir'] = profile_dir
905 self.context['profile_dir'] = profile_dir
903 self.profile_dir = unicode(profile_dir)
906 self.profile_dir = unicode(profile_dir)
904 self.write_batch_script(n)
907 self.write_batch_script(n)
905 output = check_output(self.args, env=os.environ)
908 output = check_output(self.args, env=os.environ)
906
909
907 job_id = self.parse_job_id(output)
910 job_id = self.parse_job_id(output)
908 self.notify_start(job_id)
911 self.notify_start(job_id)
909 return job_id
912 return job_id
910
913
911 def stop(self):
914 def stop(self):
912 output = check_output(self.delete_command+[self.job_id], env=os.environ)
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
914 return output
917 return output
915
918
916
919
917 class PBSLauncher(BatchSystemLauncher):
920 class PBSLauncher(BatchSystemLauncher):
918 """A BatchSystemLauncher subclass for PBS."""
921 """A BatchSystemLauncher subclass for PBS."""
919
922
920 submit_command = List(['qsub'], config=True,
923 submit_command = List(['qsub'], config=True,
921 help="The PBS submit command ['qsub']")
924 help="The PBS submit command ['qsub']")
922 delete_command = List(['qdel'], config=True,
925 delete_command = List(['qdel'], config=True,
923 help="The PBS delete command ['qsub']")
926 help="The PBS delete command ['qsub']")
924 job_id_regexp = Unicode(r'\d+', config=True,
927 job_id_regexp = Unicode(r'\d+', config=True,
925 help="Regular expresion for identifying the job ID [r'\d+']")
928 help="Regular expresion for identifying the job ID [r'\d+']")
926
929
927 batch_file = Unicode(u'')
930 batch_file = Unicode(u'')
928 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
929 job_array_template = Unicode('#PBS -t 1-{n}')
932 job_array_template = Unicode('#PBS -t 1-{n}')
930 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
931 queue_template = Unicode('#PBS -q {queue}')
934 queue_template = Unicode('#PBS -q {queue}')
932
935
933
936
934 class PBSControllerLauncher(PBSLauncher):
937 class PBSControllerLauncher(PBSLauncher):
935 """Launch a controller using PBS."""
938 """Launch a controller using PBS."""
936
939
937 batch_file_name = Unicode(u'pbs_controller', config=True,
940 batch_file_name = Unicode(u'pbs_controller', config=True,
938 help="batch file name for the controller job.")
941 help="batch file name for the controller job.")
939 default_template= Unicode("""#!/bin/sh
942 default_template= Unicode("""#!/bin/sh
940 #PBS -V
943 #PBS -V
941 #PBS -N ipcontroller
944 #PBS -N ipcontroller
942 %s --log-to-file profile_dir={profile_dir}
945 %s --log-to-file profile_dir={profile_dir}
943 """%(' '.join(ipcontroller_cmd_argv)))
946 """%(' '.join(ipcontroller_cmd_argv)))
944
947
945 def start(self, profile_dir):
948 def start(self, profile_dir):
946 """Start the controller by profile or profile_dir."""
949 """Start the controller by profile or profile_dir."""
947 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
948 return super(PBSControllerLauncher, self).start(1, profile_dir)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
949
952
950
953
951 class PBSEngineSetLauncher(PBSLauncher):
954 class PBSEngineSetLauncher(PBSLauncher):
952 """Launch Engines using PBS"""
955 """Launch Engines using PBS"""
953 batch_file_name = Unicode(u'pbs_engines', config=True,
956 batch_file_name = Unicode(u'pbs_engines', config=True,
954 help="batch file name for the engine(s) job.")
957 help="batch file name for the engine(s) job.")
955 default_template= Unicode(u"""#!/bin/sh
958 default_template= Unicode(u"""#!/bin/sh
956 #PBS -V
959 #PBS -V
957 #PBS -N ipengine
960 #PBS -N ipengine
958 %s profile_dir={profile_dir}
961 %s profile_dir={profile_dir}
959 """%(' '.join(ipengine_cmd_argv)))
962 """%(' '.join(ipengine_cmd_argv)))
960
963
961 def start(self, n, profile_dir):
964 def start(self, n, profile_dir):
962 """Start n engines by profile or profile_dir."""
965 """Start n engines by profile or profile_dir."""
963 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
964 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
965
968
966 #SGE is very similar to PBS
969 #SGE is very similar to PBS
967
970
968 class SGELauncher(PBSLauncher):
971 class SGELauncher(PBSLauncher):
969 """Sun GridEngine is a PBS clone with slightly different syntax"""
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
970 job_array_regexp = Unicode('#\$\W+\-t')
973 job_array_regexp = Unicode('#\$\W+\-t')
971 job_array_template = Unicode('#$ -t 1-{n}')
974 job_array_template = Unicode('#$ -t 1-{n}')
972 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
975 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
973 queue_template = Unicode('#$ -q $queue')
976 queue_template = Unicode('#$ -q $queue')
974
977
975 class SGEControllerLauncher(SGELauncher):
978 class SGEControllerLauncher(SGELauncher):
976 """Launch a controller using SGE."""
979 """Launch a controller using SGE."""
977
980
978 batch_file_name = Unicode(u'sge_controller', config=True,
981 batch_file_name = Unicode(u'sge_controller', config=True,
979 help="batch file name for the ipontroller job.")
982 help="batch file name for the ipontroller job.")
980 default_template= Unicode(u"""#$ -V
983 default_template= Unicode(u"""#$ -V
981 #$ -S /bin/sh
984 #$ -S /bin/sh
982 #$ -N ipcontroller
985 #$ -N ipcontroller
983 %s --log-to-file profile_dir={profile_dir}
986 %s --log-to-file profile_dir={profile_dir}
984 """%(' '.join(ipcontroller_cmd_argv)))
987 """%(' '.join(ipcontroller_cmd_argv)))
985
988
986 def start(self, profile_dir):
989 def start(self, profile_dir):
987 """Start the controller by profile or profile_dir."""
990 """Start the controller by profile or profile_dir."""
988 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
989 return super(SGEControllerLauncher, self).start(1, profile_dir)
992 return super(SGEControllerLauncher, self).start(1, profile_dir)
990
993
991 class SGEEngineSetLauncher(SGELauncher):
994 class SGEEngineSetLauncher(SGELauncher):
992 """Launch Engines with SGE"""
995 """Launch Engines with SGE"""
993 batch_file_name = Unicode(u'sge_engines', config=True,
996 batch_file_name = Unicode(u'sge_engines', config=True,
994 help="batch file name for the engine(s) job.")
997 help="batch file name for the engine(s) job.")
995 default_template = Unicode("""#$ -V
998 default_template = Unicode("""#$ -V
996 #$ -S /bin/sh
999 #$ -S /bin/sh
997 #$ -N ipengine
1000 #$ -N ipengine
998 %s profile_dir={profile_dir}
1001 %s profile_dir={profile_dir}
999 """%(' '.join(ipengine_cmd_argv)))
1002 """%(' '.join(ipengine_cmd_argv)))
1000
1003
1001 def start(self, n, profile_dir):
1004 def start(self, n, profile_dir):
1002 """Start n engines by profile or profile_dir."""
1005 """Start n engines by profile or profile_dir."""
1003 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1004 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1005
1008
1006
1009
1007 #-----------------------------------------------------------------------------
1010 #-----------------------------------------------------------------------------
1008 # A launcher for ipcluster itself!
1011 # A launcher for ipcluster itself!
1009 #-----------------------------------------------------------------------------
1012 #-----------------------------------------------------------------------------
1010
1013
1011
1014
1012 class IPClusterLauncher(LocalProcessLauncher):
1015 class IPClusterLauncher(LocalProcessLauncher):
1013 """Launch the ipcluster program in an external process."""
1016 """Launch the ipcluster program in an external process."""
1014
1017
1015 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1016 help="Popen command for ipcluster")
1019 help="Popen command for ipcluster")
1017 ipcluster_args = List(
1020 ipcluster_args = List(
1018 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1019 help="Command line arguments to pass to ipcluster.")
1022 help="Command line arguments to pass to ipcluster.")
1020 ipcluster_subcommand = Unicode('start')
1023 ipcluster_subcommand = Unicode('start')
1021 ipcluster_n = Int(2)
1024 ipcluster_n = Int(2)
1022
1025
1023 def find_args(self):
1026 def find_args(self):
1024 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1025 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1026
1029
1027 def start(self):
1030 def start(self):
1028 self.log.info("Starting ipcluster: %r" % self.args)
1031 self.log.info("Starting ipcluster: %r" % self.args)
1029 return super(IPClusterLauncher, self).start()
1032 return super(IPClusterLauncher, self).start()
1030
1033
1031 #-----------------------------------------------------------------------------
1034 #-----------------------------------------------------------------------------
1032 # Collections of launchers
1035 # Collections of launchers
1033 #-----------------------------------------------------------------------------
1036 #-----------------------------------------------------------------------------
1034
1037
1035 local_launchers = [
1038 local_launchers = [
1036 LocalControllerLauncher,
1039 LocalControllerLauncher,
1037 LocalEngineLauncher,
1040 LocalEngineLauncher,
1038 LocalEngineSetLauncher,
1041 LocalEngineSetLauncher,
1039 ]
1042 ]
1040 mpi_launchers = [
1043 mpi_launchers = [
1041 MPIExecLauncher,
1044 MPIExecLauncher,
1042 MPIExecControllerLauncher,
1045 MPIExecControllerLauncher,
1043 MPIExecEngineSetLauncher,
1046 MPIExecEngineSetLauncher,
1044 ]
1047 ]
1045 ssh_launchers = [
1048 ssh_launchers = [
1046 SSHLauncher,
1049 SSHLauncher,
1047 SSHControllerLauncher,
1050 SSHControllerLauncher,
1048 SSHEngineLauncher,
1051 SSHEngineLauncher,
1049 SSHEngineSetLauncher,
1052 SSHEngineSetLauncher,
1050 ]
1053 ]
1051 winhpc_launchers = [
1054 winhpc_launchers = [
1052 WindowsHPCLauncher,
1055 WindowsHPCLauncher,
1053 WindowsHPCControllerLauncher,
1056 WindowsHPCControllerLauncher,
1054 WindowsHPCEngineSetLauncher,
1057 WindowsHPCEngineSetLauncher,
1055 ]
1058 ]
1056 pbs_launchers = [
1059 pbs_launchers = [
1057 PBSLauncher,
1060 PBSLauncher,
1058 PBSControllerLauncher,
1061 PBSControllerLauncher,
1059 PBSEngineSetLauncher,
1062 PBSEngineSetLauncher,
1060 ]
1063 ]
1061 sge_launchers = [
1064 sge_launchers = [
1062 SGELauncher,
1065 SGELauncher,
1063 SGEControllerLauncher,
1066 SGEControllerLauncher,
1064 SGEEngineSetLauncher,
1067 SGEEngineSetLauncher,
1065 ]
1068 ]
1066 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1067 + pbs_launchers + sge_launchers
1070 + pbs_launchers + sge_launchers
@@ -1,521 +1,560 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Utilities for working with strings and text.
3 Utilities for working with strings and text.
4 """
4 """
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import __main__
17 import __main__
18
18
19 import os
19 import os
20 import re
20 import re
21 import shutil
21 import shutil
22 from string import Formatter
22
23
23 from IPython.external.path import path
24 from IPython.external.path import path
24
25
25 from IPython.utils.io import nlprint
26 from IPython.utils.io import nlprint
26 from IPython.utils.data import flatten
27 from IPython.utils.data import flatten
27
28
28 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
29 # Code
30 # Code
30 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
31
32
32
33
33 def unquote_ends(istr):
34 def unquote_ends(istr):
34 """Remove a single pair of quotes from the endpoints of a string."""
35 """Remove a single pair of quotes from the endpoints of a string."""
35
36
36 if not istr:
37 if not istr:
37 return istr
38 return istr
38 if (istr[0]=="'" and istr[-1]=="'") or \
39 if (istr[0]=="'" and istr[-1]=="'") or \
39 (istr[0]=='"' and istr[-1]=='"'):
40 (istr[0]=='"' and istr[-1]=='"'):
40 return istr[1:-1]
41 return istr[1:-1]
41 else:
42 else:
42 return istr
43 return istr
43
44
44
45
45 class LSString(str):
46 class LSString(str):
46 """String derivative with a special access attributes.
47 """String derivative with a special access attributes.
47
48
48 These are normal strings, but with the special attributes:
49 These are normal strings, but with the special attributes:
49
50
50 .l (or .list) : value as list (split on newlines).
51 .l (or .list) : value as list (split on newlines).
51 .n (or .nlstr): original value (the string itself).
52 .n (or .nlstr): original value (the string itself).
52 .s (or .spstr): value as whitespace-separated string.
53 .s (or .spstr): value as whitespace-separated string.
53 .p (or .paths): list of path objects
54 .p (or .paths): list of path objects
54
55
55 Any values which require transformations are computed only once and
56 Any values which require transformations are computed only once and
56 cached.
57 cached.
57
58
58 Such strings are very useful to efficiently interact with the shell, which
59 Such strings are very useful to efficiently interact with the shell, which
59 typically only understands whitespace-separated options for commands."""
60 typically only understands whitespace-separated options for commands."""
60
61
61 def get_list(self):
62 def get_list(self):
62 try:
63 try:
63 return self.__list
64 return self.__list
64 except AttributeError:
65 except AttributeError:
65 self.__list = self.split('\n')
66 self.__list = self.split('\n')
66 return self.__list
67 return self.__list
67
68
68 l = list = property(get_list)
69 l = list = property(get_list)
69
70
70 def get_spstr(self):
71 def get_spstr(self):
71 try:
72 try:
72 return self.__spstr
73 return self.__spstr
73 except AttributeError:
74 except AttributeError:
74 self.__spstr = self.replace('\n',' ')
75 self.__spstr = self.replace('\n',' ')
75 return self.__spstr
76 return self.__spstr
76
77
77 s = spstr = property(get_spstr)
78 s = spstr = property(get_spstr)
78
79
79 def get_nlstr(self):
80 def get_nlstr(self):
80 return self
81 return self
81
82
82 n = nlstr = property(get_nlstr)
83 n = nlstr = property(get_nlstr)
83
84
84 def get_paths(self):
85 def get_paths(self):
85 try:
86 try:
86 return self.__paths
87 return self.__paths
87 except AttributeError:
88 except AttributeError:
88 self.__paths = [path(p) for p in self.split('\n') if os.path.exists(p)]
89 self.__paths = [path(p) for p in self.split('\n') if os.path.exists(p)]
89 return self.__paths
90 return self.__paths
90
91
91 p = paths = property(get_paths)
92 p = paths = property(get_paths)
92
93
93 # FIXME: We need to reimplement type specific displayhook and then add this
94 # FIXME: We need to reimplement type specific displayhook and then add this
94 # back as a custom printer. This should also be moved outside utils into the
95 # back as a custom printer. This should also be moved outside utils into the
95 # core.
96 # core.
96
97
97 # def print_lsstring(arg):
98 # def print_lsstring(arg):
98 # """ Prettier (non-repr-like) and more informative printer for LSString """
99 # """ Prettier (non-repr-like) and more informative printer for LSString """
99 # print "LSString (.p, .n, .l, .s available). Value:"
100 # print "LSString (.p, .n, .l, .s available). Value:"
100 # print arg
101 # print arg
101 #
102 #
102 #
103 #
103 # print_lsstring = result_display.when_type(LSString)(print_lsstring)
104 # print_lsstring = result_display.when_type(LSString)(print_lsstring)
104
105
105
106
106 class SList(list):
107 class SList(list):
107 """List derivative with a special access attributes.
108 """List derivative with a special access attributes.
108
109
109 These are normal lists, but with the special attributes:
110 These are normal lists, but with the special attributes:
110
111
111 .l (or .list) : value as list (the list itself).
112 .l (or .list) : value as list (the list itself).
112 .n (or .nlstr): value as a string, joined on newlines.
113 .n (or .nlstr): value as a string, joined on newlines.
113 .s (or .spstr): value as a string, joined on spaces.
114 .s (or .spstr): value as a string, joined on spaces.
114 .p (or .paths): list of path objects
115 .p (or .paths): list of path objects
115
116
116 Any values which require transformations are computed only once and
117 Any values which require transformations are computed only once and
117 cached."""
118 cached."""
118
119
119 def get_list(self):
120 def get_list(self):
120 return self
121 return self
121
122
122 l = list = property(get_list)
123 l = list = property(get_list)
123
124
124 def get_spstr(self):
125 def get_spstr(self):
125 try:
126 try:
126 return self.__spstr
127 return self.__spstr
127 except AttributeError:
128 except AttributeError:
128 self.__spstr = ' '.join(self)
129 self.__spstr = ' '.join(self)
129 return self.__spstr
130 return self.__spstr
130
131
131 s = spstr = property(get_spstr)
132 s = spstr = property(get_spstr)
132
133
133 def get_nlstr(self):
134 def get_nlstr(self):
134 try:
135 try:
135 return self.__nlstr
136 return self.__nlstr
136 except AttributeError:
137 except AttributeError:
137 self.__nlstr = '\n'.join(self)
138 self.__nlstr = '\n'.join(self)
138 return self.__nlstr
139 return self.__nlstr
139
140
140 n = nlstr = property(get_nlstr)
141 n = nlstr = property(get_nlstr)
141
142
142 def get_paths(self):
143 def get_paths(self):
143 try:
144 try:
144 return self.__paths
145 return self.__paths
145 except AttributeError:
146 except AttributeError:
146 self.__paths = [path(p) for p in self if os.path.exists(p)]
147 self.__paths = [path(p) for p in self if os.path.exists(p)]
147 return self.__paths
148 return self.__paths
148
149
149 p = paths = property(get_paths)
150 p = paths = property(get_paths)
150
151
151 def grep(self, pattern, prune = False, field = None):
152 def grep(self, pattern, prune = False, field = None):
152 """ Return all strings matching 'pattern' (a regex or callable)
153 """ Return all strings matching 'pattern' (a regex or callable)
153
154
154 This is case-insensitive. If prune is true, return all items
155 This is case-insensitive. If prune is true, return all items
155 NOT matching the pattern.
156 NOT matching the pattern.
156
157
157 If field is specified, the match must occur in the specified
158 If field is specified, the match must occur in the specified
158 whitespace-separated field.
159 whitespace-separated field.
159
160
160 Examples::
161 Examples::
161
162
162 a.grep( lambda x: x.startswith('C') )
163 a.grep( lambda x: x.startswith('C') )
163 a.grep('Cha.*log', prune=1)
164 a.grep('Cha.*log', prune=1)
164 a.grep('chm', field=-1)
165 a.grep('chm', field=-1)
165 """
166 """
166
167
167 def match_target(s):
168 def match_target(s):
168 if field is None:
169 if field is None:
169 return s
170 return s
170 parts = s.split()
171 parts = s.split()
171 try:
172 try:
172 tgt = parts[field]
173 tgt = parts[field]
173 return tgt
174 return tgt
174 except IndexError:
175 except IndexError:
175 return ""
176 return ""
176
177
177 if isinstance(pattern, basestring):
178 if isinstance(pattern, basestring):
178 pred = lambda x : re.search(pattern, x, re.IGNORECASE)
179 pred = lambda x : re.search(pattern, x, re.IGNORECASE)
179 else:
180 else:
180 pred = pattern
181 pred = pattern
181 if not prune:
182 if not prune:
182 return SList([el for el in self if pred(match_target(el))])
183 return SList([el for el in self if pred(match_target(el))])
183 else:
184 else:
184 return SList([el for el in self if not pred(match_target(el))])
185 return SList([el for el in self if not pred(match_target(el))])
185
186
186 def fields(self, *fields):
187 def fields(self, *fields):
187 """ Collect whitespace-separated fields from string list
188 """ Collect whitespace-separated fields from string list
188
189
189 Allows quick awk-like usage of string lists.
190 Allows quick awk-like usage of string lists.
190
191
191 Example data (in var a, created by 'a = !ls -l')::
192 Example data (in var a, created by 'a = !ls -l')::
192 -rwxrwxrwx 1 ville None 18 Dec 14 2006 ChangeLog
193 -rwxrwxrwx 1 ville None 18 Dec 14 2006 ChangeLog
193 drwxrwxrwx+ 6 ville None 0 Oct 24 18:05 IPython
194 drwxrwxrwx+ 6 ville None 0 Oct 24 18:05 IPython
194
195
195 a.fields(0) is ['-rwxrwxrwx', 'drwxrwxrwx+']
196 a.fields(0) is ['-rwxrwxrwx', 'drwxrwxrwx+']
196 a.fields(1,0) is ['1 -rwxrwxrwx', '6 drwxrwxrwx+']
197 a.fields(1,0) is ['1 -rwxrwxrwx', '6 drwxrwxrwx+']
197 (note the joining by space).
198 (note the joining by space).
198 a.fields(-1) is ['ChangeLog', 'IPython']
199 a.fields(-1) is ['ChangeLog', 'IPython']
199
200
200 IndexErrors are ignored.
201 IndexErrors are ignored.
201
202
202 Without args, fields() just split()'s the strings.
203 Without args, fields() just split()'s the strings.
203 """
204 """
204 if len(fields) == 0:
205 if len(fields) == 0:
205 return [el.split() for el in self]
206 return [el.split() for el in self]
206
207
207 res = SList()
208 res = SList()
208 for el in [f.split() for f in self]:
209 for el in [f.split() for f in self]:
209 lineparts = []
210 lineparts = []
210
211
211 for fd in fields:
212 for fd in fields:
212 try:
213 try:
213 lineparts.append(el[fd])
214 lineparts.append(el[fd])
214 except IndexError:
215 except IndexError:
215 pass
216 pass
216 if lineparts:
217 if lineparts:
217 res.append(" ".join(lineparts))
218 res.append(" ".join(lineparts))
218
219
219 return res
220 return res
220
221
221 def sort(self,field= None, nums = False):
222 def sort(self,field= None, nums = False):
222 """ sort by specified fields (see fields())
223 """ sort by specified fields (see fields())
223
224
224 Example::
225 Example::
225 a.sort(1, nums = True)
226 a.sort(1, nums = True)
226
227
227 Sorts a by second field, in numerical order (so that 21 > 3)
228 Sorts a by second field, in numerical order (so that 21 > 3)
228
229
229 """
230 """
230
231
231 #decorate, sort, undecorate
232 #decorate, sort, undecorate
232 if field is not None:
233 if field is not None:
233 dsu = [[SList([line]).fields(field), line] for line in self]
234 dsu = [[SList([line]).fields(field), line] for line in self]
234 else:
235 else:
235 dsu = [[line, line] for line in self]
236 dsu = [[line, line] for line in self]
236 if nums:
237 if nums:
237 for i in range(len(dsu)):
238 for i in range(len(dsu)):
238 numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()])
239 numstr = "".join([ch for ch in dsu[i][0] if ch.isdigit()])
239 try:
240 try:
240 n = int(numstr)
241 n = int(numstr)
241 except ValueError:
242 except ValueError:
242 n = 0;
243 n = 0;
243 dsu[i][0] = n
244 dsu[i][0] = n
244
245
245
246
246 dsu.sort()
247 dsu.sort()
247 return SList([t[1] for t in dsu])
248 return SList([t[1] for t in dsu])
248
249
249
250
250 # FIXME: We need to reimplement type specific displayhook and then add this
251 # FIXME: We need to reimplement type specific displayhook and then add this
251 # back as a custom printer. This should also be moved outside utils into the
252 # back as a custom printer. This should also be moved outside utils into the
252 # core.
253 # core.
253
254
254 # def print_slist(arg):
255 # def print_slist(arg):
255 # """ Prettier (non-repr-like) and more informative printer for SList """
256 # """ Prettier (non-repr-like) and more informative printer for SList """
256 # print "SList (.p, .n, .l, .s, .grep(), .fields(), sort() available):"
257 # print "SList (.p, .n, .l, .s, .grep(), .fields(), sort() available):"
257 # if hasattr(arg, 'hideonce') and arg.hideonce:
258 # if hasattr(arg, 'hideonce') and arg.hideonce:
258 # arg.hideonce = False
259 # arg.hideonce = False
259 # return
260 # return
260 #
261 #
261 # nlprint(arg)
262 # nlprint(arg)
262 #
263 #
263 # print_slist = result_display.when_type(SList)(print_slist)
264 # print_slist = result_display.when_type(SList)(print_slist)
264
265
265
266
266 def esc_quotes(strng):
267 def esc_quotes(strng):
267 """Return the input string with single and double quotes escaped out"""
268 """Return the input string with single and double quotes escaped out"""
268
269
269 return strng.replace('"','\\"').replace("'","\\'")
270 return strng.replace('"','\\"').replace("'","\\'")
270
271
271
272
272 def make_quoted_expr(s):
273 def make_quoted_expr(s):
273 """Return string s in appropriate quotes, using raw string if possible.
274 """Return string s in appropriate quotes, using raw string if possible.
274
275
275 XXX - example removed because it caused encoding errors in documentation
276 XXX - example removed because it caused encoding errors in documentation
276 generation. We need a new example that doesn't contain invalid chars.
277 generation. We need a new example that doesn't contain invalid chars.
277
278
278 Note the use of raw string and padding at the end to allow trailing
279 Note the use of raw string and padding at the end to allow trailing
279 backslash.
280 backslash.
280 """
281 """
281
282
282 tail = ''
283 tail = ''
283 tailpadding = ''
284 tailpadding = ''
284 raw = ''
285 raw = ''
285 ucode = 'u'
286 ucode = 'u'
286 if "\\" in s:
287 if "\\" in s:
287 raw = 'r'
288 raw = 'r'
288 if s.endswith('\\'):
289 if s.endswith('\\'):
289 tail = '[:-1]'
290 tail = '[:-1]'
290 tailpadding = '_'
291 tailpadding = '_'
291 if '"' not in s:
292 if '"' not in s:
292 quote = '"'
293 quote = '"'
293 elif "'" not in s:
294 elif "'" not in s:
294 quote = "'"
295 quote = "'"
295 elif '"""' not in s and not s.endswith('"'):
296 elif '"""' not in s and not s.endswith('"'):
296 quote = '"""'
297 quote = '"""'
297 elif "'''" not in s and not s.endswith("'"):
298 elif "'''" not in s and not s.endswith("'"):
298 quote = "'''"
299 quote = "'''"
299 else:
300 else:
300 # give up, backslash-escaped string will do
301 # give up, backslash-escaped string will do
301 return '"%s"' % esc_quotes(s)
302 return '"%s"' % esc_quotes(s)
302 res = ucode + raw + quote + s + tailpadding + quote + tail
303 res = ucode + raw + quote + s + tailpadding + quote + tail
303 return res
304 return res
304
305
305
306
306 def qw(words,flat=0,sep=None,maxsplit=-1):
307 def qw(words,flat=0,sep=None,maxsplit=-1):
307 """Similar to Perl's qw() operator, but with some more options.
308 """Similar to Perl's qw() operator, but with some more options.
308
309
309 qw(words,flat=0,sep=' ',maxsplit=-1) -> words.split(sep,maxsplit)
310 qw(words,flat=0,sep=' ',maxsplit=-1) -> words.split(sep,maxsplit)
310
311
311 words can also be a list itself, and with flat=1, the output will be
312 words can also be a list itself, and with flat=1, the output will be
312 recursively flattened.
313 recursively flattened.
313
314
314 Examples:
315 Examples:
315
316
316 >>> qw('1 2')
317 >>> qw('1 2')
317 ['1', '2']
318 ['1', '2']
318
319
319 >>> qw(['a b','1 2',['m n','p q']])
320 >>> qw(['a b','1 2',['m n','p q']])
320 [['a', 'b'], ['1', '2'], [['m', 'n'], ['p', 'q']]]
321 [['a', 'b'], ['1', '2'], [['m', 'n'], ['p', 'q']]]
321
322
322 >>> qw(['a b','1 2',['m n','p q']],flat=1)
323 >>> qw(['a b','1 2',['m n','p q']],flat=1)
323 ['a', 'b', '1', '2', 'm', 'n', 'p', 'q']
324 ['a', 'b', '1', '2', 'm', 'n', 'p', 'q']
324 """
325 """
325
326
326 if isinstance(words, basestring):
327 if isinstance(words, basestring):
327 return [word.strip() for word in words.split(sep,maxsplit)
328 return [word.strip() for word in words.split(sep,maxsplit)
328 if word and not word.isspace() ]
329 if word and not word.isspace() ]
329 if flat:
330 if flat:
330 return flatten(map(qw,words,[1]*len(words)))
331 return flatten(map(qw,words,[1]*len(words)))
331 return map(qw,words)
332 return map(qw,words)
332
333
333
334
334 def qwflat(words,sep=None,maxsplit=-1):
335 def qwflat(words,sep=None,maxsplit=-1):
335 """Calls qw(words) in flat mode. It's just a convenient shorthand."""
336 """Calls qw(words) in flat mode. It's just a convenient shorthand."""
336 return qw(words,1,sep,maxsplit)
337 return qw(words,1,sep,maxsplit)
337
338
338
339
339 def qw_lol(indata):
340 def qw_lol(indata):
340 """qw_lol('a b') -> [['a','b']],
341 """qw_lol('a b') -> [['a','b']],
341 otherwise it's just a call to qw().
342 otherwise it's just a call to qw().
342
343
343 We need this to make sure the modules_some keys *always* end up as a
344 We need this to make sure the modules_some keys *always* end up as a
344 list of lists."""
345 list of lists."""
345
346
346 if isinstance(indata, basestring):
347 if isinstance(indata, basestring):
347 return [qw(indata)]
348 return [qw(indata)]
348 else:
349 else:
349 return qw(indata)
350 return qw(indata)
350
351
351
352
352 def grep(pat,list,case=1):
353 def grep(pat,list,case=1):
353 """Simple minded grep-like function.
354 """Simple minded grep-like function.
354 grep(pat,list) returns occurrences of pat in list, None on failure.
355 grep(pat,list) returns occurrences of pat in list, None on failure.
355
356
356 It only does simple string matching, with no support for regexps. Use the
357 It only does simple string matching, with no support for regexps. Use the
357 option case=0 for case-insensitive matching."""
358 option case=0 for case-insensitive matching."""
358
359
359 # This is pretty crude. At least it should implement copying only references
360 # This is pretty crude. At least it should implement copying only references
360 # to the original data in case it's big. Now it copies the data for output.
361 # to the original data in case it's big. Now it copies the data for output.
361 out=[]
362 out=[]
362 if case:
363 if case:
363 for term in list:
364 for term in list:
364 if term.find(pat)>-1: out.append(term)
365 if term.find(pat)>-1: out.append(term)
365 else:
366 else:
366 lpat=pat.lower()
367 lpat=pat.lower()
367 for term in list:
368 for term in list:
368 if term.lower().find(lpat)>-1: out.append(term)
369 if term.lower().find(lpat)>-1: out.append(term)
369
370
370 if len(out): return out
371 if len(out): return out
371 else: return None
372 else: return None
372
373
373
374
374 def dgrep(pat,*opts):
375 def dgrep(pat,*opts):
375 """Return grep() on dir()+dir(__builtins__).
376 """Return grep() on dir()+dir(__builtins__).
376
377
377 A very common use of grep() when working interactively."""
378 A very common use of grep() when working interactively."""
378
379
379 return grep(pat,dir(__main__)+dir(__main__.__builtins__),*opts)
380 return grep(pat,dir(__main__)+dir(__main__.__builtins__),*opts)
380
381
381
382
382 def idgrep(pat):
383 def idgrep(pat):
383 """Case-insensitive dgrep()"""
384 """Case-insensitive dgrep()"""
384
385
385 return dgrep(pat,0)
386 return dgrep(pat,0)
386
387
387
388
388 def igrep(pat,list):
389 def igrep(pat,list):
389 """Synonym for case-insensitive grep."""
390 """Synonym for case-insensitive grep."""
390
391
391 return grep(pat,list,case=0)
392 return grep(pat,list,case=0)
392
393
393
394
394 def indent(instr,nspaces=4, ntabs=0, flatten=False):
395 def indent(instr,nspaces=4, ntabs=0, flatten=False):
395 """Indent a string a given number of spaces or tabstops.
396 """Indent a string a given number of spaces or tabstops.
396
397
397 indent(str,nspaces=4,ntabs=0) -> indent str by ntabs+nspaces.
398 indent(str,nspaces=4,ntabs=0) -> indent str by ntabs+nspaces.
398
399
399 Parameters
400 Parameters
400 ----------
401 ----------
401
402
402 instr : basestring
403 instr : basestring
403 The string to be indented.
404 The string to be indented.
404 nspaces : int (default: 4)
405 nspaces : int (default: 4)
405 The number of spaces to be indented.
406 The number of spaces to be indented.
406 ntabs : int (default: 0)
407 ntabs : int (default: 0)
407 The number of tabs to be indented.
408 The number of tabs to be indented.
408 flatten : bool (default: False)
409 flatten : bool (default: False)
409 Whether to scrub existing indentation. If True, all lines will be
410 Whether to scrub existing indentation. If True, all lines will be
410 aligned to the same indentation. If False, existing indentation will
411 aligned to the same indentation. If False, existing indentation will
411 be strictly increased.
412 be strictly increased.
412
413
413 Returns
414 Returns
414 -------
415 -------
415
416
416 str|unicode : string indented by ntabs and nspaces.
417 str|unicode : string indented by ntabs and nspaces.
417
418
418 """
419 """
419 if instr is None:
420 if instr is None:
420 return
421 return
421 ind = '\t'*ntabs+' '*nspaces
422 ind = '\t'*ntabs+' '*nspaces
422 if flatten:
423 if flatten:
423 pat = re.compile(r'^\s*', re.MULTILINE)
424 pat = re.compile(r'^\s*', re.MULTILINE)
424 else:
425 else:
425 pat = re.compile(r'^', re.MULTILINE)
426 pat = re.compile(r'^', re.MULTILINE)
426 outstr = re.sub(pat, ind, instr)
427 outstr = re.sub(pat, ind, instr)
427 if outstr.endswith(os.linesep+ind):
428 if outstr.endswith(os.linesep+ind):
428 return outstr[:-len(ind)]
429 return outstr[:-len(ind)]
429 else:
430 else:
430 return outstr
431 return outstr
431
432
432 def native_line_ends(filename,backup=1):
433 def native_line_ends(filename,backup=1):
433 """Convert (in-place) a file to line-ends native to the current OS.
434 """Convert (in-place) a file to line-ends native to the current OS.
434
435
435 If the optional backup argument is given as false, no backup of the
436 If the optional backup argument is given as false, no backup of the
436 original file is left. """
437 original file is left. """
437
438
438 backup_suffixes = {'posix':'~','dos':'.bak','nt':'.bak','mac':'.bak'}
439 backup_suffixes = {'posix':'~','dos':'.bak','nt':'.bak','mac':'.bak'}
439
440
440 bak_filename = filename + backup_suffixes[os.name]
441 bak_filename = filename + backup_suffixes[os.name]
441
442
442 original = open(filename).read()
443 original = open(filename).read()
443 shutil.copy2(filename,bak_filename)
444 shutil.copy2(filename,bak_filename)
444 try:
445 try:
445 new = open(filename,'wb')
446 new = open(filename,'wb')
446 new.write(os.linesep.join(original.splitlines()))
447 new.write(os.linesep.join(original.splitlines()))
447 new.write(os.linesep) # ALWAYS put an eol at the end of the file
448 new.write(os.linesep) # ALWAYS put an eol at the end of the file
448 new.close()
449 new.close()
449 except:
450 except:
450 os.rename(bak_filename,filename)
451 os.rename(bak_filename,filename)
451 if not backup:
452 if not backup:
452 try:
453 try:
453 os.remove(bak_filename)
454 os.remove(bak_filename)
454 except:
455 except:
455 pass
456 pass
456
457
457
458
458 def list_strings(arg):
459 def list_strings(arg):
459 """Always return a list of strings, given a string or list of strings
460 """Always return a list of strings, given a string or list of strings
460 as input.
461 as input.
461
462
462 :Examples:
463 :Examples:
463
464
464 In [7]: list_strings('A single string')
465 In [7]: list_strings('A single string')
465 Out[7]: ['A single string']
466 Out[7]: ['A single string']
466
467
467 In [8]: list_strings(['A single string in a list'])
468 In [8]: list_strings(['A single string in a list'])
468 Out[8]: ['A single string in a list']
469 Out[8]: ['A single string in a list']
469
470
470 In [9]: list_strings(['A','list','of','strings'])
471 In [9]: list_strings(['A','list','of','strings'])
471 Out[9]: ['A', 'list', 'of', 'strings']
472 Out[9]: ['A', 'list', 'of', 'strings']
472 """
473 """
473
474
474 if isinstance(arg,basestring): return [arg]
475 if isinstance(arg,basestring): return [arg]
475 else: return arg
476 else: return arg
476
477
477
478
478 def marquee(txt='',width=78,mark='*'):
479 def marquee(txt='',width=78,mark='*'):
479 """Return the input string centered in a 'marquee'.
480 """Return the input string centered in a 'marquee'.
480
481
481 :Examples:
482 :Examples:
482
483
483 In [16]: marquee('A test',40)
484 In [16]: marquee('A test',40)
484 Out[16]: '**************** A test ****************'
485 Out[16]: '**************** A test ****************'
485
486
486 In [17]: marquee('A test',40,'-')
487 In [17]: marquee('A test',40,'-')
487 Out[17]: '---------------- A test ----------------'
488 Out[17]: '---------------- A test ----------------'
488
489
489 In [18]: marquee('A test',40,' ')
490 In [18]: marquee('A test',40,' ')
490 Out[18]: ' A test '
491 Out[18]: ' A test '
491
492
492 """
493 """
493 if not txt:
494 if not txt:
494 return (mark*width)[:width]
495 return (mark*width)[:width]
495 nmark = (width-len(txt)-2)/len(mark)/2
496 nmark = (width-len(txt)-2)/len(mark)/2
496 if nmark < 0: nmark =0
497 if nmark < 0: nmark =0
497 marks = mark*nmark
498 marks = mark*nmark
498 return '%s %s %s' % (marks,txt,marks)
499 return '%s %s %s' % (marks,txt,marks)
499
500
500
501
501 ini_spaces_re = re.compile(r'^(\s+)')
502 ini_spaces_re = re.compile(r'^(\s+)')
502
503
503 def num_ini_spaces(strng):
504 def num_ini_spaces(strng):
504 """Return the number of initial spaces in a string"""
505 """Return the number of initial spaces in a string"""
505
506
506 ini_spaces = ini_spaces_re.match(strng)
507 ini_spaces = ini_spaces_re.match(strng)
507 if ini_spaces:
508 if ini_spaces:
508 return ini_spaces.end()
509 return ini_spaces.end()
509 else:
510 else:
510 return 0
511 return 0
511
512
512
513
513 def format_screen(strng):
514 def format_screen(strng):
514 """Format a string for screen printing.
515 """Format a string for screen printing.
515
516
516 This removes some latex-type format codes."""
517 This removes some latex-type format codes."""
517 # Paragraph continue
518 # Paragraph continue
518 par_re = re.compile(r'\\$',re.MULTILINE)
519 par_re = re.compile(r'\\$',re.MULTILINE)
519 strng = par_re.sub('',strng)
520 strng = par_re.sub('',strng)
520 return strng
521 return strng
521
522
523
524 class EvalFormatter(Formatter):
525 """A String Formatter that allows evaluation of simple expressions.
526
527 Any time a format key is not found in the kwargs,
528 it will be tried as an expression in the kwargs namespace.
529
530 This is to be used in templating cases, such as the parallel batch
531 script templates, where simple arithmetic on arguments is useful.
532
533 Examples
534 --------
535
536 In [1]: f = EvalFormatter()
537 In [2]: f.format('{n/4}', n=8)
538 Out[2]: '2'
539
540 In [3]: f.format('{range(3)}')
541 Out[3]: '[0, 1, 2]'
542
543 In [4]: f.format('{3*2}')
544 Out[4]: '6'
545 """
546
547 def get_value(self, key, args, kwargs):
548 if isinstance(key, (int, long)):
549 return args[key]
550 elif key in kwargs:
551 return kwargs[key]
552 else:
553 # evaluate the expression using kwargs as namespace
554 try:
555 return eval(key, kwargs)
556 except Exception:
557 # classify all bad expressions as key errors
558 raise KeyError(key)
559
560
@@ -1,507 +1,504 b''
1 .. _parallel_process:
1 .. _parallel_process:
2
2
3 ===========================================
3 ===========================================
4 Starting the IPython controller and engines
4 Starting the IPython controller and engines
5 ===========================================
5 ===========================================
6
6
7 To use IPython for parallel computing, you need to start one instance of
7 To use IPython for parallel computing, you need to start one instance of
8 the controller and one or more instances of the engine. The controller
8 the controller and one or more instances of the engine. The controller
9 and each engine can run on different machines or on the same machine.
9 and each engine can run on different machines or on the same machine.
10 Because of this, there are many different possibilities.
10 Because of this, there are many different possibilities.
11
11
12 Broadly speaking, there are two ways of going about starting a controller and engines:
12 Broadly speaking, there are two ways of going about starting a controller and engines:
13
13
14 * In an automated manner using the :command:`ipcluster` command.
14 * In an automated manner using the :command:`ipcluster` command.
15 * In a more manual way using the :command:`ipcontroller` and
15 * In a more manual way using the :command:`ipcontroller` and
16 :command:`ipengine` commands.
16 :command:`ipengine` commands.
17
17
18 This document describes both of these methods. We recommend that new users
18 This document describes both of these methods. We recommend that new users
19 start with the :command:`ipcluster` command as it simplifies many common usage
19 start with the :command:`ipcluster` command as it simplifies many common usage
20 cases.
20 cases.
21
21
22 General considerations
22 General considerations
23 ======================
23 ======================
24
24
25 Before delving into the details about how you can start a controller and
25 Before delving into the details about how you can start a controller and
26 engines using the various methods, we outline some of the general issues that
26 engines using the various methods, we outline some of the general issues that
27 come up when starting the controller and engines. These things come up no
27 come up when starting the controller and engines. These things come up no
28 matter which method you use to start your IPython cluster.
28 matter which method you use to start your IPython cluster.
29
29
30 Let's say that you want to start the controller on ``host0`` and engines on
30 Let's say that you want to start the controller on ``host0`` and engines on
31 hosts ``host1``-``hostn``. The following steps are then required:
31 hosts ``host1``-``hostn``. The following steps are then required:
32
32
33 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
33 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
34 ``host0``.
34 ``host0``.
35 2. Move the JSON file (:file:`ipcontroller-engine.json`) created by the
35 2. Move the JSON file (:file:`ipcontroller-engine.json`) created by the
36 controller from ``host0`` to hosts ``host1``-``hostn``.
36 controller from ``host0`` to hosts ``host1``-``hostn``.
37 3. Start the engines on hosts ``host1``-``hostn`` by running
37 3. Start the engines on hosts ``host1``-``hostn`` by running
38 :command:`ipengine`. This command has to be told where the JSON file
38 :command:`ipengine`. This command has to be told where the JSON file
39 (:file:`ipcontroller-engine.json`) is located.
39 (:file:`ipcontroller-engine.json`) is located.
40
40
41 At this point, the controller and engines will be connected. By default, the JSON files
41 At this point, the controller and engines will be connected. By default, the JSON files
42 created by the controller are put into the :file:`~/.ipython/cluster_default/security`
42 created by the controller are put into the :file:`~/.ipython/cluster_default/security`
43 directory. If the engines share a filesystem with the controller, step 2 can be skipped as
43 directory. If the engines share a filesystem with the controller, step 2 can be skipped as
44 the engines will automatically look at that location.
44 the engines will automatically look at that location.
45
45
46 The final step required to actually use the running controller from a client is to move
46 The final step required to actually use the running controller from a client is to move
47 the JSON file :file:`ipcontroller-client.json` from ``host0`` to any host where clients
47 the JSON file :file:`ipcontroller-client.json` from ``host0`` to any host where clients
48 will be run. If these file are put into the :file:`~/.ipython/cluster_default/security`
48 will be run. If these file are put into the :file:`~/.ipython/cluster_default/security`
49 directory of the client's host, they will be found automatically. Otherwise, the full path
49 directory of the client's host, they will be found automatically. Otherwise, the full path
50 to them has to be passed to the client's constructor.
50 to them has to be passed to the client's constructor.
51
51
52 Using :command:`ipcluster`
52 Using :command:`ipcluster`
53 ===========================
53 ===========================
54
54
55 The :command:`ipcluster` command provides a simple way of starting a
55 The :command:`ipcluster` command provides a simple way of starting a
56 controller and engines in the following situations:
56 controller and engines in the following situations:
57
57
58 1. When the controller and engines are all run on localhost. This is useful
58 1. When the controller and engines are all run on localhost. This is useful
59 for testing or running on a multicore computer.
59 for testing or running on a multicore computer.
60 2. When engines are started using the :command:`mpiexec` command that comes
60 2. When engines are started using the :command:`mpiexec` command that comes
61 with most MPI [MPI]_ implementations
61 with most MPI [MPI]_ implementations
62 3. When engines are started using the PBS [PBS]_ batch system
62 3. When engines are started using the PBS [PBS]_ batch system
63 (or other `qsub` systems, such as SGE).
63 (or other `qsub` systems, such as SGE).
64 4. When the controller is started on localhost and the engines are started on
64 4. When the controller is started on localhost and the engines are started on
65 remote nodes using :command:`ssh`.
65 remote nodes using :command:`ssh`.
66 5. When engines are started using the Windows HPC Server batch system.
66 5. When engines are started using the Windows HPC Server batch system.
67
67
68 .. note::
68 .. note::
69
69
70 Currently :command:`ipcluster` requires that the
70 Currently :command:`ipcluster` requires that the
71 :file:`~/.ipython/cluster_<profile>/security` directory live on a shared filesystem that is
71 :file:`~/.ipython/cluster_<profile>/security` directory live on a shared filesystem that is
72 seen by both the controller and engines. If you don't have a shared file
72 seen by both the controller and engines. If you don't have a shared file
73 system you will need to use :command:`ipcontroller` and
73 system you will need to use :command:`ipcontroller` and
74 :command:`ipengine` directly.
74 :command:`ipengine` directly.
75
75
76 Under the hood, :command:`ipcluster` just uses :command:`ipcontroller`
76 Under the hood, :command:`ipcluster` just uses :command:`ipcontroller`
77 and :command:`ipengine` to perform the steps described above.
77 and :command:`ipengine` to perform the steps described above.
78
78
79 The simplest way to use ipcluster requires no configuration, and will
79 The simplest way to use ipcluster requires no configuration, and will
80 launch a controller and a number of engines on the local machine. For instance,
80 launch a controller and a number of engines on the local machine. For instance,
81 to start one controller and 4 engines on localhost, just do::
81 to start one controller and 4 engines on localhost, just do::
82
82
83 $ ipcluster start n=4
83 $ ipcluster start n=4
84
84
85 To see other command line options, do::
85 To see other command line options, do::
86
86
87 $ ipcluster -h
87 $ ipcluster -h
88
88
89
89
90 Configuring an IPython cluster
90 Configuring an IPython cluster
91 ==============================
91 ==============================
92
92
93 Cluster configurations are stored as `profiles`. You can create a new profile with::
93 Cluster configurations are stored as `profiles`. You can create a new profile with::
94
94
95 $ ipcluster create profile=myprofile
95 $ ipcluster create profile=myprofile
96
96
97 This will create the directory :file:`IPYTHONDIR/cluster_myprofile`, and populate it
97 This will create the directory :file:`IPYTHONDIR/cluster_myprofile`, and populate it
98 with the default configuration files for the three IPython cluster commands. Once
98 with the default configuration files for the three IPython cluster commands. Once
99 you edit those files, you can continue to call ipcluster/ipcontroller/ipengine
99 you edit those files, you can continue to call ipcluster/ipcontroller/ipengine
100 with no arguments beyond ``p=myprofile``, and any configuration will be maintained.
100 with no arguments beyond ``p=myprofile``, and any configuration will be maintained.
101
101
102 There is no limit to the number of profiles you can have, so you can maintain a profile for each
102 There is no limit to the number of profiles you can have, so you can maintain a profile for each
103 of your common use cases. The default profile will be used whenever the
103 of your common use cases. The default profile will be used whenever the
104 profile argument is not specified, so edit :file:`IPYTHONDIR/cluster_default/*_config.py` to
104 profile argument is not specified, so edit :file:`IPYTHONDIR/cluster_default/*_config.py` to
105 represent your most common use case.
105 represent your most common use case.
106
106
107 The configuration files are loaded with commented-out settings and explanations,
107 The configuration files are loaded with commented-out settings and explanations,
108 which should cover most of the available possibilities.
108 which should cover most of the available possibilities.
109
109
110 Using various batch systems with :command:`ipcluster`
110 Using various batch systems with :command:`ipcluster`
111 ------------------------------------------------------
111 ------------------------------------------------------
112
112
113 :command:`ipcluster` has a notion of Launchers that can start controllers
113 :command:`ipcluster` has a notion of Launchers that can start controllers
114 and engines with various remote execution schemes. Currently supported
114 and engines with various remote execution schemes. Currently supported
115 models include :command:`ssh`, :command`mpiexec`, PBS-style (Torque, SGE),
115 models include :command:`ssh`, :command`mpiexec`, PBS-style (Torque, SGE),
116 and Windows HPC Server.
116 and Windows HPC Server.
117
117
118 .. note::
118 .. note::
119
119
120 The Launchers and configuration are designed in such a way that advanced
120 The Launchers and configuration are designed in such a way that advanced
121 users can subclass and configure them to fit their own system that we
121 users can subclass and configure them to fit their own system that we
122 have not yet supported (such as Condor)
122 have not yet supported (such as Condor)
123
123
124 Using :command:`ipcluster` in mpiexec/mpirun mode
124 Using :command:`ipcluster` in mpiexec/mpirun mode
125 --------------------------------------------------
125 --------------------------------------------------
126
126
127
127
128 The mpiexec/mpirun mode is useful if you:
128 The mpiexec/mpirun mode is useful if you:
129
129
130 1. Have MPI installed.
130 1. Have MPI installed.
131 2. Your systems are configured to use the :command:`mpiexec` or
131 2. Your systems are configured to use the :command:`mpiexec` or
132 :command:`mpirun` commands to start MPI processes.
132 :command:`mpirun` commands to start MPI processes.
133
133
134 If these are satisfied, you can create a new profile::
134 If these are satisfied, you can create a new profile::
135
135
136 $ ipcluster create profile=mpi
136 $ ipcluster create profile=mpi
137
137
138 and edit the file :file:`IPYTHONDIR/cluster_mpi/ipcluster_config.py`.
138 and edit the file :file:`IPYTHONDIR/cluster_mpi/ipcluster_config.py`.
139
139
140 There, instruct ipcluster to use the MPIExec launchers by adding the lines:
140 There, instruct ipcluster to use the MPIExec launchers by adding the lines:
141
141
142 .. sourcecode:: python
142 .. sourcecode:: python
143
143
144 c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
144 c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.MPIExecEngineSetLauncher'
145
145
146 If the default MPI configuration is correct, then you can now start your cluster, with::
146 If the default MPI configuration is correct, then you can now start your cluster, with::
147
147
148 $ ipcluster start n=4 profile=mpi
148 $ ipcluster start n=4 profile=mpi
149
149
150 This does the following:
150 This does the following:
151
151
152 1. Starts the IPython controller on current host.
152 1. Starts the IPython controller on current host.
153 2. Uses :command:`mpiexec` to start 4 engines.
153 2. Uses :command:`mpiexec` to start 4 engines.
154
154
155 If you have a reason to also start the Controller with mpi, you can specify:
155 If you have a reason to also start the Controller with mpi, you can specify:
156
156
157 .. sourcecode:: python
157 .. sourcecode:: python
158
158
159 c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
159 c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.MPIExecControllerLauncher'
160
160
161 .. note::
161 .. note::
162
162
163 The Controller *will not* be in the same MPI universe as the engines, so there is not
163 The Controller *will not* be in the same MPI universe as the engines, so there is not
164 much reason to do this unless sysadmins demand it.
164 much reason to do this unless sysadmins demand it.
165
165
166 On newer MPI implementations (such as OpenMPI), this will work even if you
166 On newer MPI implementations (such as OpenMPI), this will work even if you
167 don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI
167 don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI
168 implementations actually require each process to call :func:`MPI_Init` upon
168 implementations actually require each process to call :func:`MPI_Init` upon
169 starting. The easiest way of having this done is to install the mpi4py
169 starting. The easiest way of having this done is to install the mpi4py
170 [mpi4py]_ package and then specify the ``c.MPI.use`` option in :file:`ipengine_config.py`:
170 [mpi4py]_ package and then specify the ``c.MPI.use`` option in :file:`ipengine_config.py`:
171
171
172 .. sourcecode:: python
172 .. sourcecode:: python
173
173
174 c.MPI.use = 'mpi4py'
174 c.MPI.use = 'mpi4py'
175
175
176 Unfortunately, even this won't work for some MPI implementations. If you are
176 Unfortunately, even this won't work for some MPI implementations. If you are
177 having problems with this, you will likely have to use a custom Python
177 having problems with this, you will likely have to use a custom Python
178 executable that itself calls :func:`MPI_Init` at the appropriate time.
178 executable that itself calls :func:`MPI_Init` at the appropriate time.
179 Fortunately, mpi4py comes with such a custom Python executable that is easy to
179 Fortunately, mpi4py comes with such a custom Python executable that is easy to
180 install and use. However, this custom Python executable approach will not work
180 install and use. However, this custom Python executable approach will not work
181 with :command:`ipcluster` currently.
181 with :command:`ipcluster` currently.
182
182
183 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
183 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
184
184
185
185
186 Using :command:`ipcluster` in PBS mode
186 Using :command:`ipcluster` in PBS mode
187 ---------------------------------------
187 ---------------------------------------
188
188
189 The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
189 The PBS mode uses the Portable Batch System [PBS]_ to start the engines.
190
190
191 As usual, we will start by creating a fresh profile::
191 As usual, we will start by creating a fresh profile::
192
192
193 $ ipcluster create profile=pbs
193 $ ipcluster create profile=pbs
194
194
195 And in :file:`ipcluster_config.py`, we will select the PBS launchers for the controller
195 And in :file:`ipcluster_config.py`, we will select the PBS launchers for the controller
196 and engines:
196 and engines:
197
197
198 .. sourcecode:: python
198 .. sourcecode:: python
199
199
200 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
200 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
201 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
201 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.PBSEngineSetLauncher'
202
202
203 IPython does provide simple default batch templates for PBS and SGE, but you may need
203 IPython does provide simple default batch templates for PBS and SGE, but you may need
204 to specify your own. Here is a sample PBS script template:
204 to specify your own. Here is a sample PBS script template:
205
205
206 .. sourcecode:: bash
206 .. sourcecode:: bash
207
207
208 #PBS -N ipython
208 #PBS -N ipython
209 #PBS -j oe
209 #PBS -j oe
210 #PBS -l walltime=00:10:00
210 #PBS -l walltime=00:10:00
211 #PBS -l nodes=${n/4}:ppn=4
211 #PBS -l nodes={n/4}:ppn=4
212 #PBS -q $queue
212 #PBS -q {queue}
213
213
214 cd $$PBS_O_WORKDIR
214 cd $PBS_O_WORKDIR
215 export PATH=$$HOME/usr/local/bin
215 export PATH=$HOME/usr/local/bin
216 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
216 export PYTHONPATH=$HOME/usr/local/lib/python2.7/site-packages
217 /usr/local/bin/mpiexec -n ${n} ipengine cluster_dir=${cluster_dir}
217 /usr/local/bin/mpiexec -n {n} ipengine profile_dir={profile_dir}
218
218
219 There are a few important points about this template:
219 There are a few important points about this template:
220
220
221 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
221 1. This template will be rendered at runtime using IPython's :class:`EvalFormatter`.
222 template engine.
222 This is simply a subclass of :class:`string.Formatter` that allows simple expressions
223 on keys.
223
224
224 2. Instead of putting in the actual number of engines, use the notation
225 2. Instead of putting in the actual number of engines, use the notation
225 ``${n}`` to indicate the number of engines to be started. You can also uses
226 ``{n}`` to indicate the number of engines to be started. You can also use
226 expressions like ``${n/4}`` in the template to indicate the number of
227 expressions like ``{n/4}`` in the template to indicate the number of nodes.
227 nodes. There will always be a ${n} and ${cluster_dir} variable passed to the template.
228 There will always be ``{n}`` and ``{profile_dir}`` variables passed to the formatter.
228 These allow the batch system to know how many engines, and where the configuration
229 These allow the batch system to know how many engines, and where the configuration
229 files reside. The same is true for the batch queue, with the template variable ``$queue``.
230 files reside. The same is true for the batch queue, with the template variable
231 ``{queue}``.
230
232
231 3. Because ``$`` is a special character used by the template engine, you must
233 3. Any options to :command:`ipengine` can be given in the batch script
232 escape any ``$`` by using ``$$``. This is important when referring to
233 environment variables in the template, or in SGE, where the config lines start
234 with ``#$``, which will have to be ``#$$``.
235
236 4. Any options to :command:`ipengine` can be given in the batch script
237 template, or in :file:`ipengine_config.py`.
234 template, or in :file:`ipengine_config.py`.
238
235
239 5. Depending on the configuration of you system, you may have to set
236 4. Depending on the configuration of you system, you may have to set
240 environment variables in the script template.
237 environment variables in the script template.
241
238
242 The controller template should be similar, but simpler:
239 The controller template should be similar, but simpler:
243
240
244 .. sourcecode:: bash
241 .. sourcecode:: bash
245
242
246 #PBS -N ipython
243 #PBS -N ipython
247 #PBS -j oe
244 #PBS -j oe
248 #PBS -l walltime=00:10:00
245 #PBS -l walltime=00:10:00
249 #PBS -l nodes=1:ppn=4
246 #PBS -l nodes=1:ppn=4
250 #PBS -q $queue
247 #PBS -q {queue}
251
248
252 cd $$PBS_O_WORKDIR
249 cd $PBS_O_WORKDIR
253 export PATH=$$HOME/usr/local/bin
250 export PATH=$HOME/usr/local/bin
254 export PYTHONPATH=$$HOME/usr/local/lib/python2.7/site-packages
251 export PYTHONPATH=$HOME/usr/local/lib/python2.7/site-packages
255 ipcontroller cluster_dir=${cluster_dir}
252 ipcontroller profile_dir={profile_dir}
256
253
257
254
258 Once you have created these scripts, save them with names like
255 Once you have created these scripts, save them with names like
259 :file:`pbs.engine.template`. Now you can load them into the :file:`ipcluster_config` with:
256 :file:`pbs.engine.template`. Now you can load them into the :file:`ipcluster_config` with:
260
257
261 .. sourcecode:: python
258 .. sourcecode:: python
262
259
263 c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
260 c.PBSEngineSetLauncher.batch_template_file = "pbs.engine.template"
264
261
265 c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
262 c.PBSControllerLauncher.batch_template_file = "pbs.controller.template"
266
263
267
264
268 Alternately, you can just define the templates as strings inside :file:`ipcluster_config`.
265 Alternately, you can just define the templates as strings inside :file:`ipcluster_config`.
269
266
270 Whether you are using your own templates or our defaults, the extra configurables available are
267 Whether you are using your own templates or our defaults, the extra configurables available are
271 the number of engines to launch (``$n``, and the batch system queue to which the jobs are to be
268 the number of engines to launch (``{n}``, and the batch system queue to which the jobs are to be
272 submitted (``$queue``)). These are configurables, and can be specified in
269 submitted (``{queue}``)). These are configurables, and can be specified in
273 :file:`ipcluster_config`:
270 :file:`ipcluster_config`:
274
271
275 .. sourcecode:: python
272 .. sourcecode:: python
276
273
277 c.PBSLauncher.queue = 'veryshort.q'
274 c.PBSLauncher.queue = 'veryshort.q'
278 c.PBSEngineSetLauncher.n = 64
275 c.IPClusterEnginesApp.n = 64
279
276
280 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
277 Note that assuming you are running PBS on a multi-node cluster, the Controller's default behavior
281 of listening only on localhost is likely too restrictive. In this case, also assuming the
278 of listening only on localhost is likely too restrictive. In this case, also assuming the
282 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
279 nodes are safely behind a firewall, you can simply instruct the Controller to listen for
283 connections on all its interfaces, by adding in :file:`ipcontroller_config`:
280 connections on all its interfaces, by adding in :file:`ipcontroller_config`:
284
281
285 .. sourcecode:: python
282 .. sourcecode:: python
286
283
287 c.RegistrationFactory.ip = '*'
284 c.RegistrationFactory.ip = '*'
288
285
289 You can now run the cluster with::
286 You can now run the cluster with::
290
287
291 $ ipcluster start profile=pbs n=128
288 $ ipcluster start profile=pbs n=128
292
289
293 Additional configuration options can be found in the PBS section of :file:`ipcluster_config`.
290 Additional configuration options can be found in the PBS section of :file:`ipcluster_config`.
294
291
295 .. note::
292 .. note::
296
293
297 Due to the flexibility of configuration, the PBS launchers work with simple changes
294 Due to the flexibility of configuration, the PBS launchers work with simple changes
298 to the template for other :command:`qsub`-using systems, such as Sun Grid Engine,
295 to the template for other :command:`qsub`-using systems, such as Sun Grid Engine,
299 and with further configuration in similar batch systems like Condor.
296 and with further configuration in similar batch systems like Condor.
300
297
301
298
302 Using :command:`ipcluster` in SSH mode
299 Using :command:`ipcluster` in SSH mode
303 ---------------------------------------
300 ---------------------------------------
304
301
305
302
306 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
303 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
307 nodes and :command:`ipcontroller` can be run remotely as well, or on localhost.
304 nodes and :command:`ipcontroller` can be run remotely as well, or on localhost.
308
305
309 .. note::
306 .. note::
310
307
311 When using this mode it highly recommended that you have set up SSH keys
308 When using this mode it highly recommended that you have set up SSH keys
312 and are using ssh-agent [SSH]_ for password-less logins.
309 and are using ssh-agent [SSH]_ for password-less logins.
313
310
314 As usual, we start by creating a clean profile::
311 As usual, we start by creating a clean profile::
315
312
316 $ ipcluster create profile= ssh
313 $ ipcluster create profile=ssh
317
314
318 To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:
315 To use this mode, select the SSH launchers in :file:`ipcluster_config.py`:
319
316
320 .. sourcecode:: python
317 .. sourcecode:: python
321
318
322 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
319 c.Global.engine_launcher = 'IPython.parallel.apps.launcher.SSHEngineSetLauncher'
323 # and if the Controller is also to be remote:
320 # and if the Controller is also to be remote:
324 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.SSHControllerLauncher'
321 c.Global.controller_launcher = 'IPython.parallel.apps.launcher.SSHControllerLauncher'
325
322
326
323
327 The controller's remote location and configuration can be specified:
324 The controller's remote location and configuration can be specified:
328
325
329 .. sourcecode:: python
326 .. sourcecode:: python
330
327
331 # Set the user and hostname for the controller
328 # Set the user and hostname for the controller
332 # c.SSHControllerLauncher.hostname = 'controller.example.com'
329 # c.SSHControllerLauncher.hostname = 'controller.example.com'
333 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
330 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
334
331
335 # Set the arguments to be passed to ipcontroller
332 # Set the arguments to be passed to ipcontroller
336 # note that remotely launched ipcontroller will not get the contents of
333 # note that remotely launched ipcontroller will not get the contents of
337 # the local ipcontroller_config.py unless it resides on the *remote host*
334 # the local ipcontroller_config.py unless it resides on the *remote host*
338 # in the location specified by the `cluster_dir` argument.
335 # in the location specified by the `profile_dir` argument.
339 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
336 # c.SSHControllerLauncher.program_args = ['--reuse', 'ip=0.0.0.0', 'profile_dir=/path/to/cd']
340
337
341 .. note::
338 .. note::
342
339
343 SSH mode does not do any file movement, so you will need to distribute configuration
340 SSH mode does not do any file movement, so you will need to distribute configuration
344 files manually. To aid in this, the `reuse_files` flag defaults to True for ssh-launched
341 files manually. To aid in this, the `reuse_files` flag defaults to True for ssh-launched
345 Controllers, so you will only need to do this once, unless you override this flag back
342 Controllers, so you will only need to do this once, unless you override this flag back
346 to False.
343 to False.
347
344
348 Engines are specified in a dictionary, by hostname and the number of engines to be run
345 Engines are specified in a dictionary, by hostname and the number of engines to be run
349 on that host.
346 on that host.
350
347
351 .. sourcecode:: python
348 .. sourcecode:: python
352
349
353 c.SSHEngineSetLauncher.engines = { 'host1.example.com' : 2,
350 c.SSHEngineSetLauncher.engines = { 'host1.example.com' : 2,
354 'host2.example.com' : 5,
351 'host2.example.com' : 5,
355 'host3.example.com' : (1, ['cluster_dir=/home/different/location']),
352 'host3.example.com' : (1, ['profile_dir=/home/different/location']),
356 'host4.example.com' : 8 }
353 'host4.example.com' : 8 }
357
354
358 * The `engines` dict, where the keys are the host we want to run engines on and
355 * The `engines` dict, where the keys are the host we want to run engines on and
359 the value is the number of engines to run on that host.
356 the value is the number of engines to run on that host.
360 * on host3, the value is a tuple, where the number of engines is first, and the arguments
357 * on host3, the value is a tuple, where the number of engines is first, and the arguments
361 to be passed to :command:`ipengine` are the second element.
358 to be passed to :command:`ipengine` are the second element.
362
359
363 For engines without explicitly specified arguments, the default arguments are set in
360 For engines without explicitly specified arguments, the default arguments are set in
364 a single location:
361 a single location:
365
362
366 .. sourcecode:: python
363 .. sourcecode:: python
367
364
368 c.SSHEngineSetLauncher.engine_args = ['--cluster_dir', '/path/to/cluster_ssh']
365 c.SSHEngineSetLauncher.engine_args = ['profile_dir=/path/to/cluster_ssh']
369
366
370 Current limitations of the SSH mode of :command:`ipcluster` are:
367 Current limitations of the SSH mode of :command:`ipcluster` are:
371
368
372 * Untested on Windows. Would require a working :command:`ssh` on Windows.
369 * Untested on Windows. Would require a working :command:`ssh` on Windows.
373 Also, we are using shell scripts to setup and execute commands on remote
370 Also, we are using shell scripts to setup and execute commands on remote
374 hosts.
371 hosts.
375 * No file movement -
372 * No file movement -
376
373
377 Using the :command:`ipcontroller` and :command:`ipengine` commands
374 Using the :command:`ipcontroller` and :command:`ipengine` commands
378 ====================================================================
375 ====================================================================
379
376
380 It is also possible to use the :command:`ipcontroller` and :command:`ipengine`
377 It is also possible to use the :command:`ipcontroller` and :command:`ipengine`
381 commands to start your controller and engines. This approach gives you full
378 commands to start your controller and engines. This approach gives you full
382 control over all aspects of the startup process.
379 control over all aspects of the startup process.
383
380
384 Starting the controller and engine on your local machine
381 Starting the controller and engine on your local machine
385 --------------------------------------------------------
382 --------------------------------------------------------
386
383
387 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
384 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
388 local machine, do the following.
385 local machine, do the following.
389
386
390 First start the controller::
387 First start the controller::
391
388
392 $ ipcontroller
389 $ ipcontroller
393
390
394 Next, start however many instances of the engine you want using (repeatedly)
391 Next, start however many instances of the engine you want using (repeatedly)
395 the command::
392 the command::
396
393
397 $ ipengine
394 $ ipengine
398
395
399 The engines should start and automatically connect to the controller using the
396 The engines should start and automatically connect to the controller using the
400 JSON files in :file:`~/.ipython/cluster_default/security`. You are now ready to use the
397 JSON files in :file:`~/.ipython/cluster_default/security`. You are now ready to use the
401 controller and engines from IPython.
398 controller and engines from IPython.
402
399
403 .. warning::
400 .. warning::
404
401
405 The order of the above operations may be important. You *must*
402 The order of the above operations may be important. You *must*
406 start the controller before the engines, unless you are reusing connection
403 start the controller before the engines, unless you are reusing connection
407 information (via `-r`), in which case ordering is not important.
404 information (via `-r`), in which case ordering is not important.
408
405
409 .. note::
406 .. note::
410
407
411 On some platforms (OS X), to put the controller and engine into the
408 On some platforms (OS X), to put the controller and engine into the
412 background you may need to give these commands in the form ``(ipcontroller
409 background you may need to give these commands in the form ``(ipcontroller
413 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
410 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
414 properly.
411 properly.
415
412
416 Starting the controller and engines on different hosts
413 Starting the controller and engines on different hosts
417 ------------------------------------------------------
414 ------------------------------------------------------
418
415
419 When the controller and engines are running on different hosts, things are
416 When the controller and engines are running on different hosts, things are
420 slightly more complicated, but the underlying ideas are the same:
417 slightly more complicated, but the underlying ideas are the same:
421
418
422 1. Start the controller on a host using :command:`ipcontroller`.
419 1. Start the controller on a host using :command:`ipcontroller`.
423 2. Copy :file:`ipcontroller-engine.json` from :file:`~/.ipython/cluster_<profile>/security` on
420 2. Copy :file:`ipcontroller-engine.json` from :file:`~/.ipython/cluster_<profile>/security` on
424 the controller's host to the host where the engines will run.
421 the controller's host to the host where the engines will run.
425 3. Use :command:`ipengine` on the engine's hosts to start the engines.
422 3. Use :command:`ipengine` on the engine's hosts to start the engines.
426
423
427 The only thing you have to be careful of is to tell :command:`ipengine` where
424 The only thing you have to be careful of is to tell :command:`ipengine` where
428 the :file:`ipcontroller-engine.json` file is located. There are two ways you
425 the :file:`ipcontroller-engine.json` file is located. There are two ways you
429 can do this:
426 can do this:
430
427
431 * Put :file:`ipcontroller-engine.json` in the :file:`~/.ipython/cluster_<profile>/security`
428 * Put :file:`ipcontroller-engine.json` in the :file:`~/.ipython/cluster_<profile>/security`
432 directory on the engine's host, where it will be found automatically.
429 directory on the engine's host, where it will be found automatically.
433 * Call :command:`ipengine` with the ``--file=full_path_to_the_file``
430 * Call :command:`ipengine` with the ``--file=full_path_to_the_file``
434 flag.
431 flag.
435
432
436 The ``--file`` flag works like this::
433 The ``--file`` flag works like this::
437
434
438 $ ipengine --file=/path/to/my/ipcontroller-engine.json
435 $ ipengine --file=/path/to/my/ipcontroller-engine.json
439
436
440 .. note::
437 .. note::
441
438
442 If the controller's and engine's hosts all have a shared file system
439 If the controller's and engine's hosts all have a shared file system
443 (:file:`~/.ipython/cluster_<profile>/security` is the same on all of them), then things
440 (:file:`~/.ipython/cluster_<profile>/security` is the same on all of them), then things
444 will just work!
441 will just work!
445
442
446 Make JSON files persistent
443 Make JSON files persistent
447 --------------------------
444 --------------------------
448
445
449 At fist glance it may seem that that managing the JSON files is a bit
446 At fist glance it may seem that that managing the JSON files is a bit
450 annoying. Going back to the house and key analogy, copying the JSON around
447 annoying. Going back to the house and key analogy, copying the JSON around
451 each time you start the controller is like having to make a new key every time
448 each time you start the controller is like having to make a new key every time
452 you want to unlock the door and enter your house. As with your house, you want
449 you want to unlock the door and enter your house. As with your house, you want
453 to be able to create the key (or JSON file) once, and then simply use it at
450 to be able to create the key (or JSON file) once, and then simply use it at
454 any point in the future.
451 any point in the future.
455
452
456 To do this, the only thing you have to do is specify the `--reuse` flag, so that
453 To do this, the only thing you have to do is specify the `--reuse` flag, so that
457 the connection information in the JSON files remains accurate::
454 the connection information in the JSON files remains accurate::
458
455
459 $ ipcontroller --reuse
456 $ ipcontroller --reuse
460
457
461 Then, just copy the JSON files over the first time and you are set. You can
458 Then, just copy the JSON files over the first time and you are set. You can
462 start and stop the controller and engines any many times as you want in the
459 start and stop the controller and engines any many times as you want in the
463 future, just make sure to tell the controller to reuse the file.
460 future, just make sure to tell the controller to reuse the file.
464
461
465 .. note::
462 .. note::
466
463
467 You may ask the question: what ports does the controller listen on if you
464 You may ask the question: what ports does the controller listen on if you
468 don't tell is to use specific ones? The default is to use high random port
465 don't tell is to use specific ones? The default is to use high random port
469 numbers. We do this for two reasons: i) to increase security through
466 numbers. We do this for two reasons: i) to increase security through
470 obscurity and ii) to multiple controllers on a given host to start and
467 obscurity and ii) to multiple controllers on a given host to start and
471 automatically use different ports.
468 automatically use different ports.
472
469
473 Log files
470 Log files
474 ---------
471 ---------
475
472
476 All of the components of IPython have log files associated with them.
473 All of the components of IPython have log files associated with them.
477 These log files can be extremely useful in debugging problems with
474 These log files can be extremely useful in debugging problems with
478 IPython and can be found in the directory :file:`~/.ipython/cluster_<profile>/log`.
475 IPython and can be found in the directory :file:`~/.ipython/cluster_<profile>/log`.
479 Sending the log files to us will often help us to debug any problems.
476 Sending the log files to us will often help us to debug any problems.
480
477
481
478
482 Configuring `ipcontroller`
479 Configuring `ipcontroller`
483 ---------------------------
480 ---------------------------
484
481
485 Ports and addresses
482 Ports and addresses
486 *******************
483 *******************
487
484
488
485
489 Database Backend
486 Database Backend
490 ****************
487 ****************
491
488
492
489
493 .. seealso::
490 .. seealso::
494
491
495
492
496
493
497 Configuring `ipengine`
494 Configuring `ipengine`
498 -----------------------
495 -----------------------
499
496
500 .. note::
497 .. note::
501
498
502 TODO
499 TODO
503
500
504
501
505
502
506 .. [PBS] Portable Batch System. http://www.openpbs.org/
503 .. [PBS] Portable Batch System. http://www.openpbs.org/
507 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/ssh-agent
504 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now