##// END OF EJS Templates
Initial merge of ssh cluster from ~vvatsa's ipcluster-dev.
Brian Granger -
Show More
@@ -1,521 +1,658 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 import tempfile
21 22 pjoin = os.path.join
22 23
23 24 from twisted.internet import reactor, defer
24 25 from twisted.internet.protocol import ProcessProtocol
25 26 from twisted.internet.error import ProcessDone, ProcessTerminated
26 27 from twisted.internet.utils import getProcessOutput
27 28 from twisted.python import failure, log
28 29
29 30 from IPython.external import argparse
30 31 from IPython.external import Itpl
31 32 from IPython.genutils import get_ipython_dir, num_cpus
32 33 from IPython.kernel.fcutil import have_crypto
33 34 from IPython.kernel.error import SecurityError
34 35 from IPython.kernel.fcutil import have_crypto
35 36 from IPython.kernel.twistedutil import gatherBoth
36 37 from IPython.kernel.util import printer
37 38
38 39
39 40 #-----------------------------------------------------------------------------
40 41 # General process handling code
41 42 #-----------------------------------------------------------------------------
42 43
def find_exe(cmd):
    """Return the full path of a Windows executable or batch file.

    Uses pywin32 to search the directories in the ``PATH`` environment
    variable for ``cmd + '.exe'``, falling back to ``cmd + '.bat'``.

    Raises ImportError if pywin32 is not installed.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    else:
        try:
            # SearchPath returns a (path, offset) tuple; only catch the
            # pywin32 lookup error, not every exception.
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
        except win32api.error:
            (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
        return path
54 55
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher is used while in an incompatible state."""
    pass
57 58
class UnknownStatus(Exception):
    """Raised when a child process ends with an unrecognized Twisted status."""
    pass
60 61
class LauncherProcessProtocol(ProcessProtocol):
    """Protocol that relays child-process lifecycle events to a ProcessLauncher."""

    def __init__(self, process_launcher):
        # The launcher whose deferreds we fire on process events.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is up; report its pid through the start deferred.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit is reported as exit code 0.
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            # Abnormal exit: pass along the full exit information.
            exit_info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status
            }
            self.process_launcher.fire_stop_deferred(exit_info)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)
