##// END OF EJS Templates
updated SGE support to launch multiple engines - they start cleanly but do not shut down cleanly
Satrajit Ghosh -
Show More
@@ -1,874 +1,914 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Start an IPython cluster = (controller + engines)."""
4 """Start an IPython cluster = (controller + engines)."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008 The IPython Development Team
7 # Copyright (C) 2008 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import re
18 import re
19 import sys
19 import sys
20 import signal
20 import signal
21 import tempfile
21 import tempfile
22 pjoin = os.path.join
22 pjoin = os.path.join
23
23
24 from twisted.internet import reactor, defer
24 from twisted.internet import reactor, defer
25 from twisted.internet.protocol import ProcessProtocol
25 from twisted.internet.protocol import ProcessProtocol
26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 from twisted.internet.utils import getProcessOutput
27 from twisted.internet.utils import getProcessOutput
28 from twisted.python import failure, log
28 from twisted.python import failure, log
29
29
30 from IPython.external import argparse
30 from IPython.external import argparse
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import (
32 from IPython.genutils import (
33 get_ipython_dir,
33 get_ipython_dir,
34 get_log_dir,
34 get_log_dir,
35 get_security_dir,
35 get_security_dir,
36 num_cpus
36 num_cpus
37 )
37 )
38 from IPython.kernel.fcutil import have_crypto
38 from IPython.kernel.fcutil import have_crypto
39
39
40 # Create various ipython directories if they don't exist.
40 # Create various ipython directories if they don't exist.
41 # This must be done before IPython.kernel.config is imported.
41 # This must be done before IPython.kernel.config is imported.
42 from IPython.iplib import user_setup
42 from IPython.iplib import user_setup
43 if os.name == 'posix':
43 if os.name == 'posix':
44 rc_suffix = ''
44 rc_suffix = ''
45 else:
45 else:
46 rc_suffix = '.ini'
46 rc_suffix = '.ini'
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 get_log_dir()
48 get_log_dir()
49 get_security_dir()
49 get_security_dir()
50
50
51 from IPython.kernel.config import config_manager as kernel_config_manager
51 from IPython.kernel.config import config_manager as kernel_config_manager
52 from IPython.kernel.error import SecurityError, FileTimeoutError
52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 from IPython.kernel.fcutil import have_crypto
53 from IPython.kernel.fcutil import have_crypto
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 from IPython.kernel.util import printer
55 from IPython.kernel.util import printer
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # General process handling code
58 # General process handling code
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60
61
61
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher is used while in an invalid state."""
64
64
class UnknownStatus(Exception):
    """Raised when a child exits with a status Twisted does not classify."""
67
67
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol that relays child-process lifecycle events back to
    the owning ProcessLauncher.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive as soon as the connection exists; report its pid.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit: report a zero exit code.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: report code, signal and raw status together.
            details = {'exit_code': reason.exitCode,
                       'signal': reason.signal,
                       'status': reason.status}
            self.process_launcher.fire_stop_deferred(details)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward the child's stdout into the twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward the child's stderr into the twisted error log.
        log.err(data)
97
97
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes.  This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # argv[0] doubles as the executable to spawn.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # one of: before, running, after

    @property
    def running(self):
        # True exactly while the child process is alive.
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        # Called by the protocol once the child is connected.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        # Only a never-started launcher may be started.
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        # A stop deferred may be requested any time before the process ends.
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        s = 'this process is already complete'
        return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by the protocol when the child exits; wake every waiter.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        # Be polite first (SIGINT), then forceful (SIGKILL) after delay.
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177
177
178
178
179 #-----------------------------------------------------------------------------
179 #-----------------------------------------------------------------------------
180 # Code for launching controller and engines
180 # Code for launching controller and engines
181 #-----------------------------------------------------------------------------
181 #-----------------------------------------------------------------------------
182
182
183
183
class ControllerLauncher(ProcessLauncher):
    """A ProcessLauncher preconfigured to run the ipcontroller script."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or location on win32, so resolve it from the package.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # -u turns on unbuffered output, required on Win32 to prevent
            # weird conflicts and problems with Twisted.  sys.executable
            # makes sure the right python exe is picked up.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
