##// END OF EJS Templates
The ipcluster command is now able to reuse FURL files....
Brian Granger -
Show More
@@ -1,736 +1,783
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import get_ipython_dir, num_cpus
33 33 from IPython.kernel.fcutil import have_crypto
34 34 from IPython.kernel.error import SecurityError
35 35 from IPython.kernel.fcutil import have_crypto
36 36 from IPython.kernel.twistedutil import gatherBoth
37 37 from IPython.kernel.util import printer
38 38
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # General process handling code
42 42 #-----------------------------------------------------------------------------
43 43
def find_exe(cmd):
    """Return the full path of a Windows executable named *cmd*.

    Searches the PATH for ``cmd + '.exe'`` first and falls back to
    ``cmd + '.bat'``.  Requires pywin32.

    Raises ImportError if pywin32 is not installed, and propagates
    win32api.error if neither file is found on the PATH.
    """
    try:
        import win32api
    except ImportError:
        raise ImportError('you need to have pywin32 installed for this to work')
    # SearchPath returns (full_path, offset_of_basename); only the path
    # is needed.  Catch only the API's own error so real bugs (e.g. a
    # missing PATH variable) are not silently masked.
    try:
        (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.exe')
    except win32api.error:
        # No .exe found; fall back to a batch file of the same name.
        (path, offset) = win32api.SearchPath(os.environ['PATH'], cmd + '.bat')
    return path
55 55
class ProcessStateError(Exception):
    """Raised when a process operation is attempted in the wrong state."""
58 58
class UnknownStatus(Exception):
    """Raised when a child process exits with an unrecognized status."""
61 61
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Relays Twisted process events back to the owning launcher: the start
    deferred fires once the child is connected, the stop deferreds fire
    when it exits.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # Child is alive; hand its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit is reported as exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: report the full exit details.
            details = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            }
            self.process_launcher.fire_stop_deferred(details)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward child stdout into the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward child stderr into the Twisted error log.
        log.err(data)
91 91
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.

    The launcher moves through three states: 'before' (not yet spawned),
    'running', and 'after' (exited).
    """

    def __init__(self, cmd_and_args):
        # argv convention: element 0 is also the command itself.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Forget everything about any previous run.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        # Called by the protocol once the child is connected.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred firing with its pid."""
        if self.state != 'before':
            # Spawning twice is an error, reported through the deferred.
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        """Return a deferred that fires when the process exits."""
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        return defer.fail(ProcessStateError('this process is already complete'))

    def fire_stop_deferred(self, exit_code):
        # Called by the protocol when the child exits; notify all waiters.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT now for a clean shutdown, SIGKILL after *delay* seconds."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
171 171
172 172
173 173 #-----------------------------------------------------------------------------
174 174 # Code for launching controller and engines
175 175 #-----------------------------------------------------------------------------
176 176
177 177
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher that starts an ipcontroller child process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script is not always installed the same way
            # or in the same place, so locate it through the package itself.
            from IPython.kernel.scripts import ipcontroller
            script = ipcontroller.__file__.replace('.pyc', '.py')
            # -u (unbuffered output) is required on Win32 to prevent
            # weird conflicts and problems with Twisted.
            cmd = [find_exe('python'), '-u', script]
        else:
            cmd = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)

        ProcessLauncher.__init__(self, cmd)
196 196
197 197
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher that starts an ipengine child process."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script is not always installed the same way
            # or in the same place, so locate it through the package itself.
            from IPython.kernel.scripts import ipengine
            script = ipengine.__file__.replace('.pyc', '.py')
            # -u (unbuffered output) is required on Win32 to prevent
            # weird conflicts and problems with Twisted.
            cmd = [find_exe('python'), '-u', script]
        else:
            cmd = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)

        ProcessLauncher.__init__(self, cmd)