90 91
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the full list (including the
        # executable itself) is handed to reactor.spawnProcess as argv.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Clear all process-tracking state.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the child
        # process has been spawned.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred that fires with its pid.

        Calling start more than once returns a failed deferred wrapping
        ProcessStateError.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires when the process stops.

        May be called multiple times; each caller gets its own deferred.
        Fails with ProcessStateError if the process has already finished.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; exit_code is 0 for
        # a clean exit or a dict of exit information otherwise.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        Silently does nothing unless the process is running.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now and schedule a KILL after `delay` seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
170 171
171 172
172 173 #-----------------------------------------------------------------------------
173 174 # Code for launching controller and engines
174 175 #-----------------------------------------------------------------------------
175 176
176 177
class ControllerLauncher(ProcessLauncher):
    """Launch an ipcontroller process, resolving the script location per platform."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipcontroller script isn't always installed the same way or
            # in the same place on Windows, so locate it via its module.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # -u forces unbuffered output, which is required on Win32 to
            # prevent weird conflicts and problems with Twisted.
            cmd_and_args = [find_exe('python'), '-u', script_location]
        else:
            cmd_and_args = ['ipcontroller']
        if extra_args is not None:
            cmd_and_args.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_and_args)
195 196
196 197
class EngineLauncher(ProcessLauncher):
    """Launch an ipengine process, resolving the script location per platform."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        if sys.platform == 'win32':
            # The ipengine script isn't always installed the same way or
            # in the same place on Windows, so locate it via its module.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # -u forces unbuffered output, which is required on Win32 to
            # prevent weird conflicts and problems with Twisted.
            cmd_and_args = [find_exe('python'), '-u', script_location]
        else:
            cmd_and_args = ['ipengine']
        if extra_args is not None:
            cmd_and_args.extend(extra_args)
        ProcessLauncher.__init__(self, cmd_and_args)
215 216
216 217
class LocalEngineSet(object):
    """Manage a collection of ipengine processes running on localhost."""

    def __init__(self, extra_args=None):
        # extra_args are forwarded to every EngineLauncher we create.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start `n` engines; returns a deferred that fires with their pids."""
        starts = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            start_d = launcher.start()
            self.launchers.append(launcher)
            starts.append(start_d)
        dfinal = gatherBoth(starts, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send `sig` to every engine; returns a deferred for their exits."""
        stops = []
        for launcher in self.launchers:
            stops.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(stops, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine, then KILL after `delay`; returns exit deferred."""
        stops = []
        for launcher in self.launchers:
            stops.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(stops, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
261 262
262 263
class BatchEngineSet(object):
    """Base class for starting engines through a batch queueing system.

    Subclasses must fill in submit_command, delete_command and
    job_id_regexp; see PBSEngineSet for an example.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        # Extra keyword arguments become the template substitution context.
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract and record the job id from the submit command's output."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Render the template for `n` engines and write the batch script."""
        self.context['n'] = n
        # Close the template handle explicitly instead of leaking it via
        # open(...).read().
        template_handle = open(self.template_file, 'r')
        try:
            template = template_handle.read()
        finally:
            template_handle.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            # Ensure the script is closed even if the write fails.
            f.close()

    def handle_error(self, f):
        # Print the traceback and re-raise so the failure still propagates.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job starting `n` engines; returns a deferred job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted batch job; returns a deferred with the output."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
312 313
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet that submits engines through PBS (qsub/qdel)."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string so \d is a regex digit class, not a (deprecated/invalid)
    # string escape: PBS job ids begin with a run of digits.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
321 322
class SSHEngineSet(object):
    """Start and stop ipengine processes on remote hosts over ssh.

    Engines are launched by copying a small helper shell script (sshx) to
    each host's temp directory and running ipengine through it; killing is
    done with a second helper script that TERMs the user's ipengine
    processes on the host.
    """

    # Helper run on the remote host: start the given command detached from
    # the ssh session and print its pid.
    sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!"""

    # Helper run on the remote host: TERM every ipengine owned by the
    # current user.
    engine_killer_template="""#!/bin/sh

ps -fu `whoami` | grep ipengine | awk '{print $2}' | xargs kill -TERM"""

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        # engine_hosts: dict mapping host name -> number of engines to start.
        # sshx: optional path to a custom launcher script; when omitted a
        # default one is written to the local temp directory.
        # ipengine: command used to start an engine on the remote host.
        self.temp_dir = tempfile.gettempdir()
        if sshx != None:
            self.sshx = sshx
        else:
            self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh'%os.environ['USER'])
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # The engine killer script is always (re)written locally; it is
        # scp'd to each host at kill time.
        self.engine_killer = os.path.join(self.temp_dir, '%s-main-engine_killer.sh'%os.environ['USER'])
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        # Start the configured number of engines on every host; when
        # send_furl is True the engine FURL file is copied over first.
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            self._start(host, count, send_furl)

    def killall(self):
        # Run the engine killer script on every configured host.
        for host in self.engine_hosts.keys():
            self._killall(host)

    def _start(self, host_name, count=1, send_furl=False):
        # Deferred chain: (optionally scp the furl) -> scp the sshx helper
        # -> exec ipengine `count` times through the helper.

        def _scp_sshx(d):
            # Copy the sshx helper into the remote temp dir, then launch.
            scp_cmd = "scp %s %s:%s/%s-sshx.sh"%(self.sshx, host_name, self.temp_dir, os.environ['USER'])
            sshx_scp = scp_cmd.split()
            print sshx_scp
            d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
            d.addCallback(_exec_engine)

        def _exec_engine(d):
            # Launch `count` engines on the remote host via the helper.
            exec_engine = "ssh %s sh %s/%s-sshx.sh %s"%(host_name, self.temp_dir, os.environ['USER'], self.engine_command)
            cmds = exec_engine.split()
            print cmds
            for i in range(count):
                d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)

        if send_furl:
            # NOTE(review): assumes ~/.ipython/security already exists on
            # the remote host -- confirm.
            scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/"%(host_name)
            cmd_list = scp_cmd.split()
            cmd_list[1] = os.path.expanduser(cmd_list[1])
            print cmd_list
            d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
            d.addCallback(_scp_sshx)
        else:
            _scp_sshx(d=None)

    def _killall(self, host_name):
        # scp the killer script to the host, then run it over ssh.
        def _exec_err(d):
            # "No such process" just means the engines already exited.
            if d.getErrorMessage()[-18:] != "No such process\\n\'":
                raise d

        def _exec_kill(d):
            kill_cmd = "ssh %s sh %s/%s-engine_killer.sh"%( host_name, self.temp_dir, os.environ['USER'])
            kill_cmd = kill_cmd.split()
            print kill_cmd
            d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
            d.addErrback(_exec_err)
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh"%( self.engine_killer, host_name, self.temp_dir, os.environ['USER'])
        cmds = scp_cmd.split()
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        d.addCallback(_exec_kill)
        d.addErrback(_exec_err)
399
322 400
323 401 #-----------------------------------------------------------------------------
324 402 # Main functions for the different types of clusters
325 403 #-----------------------------------------------------------------------------
326 404
327 405 # TODO:
328 406 # The logic in these codes should be moved into classes like LocalCluster
329 407 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
330 408 # The main functions should then just parse the command line arguments, create
331 409 # the appropriate class and call a 'start' method.
332 410
def check_security(args, cont_args):
    """Validate the security flags and extend cont_args accordingly.

    Returns False (after stopping the reactor) when secure mode is needed
    but OpenSSL/pyOpenSSL is unavailable; otherwise appends '-x'/'-y' to
    cont_args as requested and returns True.
    """
    secure_mode_needed = not args.x or not args.y
    if secure_mode_needed and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
345 423
424
def main_local(args):
    """Start a controller and `args.n` engines on localhost."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Engines get per-controller log files, keyed by the controller pid.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        # Ctrl-C tears down engines and controller before stopping the reactor.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
378 457
458
def main_mpirun(args):
    """Start a controller on localhost and `args.n` engines via mpirun."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # All engines are started through a single mpirun invocation.
        raw_args = ['mpirun']
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            # Optional MPI_Init strategy, e.g. --mpi=mpi4py.
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        # Ctrl-C tears down mpirun and the controller before stopping.
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
415 495
496
def main_pbs(args):
    """Start a controller on localhost and engines through a PBS batch job."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        # Submit the batch job using the user-supplied PBS script template.
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # qdel the job, then take down the controller and the reactor.
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
438 519
439 520
521 # currently the ssh launcher only launches the controller on localhost.
# currently the ssh launcher only launches the controller on localhost.
def main_ssh(args):
    """Start a controller on localhost and engines on remote hosts via ssh."""
    # the clusterfile should look like:
    # send_furl = False # True, if you want
    # engines = {'engine_host1' : engine_count, 'engine_host2' : engine_count2}
    clusterfile = {}
    # The clusterfile is plain Python, executed into the dict above.
    execfile(args.clusterfile, clusterfile)
    if not clusterfile.has_key('send_furl'):
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
    # Security flags are passed straight through to the controller here
    # (no check_security call, unlike the other modes).
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        est = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        est.start(clusterfile['send_furl'])
        def shutdown(signum, frame):
            # Kill remote engines first, then the local controller.
            est.killall()
            cl.interrupt_then_kill(0.5)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)

    def delay_start(cont_pid):
        # Give the controller time to start listening and write furl files.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
553
554
def get_args():
    """Build the ipcluster argument parser and parse the command line.

    Returns the parsed argparse namespace; each subcommand sets a `func`
    attribute pointing at the corresponding main_* function.
    """
    # Options shared by every cluster type.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    # One subcommand per cluster type; each inherits the base options.
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpirun)

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper',
        default='sshx.sh',
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
513 650
def main():
    """Parse the command line and run the selected cluster mode."""
    parsed = get_args()
    # Defer the mode function until the reactor is actually running.
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
@@ -1,251 +1,326 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
The final step required to actually use the running controller from a
client is to move the FURL files :file:`ipcontroller-mec.furl` and
:file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 4. When the controller is started on localhost and the engines are started on
57 remote nodes using :command:`ssh`.
56 58
57 59 .. note::
58 60
59 61 It is also possible for advanced users to add support to
60 62 :command:`ipcluster` for starting controllers and engines using other
61 63 methods (like Sun's Grid Engine for example).
62 64
63 65 .. note::
64 66
65 67 Currently :command:`ipcluster` requires that the
66 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
67 69 seen by both the controller and engines. If you don't have a shared file
68 70 system you will need to use :command:`ipcontroller` and
69 :command:`ipengine` directly.
71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 using the :command:`ssh` method to start the cluster.
70 73
71 74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
72 75 and :command:`ipengine` to perform the steps described above.
73 76
74 77 Using :command:`ipcluster` in local mode
75 78 ----------------------------------------
76 79
77 80 To start one controller and 4 engines on localhost, just do::
78 81
79 82 $ ipcluster local -n 4
80 83
81 84 To see other command line options for the local mode, do::
82 85
83 86 $ ipcluster local -h
84 87
85 88 Using :command:`ipcluster` in mpirun mode
86 89 -----------------------------------------
87 90
88 91 The mpirun mode is useful if you:
89 92
90 93 1. Have MPI installed.
91 94 2. Your systems are configured to use the :command:`mpirun` command to start
92 95 processes.
93 96
94 97 If these are satisfied, you can start an IPython cluster using::
95 98
96 99 $ ipcluster mpirun -n 4
97 100
98 101 This does the following:
99 102
100 103 1. Starts the IPython controller on current host.
101 104 2. Uses :command:`mpirun` to start 4 engines.
102 105
103 106 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
104 107
105 108 $ ipcluster mpirun -n 4 --mpi=mpi4py
106 109
107 110 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
108 111
109 112 Additional command line options for this mode can be found by doing::
110 113
111 114 $ ipcluster mpirun -h
112 115
113 116 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
114 117
115 118
116 119 Using :command:`ipcluster` in PBS mode
117 120 --------------------------------------
118 121
119 122 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
120 123
121 124 .. sourcecode:: bash
122 125
123 126 #PBS -N ipython
124 127 #PBS -j oe
125 128 #PBS -l walltime=00:10:00
126 129 #PBS -l nodes=${n/4}:ppn=4
127 130 #PBS -q parallel
128 131
129 132 cd $$PBS_O_WORKDIR
130 133 export PATH=$$HOME/usr/local/bin
131 134 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
132 135 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
133 136
134 137 There are a few important points about this template:
135 138
136 139 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
137 140 template engine.
138 141
139 142 2. Instead of putting in the actual number of engines, use the notation
140 143 ``${n}`` to indicate the number of engines to be started. You can also uses
141 144 expressions like ``${n/4}`` in the template to indicate the number of
142 145 nodes.
143 146
144 147 3. Because ``$`` is a special character used by the template engine, you must
145 148 escape any ``$`` by using ``$$``. This is important when referring to
146 149 environment variables in the template.
147 150
148 151 4. Any options to :command:`ipengine` should be given in the batch script
149 152 template.
150 153
151 154 5. Depending on the configuration of you system, you may have to set
152 155 environment variables in the script template.
153 156
154 157 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
155 158
156 159 $ ipcluster pbs -n 128 --pbs-script=pbs.template
157 160
158 161 Additional command line options for this mode can be found by doing::
159 162
160 163 $ ipcluster pbs -h
161 164
165 Using :command:`ipcluster` in SSH mode
166 --------------------------------------
167
168 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 nodes and the :command:`ipcontroller` on localhost.
170
When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172
173 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174
175 .. sourcecode:: python
176
177 send_furl = True
178 engines = { 'host1.example.com' : 2,
179 'host2.example.com' : 5,
180 'host3.example.com' : 1,
181 'host4.example.com' : 8 }
182
183 Since this is a regular python file usual python syntax applies. Things to note:
184
* The `engines` dict, where each key is a host we want to run engines on and
  the value is the number of engines to run on that host.
187 * send_furl can either be `True` or `False`, if `True` it will copy over the
188 furl needed for :command:`ipengine` to each host.
189
190 The ``--clusterfile`` command line option lets you specify the file to use for
191 the cluster definition. Once you have your cluster file and you can
192 :command:`ssh` into the remote hosts with out an password you are ready to
193 start your cluster like so:
194
195 .. sourcecode:: bash
196
197 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198
199
200 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201
202 * sshx.sh
203 * engine_killer.sh
204
205 Both are provided in the :dir:`IPython.kernel.scripts`. They are copied to a
206 temp directory on the remote host and executed from there; on most Unix, Linux
207 and OS X systems this is :file:`/tmp`.
208
209 The sshx.sh is as simple as:
210
211 .. sourcecode:: bash
212
213 #!/bin/sh
214 "$@" &> /dev/null &
215 echo $!
216
217 If you want to use a custom sshx.sh script you need to use the ``--sshx``
218 option and specify the file to use. Using a custom sshx.sh file could be
219 helpful when you need to setup the environment on the remote host before
220 executing :command:`ipengine`.
221
222 For a detailed options list:
223
224 .. sourcecode:: bash
225
226 $ ipcluster ssh -h
227
228 Current limitations of the SSH mode of :command:`ipcluster` are:
229
230 * Untested on Windows. Would require a working :command:`ssh` on Windows.
231 Also, we are using shell scripts to setup and execute commands on remote
232 hosts.
233 * :command:`ipcontroller` is started on localhost, with no option to start it
234 on a remote node also.
235
162 236 Using the :command:`ipcontroller` and :command:`ipengine` commands
163 237 ==================================================================
164 238
165 239 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
166 240
167 241 Starting the controller and engine on your local machine
168 242 --------------------------------------------------------
169 243
170 244 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
171 245 local machine, do the following.
172 246
173 247 First start the controller::
174 248
175 249 $ ipcontroller
176 250
177 251 Next, start however many instances of the engine you want using (repeatedly) the command::
178 252
179 253 $ ipengine
180 254
181 255 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
182 256
183 257 .. warning::
184 258
185 259 The order of the above operations is very important. You *must*
186 260 start the controller before the engines, since the engines connect
187 261 to the controller as they get started.
188 262
189 263 .. note::
190 264
191 265 On some platforms (OS X), to put the controller and engine into the
192 266 background you may need to give these commands in the form ``(ipcontroller
193 267 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
194 268 properly.
195 269
196 270 Starting the controller and engines on different hosts
197 271 ------------------------------------------------------
198 272
199 273 When the controller and engines are running on different hosts, things are
200 274 slightly more complicated, but the underlying ideas are the same:
201 275
202 276 1. Start the controller on a host using :command:`ipcontroller`.
203 277 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
204 278 3. Use :command:`ipengine` on the engine's hosts to start the engines.
205 279
206 280 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
207 281
208 282 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
209 283 directory on the engine's host, where it will be found automatically.
210 284 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
211 285 flag.
212 286
213 287 The ``--furl-file`` flag works like this::
214 288
215 289 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
216 290
217 291 .. note::
218 292
219 293 If the controller's and engine's hosts all have a shared file system
220 294 (:file:`~/.ipython/security` is the same on all of them), then things
221 295 will just work!
222 296
223 297 Make FURL files persistent
224 298 ---------------------------
225 299
226 300 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
227 301
228 302 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
229 303
230 304 $ ipcontroller -r --client-port=10101 --engine-port=10102
231 305
232 306 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future, just make sure to tell the controller to use the *same* ports.
233 307
234 308 .. note::
235 309
236 310 You may ask the question: what ports does the controller listen on if you
237 311 don't tell it to use specific ones? The default is to use high random port
238 312 numbers. We do this for two reasons: i) to increase security through
239 313 obscurity and ii) to allow multiple controllers on a given host to start and
240 314 automatically use different ports.
241 315
242 316 Log files
243 317 ---------
244 318
245 319 All of the components of IPython have log files associated with them.
246 320 These log files can be extremely useful in debugging problems with
247 321 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
248 322 the log files to us will often help us to debug any problems.
249 323
250 324
251 325 .. [PBS] Portable Batch System. http://www.openpbs.org/
326 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now