204
204
205
205
class EngineLauncher(ProcessLauncher):
    """A ProcessLauncher preconfigured to run the ipengine script."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the
            # same way or location on win32, so resolve it from the package.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # -u turns on unbuffered output, required on Win32 to prevent
            # weird conflicts and problems with Twisted.  sys.executable
            # makes sure the right python exe is picked up.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
226
226
227
227
228 class LocalEngineSet(object):
228 class LocalEngineSet(object):
229
229
230 def __init__(self, extra_args=None):
230 def __init__(self, extra_args=None):
231 self.extra_args = extra_args
231 self.extra_args = extra_args
232 self.launchers = []
232 self.launchers = []
233
233
234 def start(self, n):
234 def start(self, n):
235 dlist = []
235 dlist = []
236 for i in range(n):
236 for i in range(n):
237 print "starting engine:", i
237 el = EngineLauncher(extra_args=self.extra_args)
238 el = EngineLauncher(extra_args=self.extra_args)
238 d = el.start()
239 d = el.start()
239 self.launchers.append(el)
240 self.launchers.append(el)
240 dlist.append(d)
241 dlist.append(d)
241 dfinal = gatherBoth(dlist, consumeErrors=True)
242 dfinal = gatherBoth(dlist, consumeErrors=True)
242 dfinal.addCallback(self._handle_start)
243 dfinal.addCallback(self._handle_start)
243 return dfinal
244 return dfinal
244
245
245 def _handle_start(self, r):
246 def _handle_start(self, r):
246 log.msg('Engines started with pids: %r' % r)
247 log.msg('Engines started with pids: %r' % r)
247 return r
248 return r
248
249
249 def _handle_stop(self, r):
250 def _handle_stop(self, r):
250 log.msg('Engines received signal: %r' % r)
251 log.msg('Engines received signal: %r' % r)
251 return r
252 return r
252
253
253 def signal(self, sig):
254 def signal(self, sig):
254 dlist = []
255 dlist = []
255 for el in self.launchers:
256 for el in self.launchers:
256 d = el.get_stop_deferred()
257 d = el.get_stop_deferred()
257 dlist.append(d)
258 dlist.append(d)
258 el.signal(sig)
259 el.signal(sig)
259 dfinal = gatherBoth(dlist, consumeErrors=True)
260 dfinal = gatherBoth(dlist, consumeErrors=True)
260 dfinal.addCallback(self._handle_stop)
261 dfinal.addCallback(self._handle_stop)
261 return dfinal
262 return dfinal
262
263
263 def interrupt_then_kill(self, delay=1.0):
264 def interrupt_then_kill(self, delay=1.0):
264 dlist = []
265 dlist = []
265 for el in self.launchers:
266 for el in self.launchers:
266 d = el.get_stop_deferred()
267 d = el.get_stop_deferred()
267 dlist.append(d)
268 dlist.append(d)
268 el.interrupt_then_kill(delay)
269 el.interrupt_then_kill(delay)
269 dfinal = gatherBoth(dlist, consumeErrors=True)
270 dfinal = gatherBoth(dlist, consumeErrors=True)
270 dfinal.addCallback(self._handle_stop)
271 dfinal.addCallback(self._handle_stop)
271 return dfinal
272 return dfinal
272
273
273
274
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch queue system.

    Subclasses must fill in ``submit_command``, ``delete_command`` and
    ``job_id_regexp`` (see PBSEngineSet).
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        """Prepare a batch run from template_file; kwargs seed the context."""
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract the queue job id from the submit command's output.

        Raises Exception if no id matching job_id_regexp is found.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the batch template for n engines and write it out."""
        self.context['n'] = n
        # Close the template handle explicitly instead of leaking it until
        # garbage collection (the original relied on CPython refcounting).
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        # Log the traceback, then re-raise so callers still see the failure.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job for n engines; fires with the job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job from the queue."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
323
324
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted as a single job through the PBS queue system."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = '\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
332
333
class SGEEngineSet(BatchEngineSet):
    """Engine set that submits one SGE job per engine.

    Unlike PBSEngineSet, each engine gets its own batch script and its own
    queue job id, so ``job_id`` is a list and ``kill`` must delete every job.
    """

    submit_command = 'qsub'
    delete_command = 'qdel'
    job_id_regexp = '\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
        self.num_engines = 0  # number of engines requested by start()
        self.job_id = []      # one queue job id per successfully parsed submit

    def parse_job_id(self, output):
        """Extract a job id from qsub output and record it in self.job_id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id.append(job_id)
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def kill_job(self, output):
        """Log the output of a qdel invocation."""
        log.msg(output)
        return output

    def write_batch_script(self, i):
        """Instantiate the batch template for engine i and write it out."""
        context = {'eid':i}
        # Close handles explicitly instead of leaking them until GC.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file+str(i))
        f = open(self.batch_file+str(i),'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def start(self, n):
        """Submit n engines, one qsub job each; fires with the job ids."""
        dlist = []
        # BUG FIX: num_engines was set to 0 and never incremented, which
        # made kill() iterate range(0) and never delete any job.
        self.num_engines = n
        self.job_id = []
        for i in range(n):
            log.msg("starting engine: %d"%i)
            self.write_batch_script(i)
            d = getProcessOutput(self.submit_command,
                                 [self.batch_file+str(i)],env=os.environ)
            d.addCallback(self.parse_job_id)
            d.addErrback(self.handle_error)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """qdel every job recorded in self.job_id; fires when all are done."""
        dlist = []
        # Iterate the recorded job ids rather than range(num_engines):
        # some submissions may have failed, and job ids are strings, so the
        # old "%d" % self.job_id[i] also raised TypeError.
        for job_id in self.job_id:
            log.msg("killing job id: %s" % job_id)
            d = getProcessOutput(self.delete_command,
                                 [job_id],env=os.environ)
            d.addCallback(self.kill_job)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)
391
# Shell helper copied to each engine host: runs its arguments in the
# background, detached from the ssh session (output discarded), and echoes
# the backgrounded process's pid.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell helper that sends SIGTERM to every ipengine process owned by the
# current user on the host where it runs.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
360
400
class SSHEngineSet(object):
    """Start and stop engines on remote hosts using ssh/scp.

    A small shell helper (sshx) is copied to each host and used to launch
    ipengine detached from the ssh session; a second helper kills all of
    the user's ipengine processes at shutdown.
    """
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values.  sshx is the name of a local
        file that will be used to run remote commands.  This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            # Close handles explicitly (try/finally) rather than leaking
            # them until garbage collection.
            f = open(self.sshx, 'w')
            try:
                f.writelines(self.sshx_template)
            finally:
                f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        try:
            f.writelines(self.engine_killer_template)
        finally:
            f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        """Copy helpers (and optionally the furl) to hostname, then launch."""
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the local engine furl file to hostname via scp."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # expanduser because the argv list bypasses shell tilde expansion.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx launch helper to hostname via scp."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # BUG FIX: a stray debug ``print`` statement here emitted a blank
        # line on stdout; removed.
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Launch count engines on hostname through the sshx helper."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill all ipengine processes on every configured host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        """Copy the engine killer script to hostname and run it."""
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine killer script to hostname via scp."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the engine killer script on hostname over ssh."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Log failures from remote command execution.
        log.msg(r)
485
525
486 #-----------------------------------------------------------------------------
526 #-----------------------------------------------------------------------------
487 # Main functions for the different types of clusters
527 # Main functions for the different types of clusters
488 #-----------------------------------------------------------------------------
528 #-----------------------------------------------------------------------------
489
529
490 # TODO:
530 # TODO:
491 # The logic in these codes should be moved into classes like LocalCluster
531 # The logic in these codes should be moved into classes like LocalCluster
492 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
532 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
493 # The main functions should then just parse the command line arguments, create
533 # The main functions should then just parse the command line arguments, create
494 # the appropriate class and call a 'start' method.
534 # the appropriate class and call a 'start' method.
495
535
496
536
def check_security(args, cont_args):
    """Validate SSL settings and extend *cont_args* accordingly.

    Returns True when startup may proceed.  If any secure connection is
    requested but pyOpenSSL is unavailable, logs the problem, stops the
    reactor and returns False.
    """
    security_wanted = not (args.x and args.y)
    if security_wanted and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    # Forward the "security off" flags to the controller command line.
    for enabled, option in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(option)
    return True
510
550
511
551
def check_reuse(args, cont_args):
    """Check whether FURL files should be reused; extend *cont_args*.

    Reuse (-r) requires explicit non-zero client and engine ports; when
    they are missing the error is logged, the reactor is stopped and
    False is returned.  Returns True when startup may proceed.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if 0 in (args.client_port, args.engine_port):
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.extend(['--client-port=%i' % args.client_port,
                      '--engine-port=%i' % args.engine_port])
    return True
