##// END OF EJS Templates
Adding support for mpiexec as well as mpirun....
Brian Granger -
Show More
@@ -1,723 +1,737 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import get_ipython_dir, num_cpus
33 33 from IPython.kernel.fcutil import have_crypto
34 34 from IPython.kernel.error import SecurityError
35 35 from IPython.kernel.fcutil import have_crypto
36 36 from IPython.kernel.twistedutil import gatherBoth
37 37 from IPython.kernel.util import printer
38 38
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # General process handling code
42 42 #-----------------------------------------------------------------------------
43 43
def find_exe(cmd):
    """Locate an executable named *cmd* on the Windows PATH.

    Tries ``cmd + '.exe'`` first and falls back to ``cmd + '.bat'``.
    Returns the absolute path of the executable.

    Raises ImportError when pywin32 is not installed (this helper is
    only meaningful on win32).
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    try:
        # SearchPath returns (path, offset-of-basename); only path is needed.
        (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
    except win32api.error:
        # No .exe found -- fall back to a batch file of the same name.
        (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
    return path
55 55
class ProcessStateError(Exception):
    """Raised when a process operation is attempted in an invalid state."""
58 58
class UnknownStatus(Exception):
    """Raised when a child process exits with an unrecognized status."""
61 61
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive; report its pid back to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: pass the details along as a dict.
            self.process_launcher.fire_stop_deferred({
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status
            })
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward child stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward child stderr to the Twisted error log.
        log.err(data)
91 91
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """

    def __init__(self, cmd_and_args):
        # Twisted convention: args[0] is the command name itself.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Reset all per-run bookkeeping.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        """True while the child process is in the 'running' state."""
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        """Called by the protocol once the child process is up."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred that fires with its pid."""
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a deferred that fires when the process exits."""
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        s = 'this process is already complete'
        return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        """Called by the protocol when the child process exits."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.running:
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now, then KILL after *delay* seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
171 171
172 172
173 173 #-----------------------------------------------------------------------------
174 174 # Code for launching controller and engines
175 175 #-----------------------------------------------------------------------------
176 176
177 177
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher specialized to run the ipcontroller script."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args = args + list(extra_args)

        ProcessLauncher.__init__(self, args)