216 216
217 217
class LocalEngineSet(object):
    """Manage a group of engines started as local child processes."""

    def __init__(self, extra_args=None):
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start *n* engines; returns a deferred firing with their pids."""
        starts = []
        for _ in range(n):
            launcher = EngineLauncher(extra_args=self.extra_args)
            d = launcher.start()
            self.launchers.append(launcher)
            starts.append(d)
        dfinal = gatherBoth(starts, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send *sig* to every engine; deferred fires when all have exited."""
        stops = []
        for launcher in self.launchers:
            stops.append(launcher.get_stop_deferred())
            launcher.signal(sig)
        dfinal = gatherBoth(stops, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT every engine, then SIGKILL each after *delay* seconds."""
        stops = []
        for launcher in self.launchers:
            stops.append(launcher.get_stop_deferred())
            launcher.interrupt_then_kill(delay)
        dfinal = gatherBoth(stops, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
262 262
263 263
class BatchEngineSet(object):
    """Base class for starting engines through a batch queue system.

    Subclasses must fill in submit_command, delete_command and
    job_id_regexp.  See PBSEngineSet for an example.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        self.template_file = template_file
        # Extra keyword args become substitution variables for the template.
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract and record the job id from the submit command's output.

        Raises Exception if job_id_regexp does not match *output*.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the batch template for *n* engines and write it out."""
        self.context['n'] = n
        # Close file handles deterministically rather than relying on GC.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        # Print and re-raise so the failure is both visible and propagated.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job starting *n* engines; deferred fires with job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file], env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the previously submitted job (requires start() to have run)."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
313 313
class PBSEngineSet(BatchEngineSet):
    """BatchEngineSet that submits engines through the PBS queue system."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string so \d is a regex digit class, not a string escape
    # (a plain '\d' is an invalid escape sequence in modern Python).
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
322 322
323 323
# Shell helper copied to each remote host: runs its arguments detached in
# the background, discards their output, and echoes the new process's pid.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell helper that TERM-kills every ipengine process owned by the
# current user on the host where it runs.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
332 332
class SSHEngineSet(object):
    # Default helper-script templates; instances write these to temp files.
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            # Caller supplied their own launcher helper script.
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host.

        Returns a deferred that fires when all launch commands complete.
        """
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Chain: optionally scp the FURL file, then the sshx helper,
        # then run the engines remotely.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        # Copy the local engine FURL file into the remote ~/.ipython/security.
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # Expand '~' locally; the remote side resolves its own paths.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        # Copy the sshx launcher helper into the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # NOTE(review): bare 'print' (Python 2 statement) looks like a
        # debug leftover -- confirm whether it can be removed.
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        # Run the sshx helper remotely once per engine to be started.
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill the engines on every configured host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Copy the killer script over, then execute it remotely.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        # Copy the engine_killer helper into the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        # Execute the previously copied engine_killer script remotely.
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Errback helper: just log the failure (currently unused; see
        # the commented-out addErrback in _killall).
        log.msg(r)
457 457
458 458 #-----------------------------------------------------------------------------
459 459 # Main functions for the different types of clusters
460 460 #-----------------------------------------------------------------------------
461 461
# TODO:
# The logic in these functions should be moved into classes like LocalCluster,
# MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
# The main functions should then just parse the command line arguments, create
# the appropriate class and call a 'start' method.
467 467
def check_security(args, cont_args):
    """Validate the -x/-y security flags and extend *cont_args*.

    Returns True when it is safe to proceed.  If secure mode is needed
    but crypto support is missing, logs an error, stops the reactor and
    returns False.
    """
    # Secure mode (either connection not explicitly disabled) needs crypto.
    if (not args.x or not args.y) and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    # Forward each requested "security off" switch to the controller.
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
480 480
def check_reuse(args, cont_args):
    """Handle the -r (reuse FURL files) option, extending *cont_args*.

    Reuse requires explicit --client-port and --engine-port values; if
    either is missing, logs an error, stops the reactor and returns
    False.  Returns True when it is fine to continue.
    """
    if not args.r:
        # Not reusing FURL files: nothing to forward.
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
481 493
def main_local(args):
    """Start a controller plus args.n engines, all on localhost."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        engine_args = ['--logfile=%s' %
                       pjoin(args.logdir, 'ipengine%s-' % cont_pid)]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    def delay_start(cont_pid):
        # The controller needs a moment to start listening and to write
        # the FURL files that the engines will pick up.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