525
565
526
566
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    # f is the Failure propagated down the Deferred chain; after logging
    # there is nothing left to do, so take the whole event loop down.
    log.err(f)
    reactor.stop()
531
571
532
572
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to write its FURL file, then start engines.

    When *reuse* is false any stale FURL file is removed first, so that
    the wait below only succeeds once the freshly started controller has
    re-created it.  Returns a Deferred that fires after *start_engines*
    has been invoked with *cont_pid*.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    # Poll every 0.2s, giving up after 50 attempts (~10 seconds).
    deferred = wait_for_file(furl_file, delay=0.2, max_tries=50)
    def _announce(result):
        log.msg('Controller started')
    deferred.addCallback(_announce)
    deferred.addCallback(lambda ignored: start_engines(cont_pid))
    return deferred
543
583
544
584
def main_local(args):
    """Start a controller and N engines as plain local subprocesses."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Tag engine logfiles with the controller pid so runs don't collide.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        # Ctrl-C triggers an orderly teardown instead of a hard exit.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
578
618
579
619
def main_mpi(args):
    """Start a controller locally and engines via mpirun/mpiexec."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # args.cmd is 'mpirun' or 'mpiexec', set by the subcommand parser.
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            # Tell each engine how to call MPI_Init (e.g. mpi4py).
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
617
657
618
658
def main_pbs(args):
    """Start a controller locally and engines through the PBS batch system."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        # args.pbsscript is the qsub template used to submit the engines.
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # Kill the batch job first; only once that completes interrupt
            # the controller and, last of all, stop the reactor.
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
647
687
def main_sge(args):
    """Start a controller locally and engines through the SGE batch system.

    NOTE(review): per the commit message this mode starts cleanly but does
    not yet shut down cleanly — the kill path below is under investigation.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        # args.sgescript is the qsub template used to submit the engines.
        sge_set = SGEEngineSet(args.sgescript)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            # Kill the batch job first; only once that completes interrupt
            # the controller and, last of all, stop the reactor.
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
676
716
677
717
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # execfile populates the dict with the clusterfile's top-level names.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # Default: do not copy the engine FURL file to the remote hosts.
    # ('in' replaces the deprecated dict.has_key.)
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            # Kill the engines first and only then take down the controller
            # and the reactor, same ordering as main_pbs/main_sge.  The
            # previous code ignored the Deferred returned by kill() and
            # interrupted the controller concurrently with engine teardown.
            d = ssh_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