196 196
197 197
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher specialized to run the ipengine script."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option turns on unbuffered output, which is required
            # on Win32 to prevent weird conflicts and problems with Twisted.
            args = [find_exe('python'), '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args = args + list(extra_args)

        ProcessLauncher.__init__(self, args)
216 216
217 217
class LocalEngineSet(object):
    """Manage a set of ipengine processes started as local children."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start *n* engines; returns a deferred firing with their pids."""
        dlist = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            dlist.append(launcher.start())
            self.launchers.append(launcher)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send *sig* to every engine; returns a deferred for their exits."""
        dlist = []
        for launcher in self.launchers:
            dlist.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine now, KILL after *delay* seconds."""
        dlist = []
        for launcher in self.launchers:
            dlist.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
262 262
263 263
class BatchEngineSet(object):
    """Start engines through a batch queueing system.

    Subclasses must fill in submit_command, delete_command and
    job_id_regexp. See PBSEngineSet.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        self.template_file = template_file
        # Extra keyword arguments become substitution variables for the
        # batch-script template.
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract the job id from the submit command's stdout."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the template for *n* engines and write the run file."""
        self.context['n'] = n
        # Close file handles deterministically instead of relying on GC.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Write the batch script and submit it; returns a deferred job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file], env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job from the queue."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
313 313
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet for the PBS batch system (qsub/qdel)."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' in a plain literal is an invalid escape in modern Python.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
322 322
323 323
# Helper script copied to each node: runs its arguments in the background,
# discards their output, and echoes the pid of the backgrounded command.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Script that TERM-kills every ipengine process owned by the current user.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
332 332
class SSHEngineSet(object):
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engines (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            # NOTE(review): relies on $USER being set -- confirm on target
            # platforms.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            try:
                f.writelines(self.sshx_template)
            finally:
                f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        try:
            f.writelines(self.engine_killer_template)
        finally:
            f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = []
        for host, count in self.engine_hosts.items():
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally push the furl file, then the sshx helper, then launch.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the engine furl file to *hostname* via scp."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # Expand '~' locally; the remote side resolves its own paths.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx helper script to *hostname* via scp."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Launch *count* engines on *hostname* through the sshx helper."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill the engines on every host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine-killer script to *hostname* via scp."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the engine-killer script on *hostname*."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)
457 457
458 458 #-----------------------------------------------------------------------------
459 459 # Main functions for the different types of clusters
460 460 #-----------------------------------------------------------------------------
461 461
462 462 # TODO:
463 463 # The logic in these codes should be moved into classes like LocalCluster
464 464 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
465 465 # The main functions should then just parse the command line arguments, create
466 466 # the appropriate class and call a 'start' method.
467 467
def check_security(args, cont_args):
    """Validate -x/-y security flags against OpenSSL availability.

    Appends '-x'/'-y' to cont_args as requested.  Returns False (after
    stopping the reactor) when secure mode is requested but crypto
    support is missing; True otherwise.
    """
    if not (args.x and args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True
480 480
481 481
def main_local(args):
    """Start a controller and args.n engines as local subprocesses."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        # Tag engine logfiles with the controller pid so runs don't collide.
        engine_args = [
            '--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid)
        ]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
514 514
515 515
def main_mpi(args):
    """Start a cluster launching engines through mpirun or mpiexec.

    args.cmd is set by the subcommand parser ('mpirun' or 'mpiexec'),
    so this single handler serves both subcommands.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
552 553
553 554
def main_pbs(args):
    """Start a controller locally and engines through the PBS queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
576 577
577 578
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
            }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # dict.has_key is deprecated (and removed in Python 3); use 'in'.
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            # d.addErrback(log.err)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d

    def delay_start(cont_pid):
        # Give the controller time to write its furl files first.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
619 620
620 621
def get_args():
    """Build the argparse parser tree and parse the command line.

    Each subcommand binds its handler via set_defaults(func=...).  The
    mpirun and mpiexec subcommands share one handler, main_mpi, and
    additionally set cmd to the launcher binary to use.
    """
    # Options shared by every cluster type.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
715 729
def main():
    """Parse arguments, schedule the selected handler, and run the reactor."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
@@ -1,324 +1,331 b''
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
41 41 The final step required to actually use the running controller from a
42 42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 56 4. When the controller is started on localhost and the engines are started on
57 57 remote nodes using :command:`ssh`.
58 58
59 59 .. note::
60 60
61 61 It is also possible for advanced users to add support to
62 62 :command:`ipcluster` for starting controllers and engines using other
63 63 methods (like Sun's Grid Engine for example).
64 64
65 65 .. note::
66 66
67 67 Currently :command:`ipcluster` requires that the
68 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 69 seen by both the controller and engines. If you don't have a shared file
70 70 system you will need to use :command:`ipcontroller` and
71 71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 72 using the :command:`ssh` method to start the cluster.
73 73
74 74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 75 and :command:`ipengine` to perform the steps described above.
76 76
77 77 Using :command:`ipcluster` in local mode
78 78 ----------------------------------------
79 79
80 80 To start one controller and 4 engines on localhost, just do::
81 81
82 82 $ ipcluster local -n 4
83 83
84 84 To see other command line options for the local mode, do::
85 85
86 86 $ ipcluster local -h
87 87
88 Using :command:`ipcluster` in mpirun mode
89 -----------------------------------------
88 Using :command:`ipcluster` in mpiexec/mpirun mode
89 -------------------------------------------------
90 90
91 The mpirun mode is useful if you:
91 The mpiexec/mpirun mode is useful if you:
92 92
93 93 1. Have MPI installed.
94 2. Your systems are configured to use the :command:`mpirun` command to start
95 processes.
94 2. Your systems are configured to use the :command:`mpiexec` or
95 :command:`mpirun` commands to start MPI processes.
96
97 .. note::
98
99 The preferred command to use is :command:`mpiexec`. However, we also
100 support :command:`mpirun` for backwards compatibility. The underlying
101 logic used is exactly the same, the only difference being the name of the
102 command line program that is called.
96 103
97 104 If these are satisfied, you can start an IPython cluster using::
98 105
99 $ ipcluster mpirun -n 4
106 $ ipcluster mpiexec -n 4
100 107
101 108 This does the following:
102 109
103 110 1. Starts the IPython controller on current host.
104 2. Uses :command:`mpirun` to start 4 engines.
111 2. Uses :command:`mpiexec` to start 4 engines.
105 112
106 113 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
107 114
108 $ ipcluster mpirun -n 4 --mpi=mpi4py
115 $ ipcluster mpiexec -n 4 --mpi=mpi4py
109 116
110 117 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
111 118
112 119 Additional command line options for this mode can be found by doing::
113 120
114 $ ipcluster mpirun -h
121 $ ipcluster mpiexec -h
115 122
116 123 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
117 124
118 125
119 126 Using :command:`ipcluster` in PBS mode
120 127 --------------------------------------
121 128
122 129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
123 130
124 131 .. sourcecode:: bash
125 132
126 133 #PBS -N ipython
127 134 #PBS -j oe
128 135 #PBS -l walltime=00:10:00
129 136 #PBS -l nodes=${n/4}:ppn=4
130 137 #PBS -q parallel
131 138
132 139 cd $$PBS_O_WORKDIR
133 140 export PATH=$$HOME/usr/local/bin
134 141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
135 142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
136 143
137 144 There are a few important points about this template:
138 145
139 146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
140 147 template engine.
141 148
142 149 2. Instead of putting in the actual number of engines, use the notation
143 150 ``${n}`` to indicate the number of engines to be started. You can also use
144 151 expressions like ``${n/4}`` in the template to indicate the number of
145 152 nodes.
146 153
147 154 3. Because ``$`` is a special character used by the template engine, you must
148 155 escape any ``$`` by using ``$$``. This is important when referring to
149 156 environment variables in the template.
150 157
151 158 4. Any options to :command:`ipengine` should be given in the batch script
152 159 template.
153 160
154 161 5. Depending on the configuration of you system, you may have to set
155 162 environment variables in the script template.
156 163
157 164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
158 165
159 166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
160 167
161 168 Additional command line options for this mode can be found by doing::
162 169
163 170 $ ipcluster pbs -h
164 171
165 172 Using :command:`ipcluster` in SSH mode
166 173 --------------------------------------
167 174
168 175 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
169 176 nodes and the :command:`ipcontroller` on localhost.
170 177
171 178 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
172 179
173 180 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
174 181
175 182 .. sourcecode:: python
176 183
177 184 send_furl = True
178 185 engines = { 'host1.example.com' : 2,
179 186 'host2.example.com' : 5,
180 187 'host3.example.com' : 1,
181 188 'host4.example.com' : 8 }
182 189
183 190 Since this is a regular python file usual python syntax applies. Things to note:
184 191
185 192 * The `engines` dict, where the key is the host we want to run engines on and
186 193 the value is the number of engines to run on that host.
187 194 * send_furl can either be `True` or `False`, if `True` it will copy over the
188 195 furl needed for :command:`ipengine` to each host.
189 196
190 197 The ``--clusterfile`` command line option lets you specify the file to use for
191 198 the cluster definition. Once you have your cluster file and you can
192 199 :command:`ssh` into the remote hosts with out an password you are ready to
193 200 start your cluster like so:
194 201
195 202 .. sourcecode:: bash
196 203
197 204 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
198 205
199 206
200 207 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
201 208
202 209 * sshx.sh
203 210 * engine_killer.sh
204 211
205 212 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
206 213
207 214 The default sshx.sh is the following:
208 215
209 216 .. sourcecode:: bash
210 217
211 218 #!/bin/sh
212 219 "$@" &> /dev/null &
213 220 echo $!
214 221
215 222 If you want to use a custom sshx.sh script you need to use the ``--sshx``
216 223 option and specify the file to use. Using a custom sshx.sh file could be
217 224 helpful when you need to setup the environment on the remote host before
218 225 executing :command:`ipengine`.
219 226
220 227 For a detailed options list:
221 228
222 229 .. sourcecode:: bash
223 230
224 231 $ ipcluster ssh -h
225 232
226 233 Current limitations of the SSH mode of :command:`ipcluster` are:
227 234
228 235 * Untested on Windows. Would require a working :command:`ssh` on Windows.
229 236 Also, we are using shell scripts to setup and execute commands on remote
230 237 hosts.
231 238 * :command:`ipcontroller` is started on localhost, with no option to start it
232 239 on a remote node.
233 240
234 241 Using the :command:`ipcontroller` and :command:`ipengine` commands
235 242 ==================================================================
236 243
237 244 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
238 245
239 246 Starting the controller and engine on your local machine
240 247 --------------------------------------------------------
241 248
242 249 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
243 250 local machine, do the following.
244 251
245 252 First start the controller::
246 253
247 254 $ ipcontroller
248 255
249 256 Next, start however many instances of the engine you want using (repeatedly) the command::
250 257
251 258 $ ipengine
252 259
253 260 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
254 261
255 262 .. warning::
256 263
257 264 The order of the above operations is very important. You *must*
258 265 start the controller before the engines, since the engines connect
259 266 to the controller as they get started.
260 267
261 268 .. note::
262 269
263 270 On some platforms (OS X), to put the controller and engine into the
264 271 background you may need to give these commands in the form ``(ipcontroller
265 272 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
266 273 properly.
267 274
268 275 Starting the controller and engines on different hosts
269 276 ------------------------------------------------------
270 277
271 278 When the controller and engines are running on different hosts, things are
272 279 slightly more complicated, but the underlying ideas are the same:
273 280
274 281 1. Start the controller on a host using :command:`ipcontroller`.
275 282 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
276 283 3. Use :command:`ipengine` on the engine's hosts to start the engines.
277 284
278 285 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
279 286
280 287 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
281 288 directory on the engine's host, where it will be found automatically.
282 289 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
283 290 flag.
284 291
285 292 The ``--furl-file`` flag works like this::
286 293
287 294 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
288 295
289 296 .. note::
290 297
291 298 If the controller's and engine's hosts all have a shared file system
292 299     (:file:`~/.ipython/security` is the same on all of them), then things
293 300 will just work!
294 301
295 302 Make FURL files persistent
296 303 ---------------------------
297 304
298 305 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
299 306
300 307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
301 308
302 309 $ ipcontroller -r --client-port=10101 --engine-port=10102
303 310
304 311 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future, just make sure to tell the controller to use the *same* ports.
305 312
306 313 .. note::
307 314
308 315 You may ask the question: what ports does the controller listen on if you
309 316     don't tell it to use specific ones? The default is to use high random port
310 317 numbers. We do this for two reasons: i) to increase security through
311 318     obscurity and ii) to allow multiple controllers on a given host to start and
312 319 automatically use different ports.
313 320
314 321 Log files
315 322 ---------
316 323
317 324 All of the components of IPython have log files associated with them.
318 325 These log files can be extremely useful in debugging problems with
319 326 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
320 327 the log files to us will often help us to debug any problems.
321 328
322 329
323 330 .. [PBS] Portable Batch System. http://www.openpbs.org/
324 331 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now