514 530
515 531
def main_mpi(args):
    """Start a controller locally and engines via mpirun/mpiexec.

    args.cmd carries 'mpirun' or 'mpiexec' (set by the subparser).
    """
    # Removed stray 'print vars(args)' debug statement (Python-2-only
    # syntax and noisy on every run).
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Build the MPI launch command that starts args.n engines.
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    def delay_start(cont_pid):
        # This is needed because the controller doesn't start listening
        # right when it starts and the controller needs to write
        # furl files for the engine to pick up
        reactor.callLater(1.0, start_engines, cont_pid)
    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
553 572
554 573
def main_pbs(args):
    """Start a controller locally and engines through a PBS queue."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)

        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    dstart.addCallback(start_engines)
    dstart.addErrback(lambda f: f.raiseException())
577 600
578 601
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
            }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # dict.has_key is deprecated (and gone in Python 3); 'in' is
    # equivalent and works everywhere.
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d

    def delay_start(cont_pid):
        # Give the controller time to write the engine FURL file.
        reactor.callLater(1.0, start_engines, cont_pid)

    dstart.addCallback(delay_start)
    dstart.addErrback(lambda f: f.raiseException())
619 646
620 647
def get_args():
    """Build the ipcluster argument parser and parse sys.argv.

    A base parser holds the options shared by every cluster type
    (-r/--client-port/--engine-port for FURL reuse, -x/-y security
    switches, --logdir and -n); one subparser per cluster type
    ('local', 'mpirun', 'mpiexec', 'pbs', 'ssh') inherits from it and
    sets 'func' to the corresponding main_* handler.
    """
    # add_help=False so the subparsers that inherit these options can
    # provide their own -h without a conflict.
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
        THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    # local: controller and engines as plain local child processes.
    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    # mpirun / mpiexec: engines launched through an MPI runner; the
    # runner name is carried in the 'cmd' default.
    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    # pbs: engines submitted through a PBS batch queue.
    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    # ssh: engines started on remote hosts over ssh.
    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
728 775
def main():
    """Entry point: dispatch to the handler for the chosen cluster type."""
    parsed = get_args()
    # Defer the handler until the reactor is actually running.
    reactor.callWhenRunning(parsed.func, parsed)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
@@ -1,331 +1,336
1 1 .. _parallel_process:
2 2
3 3 ===========================================
4 4 Starting the IPython controller and engines
5 5 ===========================================
6 6
7 7 To use IPython for parallel computing, you need to start one instance of
8 8 the controller and one or more instances of the engine. The controller
9 9 and each engine can run on different machines or on the same machine.
10 10 Because of this, there are many different possibilities.
11 11
12 12 Broadly speaking, there are two ways of going about starting a controller and engines:
13 13
14 14 * In an automated manner using the :command:`ipcluster` command.
15 15 * In a more manual way using the :command:`ipcontroller` and
16 16 :command:`ipengine` commands.
17 17
18 18 This document describes both of these methods. We recommend that new users start with the :command:`ipcluster` command as it simplifies many common usage cases.
19 19
20 20 General considerations
21 21 ======================
22 22
23 23 Before delving into the details about how you can start a controller and engines using the various methods, we outline some of the general issues that come up when starting the controller and engines. These things come up no matter which method you use to start your IPython cluster.
24 24
25 25 Let's say that you want to start the controller on ``host0`` and engines on hosts ``host1``-``hostn``. The following steps are then required:
26 26
27 27 1. Start the controller on ``host0`` by running :command:`ipcontroller` on
28 28 ``host0``.
29 29 2. Move the FURL file (:file:`ipcontroller-engine.furl`) created by the
30 30 controller from ``host0`` to hosts ``host1``-``hostn``.
31 31 3. Start the engines on hosts ``host1``-``hostn`` by running
32 32 :command:`ipengine`. This command has to be told where the FURL file
33 33 (:file:`ipcontroller-engine.furl`) is located.
34 34
35 35 At this point, the controller and engines will be connected. By default, the
36 36 FURL files created by the controller are put into the
37 37 :file:`~/.ipython/security` directory. If the engines share a filesystem with
38 38 the controller, step 2 can be skipped as the engines will automatically look
39 39 at that location.
40 40
41 41 The final step required to actually use the running controller from a
42 42 client is to move the FURL files :file:`ipcontroller-mec.furl` and
43 43 :file:`ipcontroller-tc.furl` from ``host0`` to the host where the clients will
44 44 be run. If these files are put into the :file:`~/.ipython/security` directory of the client's host, they will be found automatically. Otherwise, the full path to them has to be passed to the client's constructor.
45 45
46 46 Using :command:`ipcluster`
47 47 ==========================
48 48
49 49 The :command:`ipcluster` command provides a simple way of starting a controller and engines in the following situations:
50 50
51 51 1. When the controller and engines are all run on localhost. This is useful
52 52 for testing or running on a multicore computer.
53 53 2. When engines are started using the :command:`mpirun` command that comes
54 54 with most MPI [MPI]_ implementations
55 55 3. When engines are started using the PBS [PBS]_ batch system.
56 56 4. When the controller is started on localhost and the engines are started on
57 57 remote nodes using :command:`ssh`.
58 58
59 59 .. note::
60 60
61 61 It is also possible for advanced users to add support to
62 62 :command:`ipcluster` for starting controllers and engines using other
63 63 methods (like Sun's Grid Engine for example).
64 64
65 65 .. note::
66 66
67 67 Currently :command:`ipcluster` requires that the
68 68 :file:`~/.ipython/security` directory live on a shared filesystem that is
69 69 seen by both the controller and engines. If you don't have a shared file
70 70 system you will need to use :command:`ipcontroller` and
71 71 :command:`ipengine` directly. This constraint can be relaxed if you are
72 72 using the :command:`ssh` method to start the cluster.
73 73
74 74 Underneath the hood, :command:`ipcluster` just uses :command:`ipcontroller`
75 75 and :command:`ipengine` to perform the steps described above.
76 76
77 77 Using :command:`ipcluster` in local mode
78 78 ----------------------------------------
79 79
80 80 To start one controller and 4 engines on localhost, just do::
81 81
82 82 $ ipcluster local -n 4
83 83
84 84 To see other command line options for the local mode, do::
85 85
86 86 $ ipcluster local -h
87 87
88 88 Using :command:`ipcluster` in mpiexec/mpirun mode
89 89 -------------------------------------------------
90 90
91 91 The mpiexec/mpirun mode is useful if you:
92 92
93 93 1. Have MPI installed.
94 94 2. Your systems are configured to use the :command:`mpiexec` or
95 95 :command:`mpirun` commands to start MPI processes.
96 96
97 97 .. note::
98 98
99 99 The preferred command to use is :command:`mpiexec`. However, we also
100 100 support :command:`mpirun` for backwards compatibility. The underlying
101 101 logic used is exactly the same, the only difference being the name of the
102 102 command line program that is called.
103 103
104 104 If these are satisfied, you can start an IPython cluster using::
105 105
106 106 $ ipcluster mpiexec -n 4
107 107
108 108 This does the following:
109 109
110 110 1. Starts the IPython controller on current host.
111 111 2. Uses :command:`mpiexec` to start 4 engines.
112 112
113 113 On newer MPI implementations (such as OpenMPI), this will work even if you don't make any calls to MPI or call :func:`MPI_Init`. However, older MPI implementations actually require each process to call :func:`MPI_Init` upon starting. The easiest way of having this done is to install the mpi4py [mpi4py]_ package and then call ipcluster with the ``--mpi`` option::
114 114
115 115 $ ipcluster mpiexec -n 4 --mpi=mpi4py
116 116
117 117 Unfortunately, even this won't work for some MPI implementations. If you are having problems with this, you will likely have to use a custom Python executable that itself calls :func:`MPI_Init` at the appropriate time. Fortunately, mpi4py comes with such a custom Python executable that is easy to install and use. However, this custom Python executable approach will not work with :command:`ipcluster` currently.
118 118
119 119 Additional command line options for this mode can be found by doing::
120 120
121 121 $ ipcluster mpiexec -h
122 122
123 123 More details on using MPI with IPython can be found :ref:`here <parallelmpi>`.
124 124
125 125
126 126 Using :command:`ipcluster` in PBS mode
127 127 --------------------------------------
128 128
129 129 The PBS mode uses the Portable Batch System [PBS]_ to start the engines. To use this mode, you first need to create a PBS script template that will be used to start the engines. Here is a sample PBS script template:
130 130
131 131 .. sourcecode:: bash
132 132
133 133 #PBS -N ipython
134 134 #PBS -j oe
135 135 #PBS -l walltime=00:10:00
136 136 #PBS -l nodes=${n/4}:ppn=4
137 137 #PBS -q parallel
138 138
139 139 cd $$PBS_O_WORKDIR
140 140 export PATH=$$HOME/usr/local/bin
141 141 export PYTHONPATH=$$HOME/usr/local/lib/python2.4/site-packages
142 142 /usr/local/bin/mpiexec -n ${n} ipengine --logfile=$$PBS_O_WORKDIR/ipengine
143 143
144 144 There are a few important points about this template:
145 145
146 146 1. This template will be rendered at runtime using IPython's :mod:`Itpl`
147 147 template engine.
148 148
149 149 2. Instead of putting in the actual number of engines, use the notation
150 150 ``${n}`` to indicate the number of engines to be started. You can also use
151 151 expressions like ``${n/4}`` in the template to indicate the number of
152 152 nodes.
153 153
154 154 3. Because ``$`` is a special character used by the template engine, you must
155 155 escape any ``$`` by using ``$$``. This is important when referring to
156 156 environment variables in the template.
157 157
158 158 4. Any options to :command:`ipengine` should be given in the batch script
159 159 template.
160 160
161 161 5. Depending on the configuration of you system, you may have to set
162 162 environment variables in the script template.
163 163
164 164 Once you have created such a script, save it with a name like :file:`pbs.template`. Now you are ready to start your job::
165 165
166 166 $ ipcluster pbs -n 128 --pbs-script=pbs.template
167 167
168 168 Additional command line options for this mode can be found by doing::
169 169
170 170 $ ipcluster pbs -h
171 171
172 172 Using :command:`ipcluster` in SSH mode
173 173 --------------------------------------
174 174
175 175 The SSH mode uses :command:`ssh` to execute :command:`ipengine` on remote
176 176 nodes and the :command:`ipcontroller` on localhost.
177 177
178 178 When using this mode it is highly recommended that you have set up SSH keys and are using ssh-agent [SSH]_ for password-less logins.
179 179
180 180 To use this mode you need a python file describing the cluster, here is an example of such a "clusterfile":
181 181
182 182 .. sourcecode:: python
183 183
184 184 send_furl = True
185 185 engines = { 'host1.example.com' : 2,
186 186 'host2.example.com' : 5,
187 187 'host3.example.com' : 1,
188 188 'host4.example.com' : 8 }
189 189
190 190 Since this is a regular python file usual python syntax applies. Things to note:
191 191
192 192 * The `engines` dict, where each key is the host we want to run engines on and
193 193 the value is the number of engines to run on that host.
194 194 * send_furl can either be `True` or `False`, if `True` it will copy over the
195 195 furl needed for :command:`ipengine` to each host.
196 196
197 197 The ``--clusterfile`` command line option lets you specify the file to use for
198 198 the cluster definition. Once you have your cluster file and can
199 199 :command:`ssh` into the remote hosts without a password you are ready to
200 200 start your cluster like so:
201 201
202 202 .. sourcecode:: bash
203 203
204 204 $ ipcluster ssh --clusterfile /path/to/my/clusterfile.py
205 205
206 206
207 207 Two helper shell scripts are used to start and stop :command:`ipengine` on remote hosts:
208 208
209 209 * sshx.sh
210 210 * engine_killer.sh
211 211
212 212 Defaults for both of these are contained in the source code for :command:`ipcluster`. The default scripts are written to a local file in a temp directory and then copied to a temp directory on the remote host and executed from there. On most Unix, Linux and OS X systems this is /tmp.
213 213
214 214 The default sshx.sh is the following:
215 215
216 216 .. sourcecode:: bash
217 217
218 218 #!/bin/sh
219 219 "$@" &> /dev/null &
220 220 echo $!
221 221
222 222 If you want to use a custom sshx.sh script you need to use the ``--sshx``
223 223 option and specify the file to use. Using a custom sshx.sh file could be
224 224 helpful when you need to setup the environment on the remote host before
225 225 executing :command:`ipengine`.
226 226
227 227 For a detailed options list:
228 228
229 229 .. sourcecode:: bash
230 230
231 231 $ ipcluster ssh -h
232 232
233 233 Current limitations of the SSH mode of :command:`ipcluster` are:
234 234
235 235 * Untested on Windows. Would require a working :command:`ssh` on Windows.
236 236 Also, we are using shell scripts to setup and execute commands on remote
237 237 hosts.
238 238 * :command:`ipcontroller` is started on localhost, with no option to start it
239 239 on a remote node.
240 240
241 241 Using the :command:`ipcontroller` and :command:`ipengine` commands
242 242 ==================================================================
243 243
244 244 It is also possible to use the :command:`ipcontroller` and :command:`ipengine` commands to start your controller and engines. This approach gives you full control over all aspects of the startup process.
245 245
246 246 Starting the controller and engine on your local machine
247 247 --------------------------------------------------------
248 248
249 249 To use :command:`ipcontroller` and :command:`ipengine` to start things on your
250 250 local machine, do the following.
251 251
252 252 First start the controller::
253 253
254 254 $ ipcontroller
255 255
256 256 Next, start however many instances of the engine you want using (repeatedly) the command::
257 257
258 258 $ ipengine
259 259
260 260 The engines should start and automatically connect to the controller using the FURL files in :file:`~/.ipython/security`. You are now ready to use the controller and engines from IPython.
261 261
262 262 .. warning::
263 263
264 264 The order of the above operations is very important. You *must*
265 265 start the controller before the engines, since the engines connect
266 266 to the controller as they get started.
267 267
268 268 .. note::
269 269
270 270 On some platforms (OS X), to put the controller and engine into the
271 271 background you may need to give these commands in the form ``(ipcontroller
272 272 &)`` and ``(ipengine &)`` (with the parentheses) for them to work
273 273 properly.
274 274
275 275 Starting the controller and engines on different hosts
276 276 ------------------------------------------------------
277 277
278 278 When the controller and engines are running on different hosts, things are
279 279 slightly more complicated, but the underlying ideas are the same:
280 280
281 281 1. Start the controller on a host using :command:`ipcontroller`.
282 282 2. Copy :file:`ipcontroller-engine.furl` from :file:`~/.ipython/security` on the controller's host to the host where the engines will run.
283 283 3. Use :command:`ipengine` on the engine's hosts to start the engines.
284 284
285 285 The only thing you have to be careful of is to tell :command:`ipengine` where the :file:`ipcontroller-engine.furl` file is located. There are two ways you can do this:
286 286
287 287 * Put :file:`ipcontroller-engine.furl` in the :file:`~/.ipython/security`
288 288 directory on the engine's host, where it will be found automatically.
289 289 * Call :command:`ipengine` with the ``--furl-file=full_path_to_the_file``
290 290 flag.
291 291
292 292 The ``--furl-file`` flag works like this::
293 293
294 294 $ ipengine --furl-file=/path/to/my/ipcontroller-engine.furl
295 295
296 296 .. note::
297 297
298 298 If the controller's and engine's hosts all have a shared file system
299 299 (:file:`~/.ipython/security` is the same on all of them), then things
300 300 will just work!
301 301
302 302 Make FURL files persistent
303 303 ---------------------------
304 304
305 305 At first glance it may seem that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
306 306
307 307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
308 308
309 309 $ ipcontroller -r --client-port=10101 --engine-port=10102
310 310
311 These options also work with all of the various modes of
312 :command:`ipcluster`::
313
314 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
315
311 316 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines as many times as you want in the future, just make sure to tell the controller to use the *same* ports.
312 317
313 318 .. note::
314 319
315 320 You may ask the question: what ports does the controller listen on if you
316 321 don't tell it to use specific ones? The default is to use high random port
317 322 numbers. We do this for two reasons: i) to increase security through
318 323 obscurity and ii) to allow multiple controllers on a given host to start and
319 324 automatically use different ports.
320 325
321 326 Log files
322 327 ---------
323 328
324 329 All of the components of IPython have log files associated with them.
325 330 These log files can be extremely useful in debugging problems with
326 331 IPython and can be found in the directory :file:`~/.ipython/log`. Sending
327 332 the log files to us will often help us to debug any problems.
328 333
329 334
330 335 .. [PBS] Portable Batch System. http://www.openpbs.org/
331 336 .. [SSH] SSH-Agent http://en.wikipedia.org/wiki/Ssh-agent
General Comments 0
You need to be logged in to leave comments. Login now