720
760
721
761
def get_args():
    """Build the ipcluster command-line parser and parse sys.argv.

    A hidden base parser holds the options shared by every cluster type;
    each subcommand (local, mpirun, mpiexec, pbs, sge, ssh) inherits it
    via ``parents`` and sets ``func`` to the matching main_* handler.
    """
    # Options common to all subcommands.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    # Top-level parser; the subcommand chooses how engines are launched.
    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    # local: controller + engines as plain subprocesses on this host.
    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # mpirun / mpiexec: same handler, only the launcher command differs.
    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    # pbs: engines submitted through a PBS batch script template.
    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    # sge: engines submitted through an SGE batch script template.
    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='template.sge'
    )
    parser_sge.set_defaults(func=main_sge)

    # ssh: engines started on remote hosts listed in a clusterfile.
    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
866
906
def main():
    """Parse arguments, schedule the chosen starter and run the reactor."""
    args = get_args()
    # args.func is the main_* handler selected by the subcommand parser;
    # it must run inside the reactor, hence callWhenRunning.
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()
872
912
# Script entry point.
if __name__ == '__main__':
    main()
@@ -1,400 +1,397 b''
1 .. _parallel_process:
1 .. _parallel_process:
2
2
3 ===========================================
3 ===========================================
4 Starting the IPython controller and engines
4 Starting the IPython controller and engines
5 ===========================================
5 ===========================================
6
6
7 To use IPython for parallel computing, you need to start one instance of
7 To use IPython for parallel computing, you need to start one instance of
8 the controller and one or more instances of the engine. The controller
8 the controller and one or more instances of the engine. The controller
9 and each engine can run on different machines or on the same machine.
9 and each engine can run on different machines or on the same machine.
10 Because of this, there are many different possibilities.
10 Because of this, there are many different possibilities.
11
11
12 Broadly speaking, there are two ways of going about starting a controller and engines:
12 Broadly speaking, there are two ways of going about starting a controller and engines:
13
13
14 * In an automated manner using the :command:`ipcluster` command.
14 * In an automated manner using the :command:`ipcluster` command.
15 * In a more manual way using the :command:`ipcontroller` and
15 * In a more manual way using the :command:`ipcontroller` and
16 :command:`ipengine` commands.
16 :command:`ipengine` commands.
17
17
18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19
19
20 General considerations
20 General considerations
21 ======================
21 ======================
22
22
23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24
24
25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26
26
27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 ``host0``.
28 ``host0``.
29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 controller from ``host0`` to hosts ``host1``-``hostn``.
30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 3. Start the engines on hosts ``host1``-``hostn`` by running
31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 :command:`ipengine`. This command has to be told where the FURL file
32 :command:`ipengine`. This command has to be told where the FURL file
33 (:file:`ipcontroller-engine.furl`) is located.
33 (:file:`ipcontroller-engine.furl`) is located.
34
34
35 At this point, the controller and engines will be connected. By default, the
35 At this point, the controller and engines will be connected. By default, the
36 FURL files created by the controller are put into the
36 FURL files created by the controller are put into the
37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 the controller, step 2 can be skipped as the engines will automatically look
38 the controller, step 2 can be skipped as the engines will automatically look
39 at that location.
39 at that location.
40
40
41 The final step required to actually use the running controller from a
41 The final step required to actually use the running controller from a
42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45
45
46 Using :command:`ipcluster`
46 Using :command:`ipcluster`
47 ==========================
47 ==========================
48
48
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50
50
51 1. When the controller and engines are all run on localhost. This is useful
51 1. When the controller and engines are all run on localhost. This is useful
52 for testing or running on a multicore computer.
52 for testing or running on a multicore computer.
53 2. When engines are started using the :command:`mpirun` command that comes
53 2. When engines are started using the :command:`mpirun` command that comes
54 with most MPI [MPI]_ implementations
54 with most MPI [MPI]_ implementations
55 3. When engines are started using the PBS [PBS]_ batch system.
55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When engines are started using the SGE [SGE]_ batch system.
56 4. When engines are started using the SGE [SGE]_ batch system.
57 5. When the controller is started on localhost and the engines are started on
57 5. When the controller is started on localhost and the engines are started on
58 remote nodes using :command:`ssh`.
58 remote nodes using :command:`ssh`.
59
59
60 .. note::
60 .. note::
61
61
62 It is also possible for advanced users to add support to
62 It is also possible for advanced users to add support to
63 :command:`ipcluster` for starting controllers and engines using other
63 :command:`ipcluster` for starting controllers and engines using other
64 methods (like Sun's Grid Engine for example).
64 methods (like Sun's Grid Engine for example).
65
65
66 .. note::
66 .. note::
67
67
68 Currently :command:`ipcluster` requires that the
68 Currently :command:`ipcluster` requires that the
69 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 :file:`~/.ipython/security` directory live on a shared filesystem that is
70 seen by both the controller and engines. If you don't have a shared file
70 seen by both the controller and engines. If you don't have a shared file
71 system you will need to use :command:`ipcontroller` and
71 system you will need to use :command:`ipcontroller` and
72 :command:`ipengine` directly. This constraint can be relaxed if you are
72 :command:`ipengine` directly. This constraint can be relaxed if you are
73 using the :command:`ssh` method to start the cluster.
73 using the :command:`ssh` method to start the cluster.
74
74
75 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
76 and :command:`ipengine` to perform the steps described above.
76 and :command:`ipengine` to perform the steps described above.
77
77
78 Using :command:`ipcluster` in local mode
78 Using :command:`ipcluster` in local mode
79 ----------------------------------------
79 ----------------------------------------
80
80
81 To start one controller and 4 engines on localhost, just do::
81 To start one controller and 4 engines on localhost, just do::
82
82
83 $ ipcluster local -n 4
83 $ ipcluster local -n 4
84
84
85 To see other command line options for the local mode, do::
85 To see other command line options for the local mode, do::
86
86
87 $ ipcluster local -h
87 $ ipcluster local -h
88
88
89 Using :command:`ipcluster` in mpiexec/mpirun mode
89 Using :command:`ipcluster` in mpiexec/mpirun mode
90 -------------------------------------------------
90 -------------------------------------------------
91
91
92 The mpiexec/mpirun mode is useful if you:
92 The mpiexec/mpirun mode is useful if you:
93
93
94 1. Have MPI installed.
94 1. Have MPI installed.
95 2. Your systems are configured to use the :command:`mpiexec` or
95 2. Your systems are configured to use the :command:`mpiexec` or
96 :command:`mpirun` commands to start MPI processes.
96 :command:`mpirun` commands to start MPI processes.
97
97
98 .. note::
98 .. note::
99
99
100 The preferred command to use is :command:`mpiexec`. However, we also
100 The preferred command to use is :command:`mpiexec`. However, we also
101 support :command:`mpirun` for backwards compatibility. The underlying
101 support :command:`mpirun` for backwards compatibility. The underlying
102 logic used is exactly the same, the only difference being the name of the
102 logic used is exactly the same, the only difference being the name of the
103 command line program that is called.
103 command line program that is called.
104
104
105 If these are satisfied, you can start an IPython cluster using::
105 If these are satisfied, you can start an IPython cluster using::
106
106
107 $ ipcluster mpiexec -n 4
107 $ ipcluster mpiexec -n 4
108
108
109 This does the following:
109 This does the following:
110
110
111 1. Starts the IPython controller on current host.
111 1. Starts the IPython controller on current host.
112 2. Uses :command:`mpiexec` to start 4 engines.
112 2. Uses :command:`mpiexec` to start 4 engines.
113
113
114 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
114 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
115
115
116 $ ipcluster mpiexec -n 4 --mpi=mpi4py
116 $ ipcluster mpiexec -n 4 --mpi=mpi4py
117
117
118 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
118 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
119
119
120 Additional command line options for this mode can be found by doing::
120 Additional command line options for this mode can be found by doing::
121
121
122 $ ipcluster mpiexec -h
122 $ ipcluster mpiexec -h
123
123
124 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
124 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
125
125
126
126
127 Using :command:`ipcluster` in PBS mode
127 Using :command:`ipcluster` in PBS mode
128 --------------------------------------
128 --------------------------------------
129
129
130 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
130 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
131
131
132 .. sourcecode:: bash
132 .. sourcecode:: bash
133
133
134 #PBS -N ipython
134 #PBS -N ipython
135 #PBS -j oe
135 #PBS -j oe
136 #PBS -l walltime=00:10:00
136 #PBS -l walltime=00:10:00
137 #PBS -l nodes=${n/4}:ppn=4
137 #PBS -l nodes=${n/4}:ppn=4
138 #PBS -q parallel
138 #PBS -q parallel
139
139
140 cd $$PBS_O_WORKDIR
140 cd $$PBS_O_WORKDIR
141 export PATH=$$HOME/usr/local/bin
141 export PATH=$$HOME/usr/local/bin
142 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
142 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
143 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
143 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
144
144
145 There are a few important points about this template:
145 There are a few important points about this template:
146
146
147 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
147 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
148 template engine.
148 template engine.
149
149
150 2. Instead of putting in the actual number of engines, use the notation
150 2. Instead of putting in the actual number of engines, use the notation
151 ``${n}`` to indicate the number of engines to be started. You can also use
151 ``${n}`` to indicate the number of engines to be started. You can also use
152 expressions like ``${n/4}`` in the template to indicate the number of
152 expressions like ``${n/4}`` in the template to indicate the number of
153 nodes.
153 nodes.
154
154
155 3. Because ``$`` is a special character used by the template engine, you must
155 3. Because ``$`` is a special character used by the template engine, you must
156 escape any ``$`` by using ``$$``. This is important when referring to
156 escape any ``$`` by using ``$$``. This is important when referring to
157 environment variables in the template.
157 environment variables in the template.
158
158
159 4. Any options to :command:`ipengine` should be given in the batch script
159 4. Any options to :command:`ipengine` should be given in the batch script
160 template.
160 template.
161
161
162 5. Depending on the configuration of your system, you may have to set
162 5. Depending on the configuration of your system, you may have to set
163 environment variables in the script template.
163 environment variables in the script template.
164
164
165 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
165 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
166
166
167 $ ipcluster pbs -n 128 --pbs-script=pbs.template
167 $ ipcluster pbs -n 128 --pbs-script=pbs.template
168
168
169 Additional command line options for this mode can be found by doing::
169 Additional command line options for this mode can be found by doing::
170
170
171 $ ipcluster pbs -h
171 $ ipcluster pbs -h
172
172
173 Using :command:`ipcluster` in SGE mode
173 Using :command:`ipcluster` in SGE mode
174 --------------------------------------
174 --------------------------------------
175
175
176 The SGE mode uses the Sun Grid Engine [SGE]_ to start the engines. To use this mode, you first need to create a SGE script template that will be used to start the engines. Here is a sample SGE script template:
176 The SGE mode uses the Sun Grid Engine [SGE]_ to start the engines. To use this mode, you first need to create a SGE script template that will be used to start the engines. Here is a sample SGE script template:
177
177
178 .. sourcecode:: bash
178 .. sourcecode:: bash
179
179
180 #!/bin/bash
180 #!/bin/bash
181 #$ -V
181 #$ -V
182 #$ -cwd
183 #$ -m n
182 #$ -m n
184 #$ -N satra-ipython
183 #$ -N ipengine-${eid}
185 #$ -r y
184 #$ -r y
186 #$ -q sub
185 #$ -q sub
187 #$ -S /bin/bash
186 #$ -S /bin/bash
188
187
189 cd $$HOME/sge
188 cd $$HOME/sge
190 ipengine --logfile=ipengine
189 ipengine --logfile=ipengine${eid}
191
190
192 There are a few important points about this template:
191 There are a few important points about this template:
193
192
194 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
193 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
195 template engine.
194 template engine.
196
195
197 2. Instead of putting in the actual number of engines, use the notation
196 2. Instead of putting in the actual engine id, use the notation
198 ``${n}`` to indicate the number of engines to be started. You can also uses
197 ``${eid}`` to indicate where the engine id should be inserted.
199 expressions like ``${n/4}`` in the template to indicate the number of
200 nodes.
201
198
202 3. Because ``$`` is a special character used by the template engine, you must
199 3. Because ``$`` is a special character used by the template engine, you must
203 escape any ``$`` by using ``$$``. This is important when referring to
200 escape any ``$`` by using ``$$``. This is important when referring to
204 environment variables in the template.
201 environment variables in the template.
205
202
206 4. Any options to :command:`ipengine` should be given in the batch script
203 4. Any options to :command:`ipengine` should be given in the batch script
207 template.
204 template.
208
205
209 5. Depending on the configuration of your system, you may have to set
206 5. Depending on the configuration of your system, you may have to set
210 environment variables in the script template.
207 environment variables in the script template.
211
208
212 Once you have created such a script, save it with a name like :file:`sge.template`. Now you are ready to start your job::
209 Once you have created such a script, save it with a name like :file:`sge.template`. Now you are ready to start your job::
213
210
214 $ ipcluster sge -n 128 --sge-script=sge.template
211 $ ipcluster sge -n 12 --sge-script=sge.template
215
212
216 Additional command line options for this mode can be found by doing::
213 Additional command line options for this mode can be found by doing::
217
214
218 $ ipcluster sge -h
215 $ ipcluster sge -h
219
216
220 Using :command:`ipcluster` in SSH mode
217 Using :command:`ipcluster` in SSH mode
221 --------------------------------------
218 --------------------------------------
222
219
223 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
220 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
224 nodes and the :command:`ipcontroller` on localhost.
221 nodes and the :command:`ipcontroller` on localhost.
225
222
226 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
223 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
227
224
228 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
225 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
229
226
230 .. sourcecode:: python
227 .. sourcecode:: python
231
228
232 send_furl = True
229 send_furl = True
233 engines = { 'host1.example.com' : 2,
230 engines = { 'host1.example.com' : 2,
234 'host2.example.com' : 5,
231 'host2.example.com' : 5,
235 'host3.example.com' : 1,
232 'host3.example.com' : 1,
236 'host4.example.com' : 8 }
233 'host4.example.com' : 8 }
237
234
238 Since this is a regular python file usual python syntax applies. Things to note:
235 Since this is a regular python file usual python syntax applies. Things to note:
239
236
240 * The `engines` dict, where each key is the host we want to run engines on and
237 * The `engines` dict, where each key is the host we want to run engines on and
241 the value is the number of engines to run on that host.
238 the value is the number of engines to run on that host.
242 * send_furl can either be `True` or `False`, if `True` it will copy over the
239 * send_furl can either be `True` or `False`, if `True` it will copy over the
243 furl needed for :command:`ipengine` to each host.
240 furl needed for :command:`ipengine` to each host.
244
241
245 The ``--clusterfile`` command line option lets you specify the file to use for
242 The ``--clusterfile`` command line option lets you specify the file to use for
246 the cluster definition. Once you have your cluster file and you can
243 the cluster definition. Once you have your cluster file and you can
247 :command:`ssh` into the remote hosts without a password you are ready to
244 :command:`ssh` into the remote hosts without a password you are ready to
248 start your cluster like so:
245 start your cluster like so:
249
246
250 .. sourcecode:: bash
247 .. sourcecode:: bash
251
248
252 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
249 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
253
250
254
251
255 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
252 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
256
253
257 * sshx.sh
254 * sshx.sh
258 * engine_killer.sh
255 * engine_killer.sh
259
256
260 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
257 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
261
258
262 The default sshx.sh is the following:
259 The default sshx.sh is the following:
263
260
264 .. sourcecode:: bash
261 .. sourcecode:: bash
265
262
266 #!/bin/sh
263 #!/bin/sh
267 "$@" &> /dev/null &
264 "$@" &> /dev/null &
268 echo $!
265 echo $!
269
266
270 If you want to use a custom sshx.sh script you need to use the ``--sshx``
267 If you want to use a custom sshx.sh script you need to use the ``--sshx``
271 option and specify the file to use. Using a custom sshx.sh file could be
268 option and specify the file to use. Using a custom sshx.sh file could be
272 helpful when you need to setup the environment on the remote host before
269 helpful when you need to setup the environment on the remote host before
273 executing :command:`ipengine`.
270 executing :command:`ipengine`.
274
271
275 For a detailed options list:
272 For a detailed options list:
276
273
277 .. sourcecode:: bash
274 .. sourcecode:: bash
278
275
279 $ ipcluster ssh -h
276 $ ipcluster ssh -h
280
277
281 Current limitations of the SSH mode of :command:`ipcluster` are:
278 Current limitations of the SSH mode of :command:`ipcluster` are:
282
279
283 * Untested on Windows. Would require a working :command:`ssh` on Windows.
280 * Untested on Windows. Would require a working :command:`ssh` on Windows.
284 Also, we are using shell scripts to setup and execute commands on remote
281 Also, we are using shell scripts to setup and execute commands on remote
285 hosts.
282 hosts.
286 * :command:`ipcontroller` is started on localhost, with no option to start it
283 * :command:`ipcontroller` is started on localhost, with no option to start it
287 on a remote node.
284 on a remote node.
288
285
289 Using the :command:`ipcontroller` and :command:`ipengine` commands
286 Using the :command:`ipcontroller` and :command:`ipengine` commands
290 ==================================================================
287 ==================================================================
291
288
292 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
289 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
293
290
294 Starting the controller and engine on your local machine
291 Starting the controller and engine on your local machine
295 --------------------------------------------------------
292 --------------------------------------------------------
296
293
297 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
294 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
298 local machine, do the following.
295 local machine, do the following.
299
296
300 First start the controller::
297 First start the controller::
301
298
302 $ ipcontroller
299 $ ipcontroller
303
300
304 Next, start however many instances of the engine you want using (repeatedly) the command::
301 Next, start however many instances of the engine you want using (repeatedly) the command::
305
302
306 $ ipengine
303 $ ipengine
307
304
308 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
305 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
309
306
310 .. warning::
307 .. warning::
311
308
312 The order of the above operations is very important. You *must*
309 The order of the above operations is very important. You *must*
313 start the controller before the engines, since the engines connect
310 start the controller before the engines, since the engines connect
314 to the controller as they get started.
311 to the controller as they get started.
315
312
316 .. note::
313 .. note::
317
314
318 On some platforms (OS X), to put the controller and engine into the
315 On some platforms (OS X), to put the controller and engine into the
319 background you may need to give these commands in the form ``(ipcontroller
316 background you may need to give these commands in the form ``(ipcontroller
320 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
317 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
321 properly.
318 properly.
322
319
323 Starting the controller and engines on different hosts
320 Starting the controller and engines on different hosts
324 ------------------------------------------------------
321 ------------------------------------------------------
325
322
326 When the controller and engines are running on different hosts, things are
323 When the controller and engines are running on different hosts, things are
327 slightly more complicated, but the underlying ideas are the same:
324 slightly more complicated, but the underlying ideas are the same:
328
325
329 1. Start the controller on a host using :command:`ipcontroller`.
326 1. Start the controller on a host using :command:`ipcontroller`.
330 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
327 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
331 3. Use :command:`ipengine` on the engine's hosts to start the engines.
328 3. Use :command:`ipengine` on the engine's hosts to start the engines.
332
329
333 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
330 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
334
331
335 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
332 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
336 directory on the engine's host, where it will be found automatically.
333 directory on the engine's host, where it will be found automatically.
337 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
334 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
338 flag.
335 flag.
339
336
340 The ``--furl-file`` flag works like this::
337 The ``--furl-file`` flag works like this::
341
338
342 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
339 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
343
340
344 .. note::
341 .. note::
345
342
346 If the controller's and engine's hosts all have a shared file system
343 If the controller's and engine's hosts all have a shared file system
347 (:file:`~/.ipython/security` is the same on all of them), then things
344 (:file:`~/.ipython/security` is the same on all of them), then things
348 will just work!
345 will just work!
349
346
350 Make FURL files persistent
347 Make FURL files persistent
351 ---------------------------
348 ---------------------------
352
349
353 At first glance it may seem that managing the FURL files is a bit
350 At first glance it may seem that managing the FURL files is a bit
354 annoying. Going back to the house and key analogy, copying the FURL around
351 annoying. Going back to the house and key analogy, copying the FURL around
355 each time you start the controller is like having to make a new key every time
352 each time you start the controller is like having to make a new key every time
356 you want to unlock the door and enter your house. As with your house, you want
353 you want to unlock the door and enter your house. As with your house, you want
357 to be able to create the key (or FURL file) once, and then simply use it at
354 to be able to create the key (or FURL file) once, and then simply use it at
358 any point in the future.
355 any point in the future.
359
356
360 This is possible, but before you do this, you **must** remove any old FURL
357 This is possible, but before you do this, you **must** remove any old FURL
361 files in the :file:`~/.ipython/security` directory.
358 files in the :file:`~/.ipython/security` directory.
362
359
363 .. warning::
360 .. warning::
364
361
365 You **must** remove old FURL files before using persistent FURL files.
362 You **must** remove old FURL files before using persistent FURL files.
366
363
367 Then, the only thing you have to do is decide what ports the controller will
364 Then, the only thing you have to do is decide what ports the controller will
368 listen on for the engines and clients. This is done as follows::
365 listen on for the engines and clients. This is done as follows::
369
366
370 $ ipcontroller -r --client-port=10101 --engine-port=10102
367 $ ipcontroller -r --client-port=10101 --engine-port=10102
371
368
372 These options also work with all of the various modes of
369 These options also work with all of the various modes of
373 :command:`ipcluster`::
370 :command:`ipcluster`::
374
371
375 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
372 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
376
373
377 Then, just copy the furl files over the first time and you are set. You can
374 Then, just copy the furl files over the first time and you are set. You can
378 start and stop the controller and engines as many times as you want in the
375 start and stop the controller and engines as many times as you want in the
379 future, just make sure to tell the controller to use the *same* ports.
376 future, just make sure to tell the controller to use the *same* ports.
380
377
381 .. note::
378 .. note::
382
379
383 You may ask the question: what ports does the controller listen on if you
380 You may ask the question: what ports does the controller listen on if you
384 don't tell it to use specific ones? The default is to use high random port
381 don't tell it to use specific ones? The default is to use high random port
385 numbers. We do this for two reasons: i) to increase security through
382 numbers. We do this for two reasons: i) to increase security through
386 obscurity and ii) to allow multiple controllers on a given host to start and
383 obscurity and ii) to allow multiple controllers on a given host to start and
387 automatically use different ports.
384 automatically use different ports.
388
385
389 Log files
386 Log files
390 ---------
387 ---------
391
388
392 All of the components of IPython have log files associated with them.
389 All of the components of IPython have log files associated with them.
393 These log files can be extremely useful in debugging problems with
390 These log files can be extremely useful in debugging problems with
394 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
391 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
395 the log files to us will often help us to debug any problems.
392 the log files to us will often help us to debug any problems.
396
393
397
394
398 .. [PBS] Portable Batch System. http://www.openpbs.org/
395 .. [PBS] Portable Batch System. http://www.openpbs.org/
399 .. [SGE] Sun Grid Engine. http://www.sun.com/software/sge/
396 .. [SGE] Sun Grid Engine. http://www.sun.com/software/sge/
400 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
397 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now