##// END OF EJS Templates
add main_lsf for experimental LSF support
Justin Riley -
Show More
@@ -1,911 +1,948 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.iplib import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # General process handling code
59 59 #-----------------------------------------------------------------------------
60 60
61 61
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher is used in an invalid state
    (e.g. started twice, or stopped after it already exited)."""
    pass
64 64
class UnknownStatus(Exception):
    """Raised when Twisted reports a process-exit status that is neither
    ProcessDone nor ProcessTerminated."""
    pass
67 67
class LauncherProcessProtocol(ProcessProtocol):
    """Relay child-process lifecycle events from Twisted to a ProcessLauncher.

    Start/stop notifications are forwarded to the launcher's deferreds;
    child stdout/stderr are forwarded to the Twisted log.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive: hand its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit is reported as exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(reason, ProcessTerminated):
            # Abnormal exit: pass along the full exit information.
            info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status
            }
            self.process_launcher.fire_stop_deferred(info)
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Child stdout goes to the normal log.
        log.msg(data)

    def errReceived(self, data):
        # Child stderr goes to the error log.
        log.err(data)
97 97
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args[0] is the executable; the full list (including the
        # executable) is passed as argv to reactor.spawnProcess, which
        # expects argv[0] to be the program name.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Reset all per-run state; the launcher is single-shot
        # ('before' -> 'running' -> 'after', never back).
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        # Called by LauncherProcessProtocol.connectionMade once the child
        # has actually been spawned.
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred firing with the child pid.

        Calling start() more than once yields a failed deferred with
        ProcessStateError.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires with the exit status when the
        process ends; fails with ProcessStateError if it already has."""
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        # Called by LauncherProcessProtocol.processEnded; exit_code is 0 for
        # a clean exit or a dict of exit information otherwise.
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        # Silently ignored unless the process is currently running.
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT now, then SIGKILL after `delay` seconds as a fallback."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177 177
178 178
179 179 #-----------------------------------------------------------------------------
180 180 # Code for launching controller and engines
181 181 #-----------------------------------------------------------------------------
182 182
183 183
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipcontroller script."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed the same
            # way or in the same location on Windows, so locate it through
            # its module and run it with the current interpreter.  The -u
            # flag forces unbuffered output, which Twisted needs on Win32.
            from IPython.kernel.scripts import ipcontroller
            script = ipcontroller.__file__.replace('.pyc', '.py')
            cmd = [sys.executable, '-u', script]
        else:
            cmd = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)
        ProcessLauncher.__init__(self, cmd)
204 204
205 205
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher preconfigured to run the ipengine script."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed the same way
            # or in the same location on Windows, so locate it through its
            # module and run it with the current interpreter.  The -u flag
            # forces unbuffered output, which Twisted needs on Win32.
            from IPython.kernel.scripts import ipengine
            script = ipengine.__file__.replace('.pyc', '.py')
            cmd = [sys.executable, '-u', script]
        else:
            cmd = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            cmd.extend(extra_args)
        ProcessLauncher.__init__(self, cmd)
226 226
227 227
class LocalEngineSet(object):
    """Start, signal, and stop a set of ipengine processes on localhost."""

    def __init__(self, extra_args=None):
        # extra_args is forwarded verbatim to every EngineLauncher.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start n engines; returns a deferred firing with their pids."""
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        # Gather all start deferreds so callers get one aggregate result.
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # r is the list of engine pids.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        # r is the list of per-engine exit statuses.
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send `sig` to every engine; returns a deferred for all exits."""
        dlist = []
        for el in self.launchers:
            # Register the stop deferred before signalling to avoid a race.
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """SIGINT every engine, then SIGKILL any survivor after `delay`s."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
273 273
class BatchEngineSet(object):
    """Launch a set of engines as a job array through a batch scheduler.

    Subclasses fill in the class attributes below (and usually a default
    script template); see PBSEngineSet/SGEEngineSet/LSFEngineSet.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''                 # human-readable scheduler name for log messages
    submit_command = ''       # e.g. 'qsub'
    delete_command = ''       # e.g. 'qdel'
    script_param_prefix = ''  # scheduler directive prefix, e.g. '#PBS'
    job_id_regexp = ''        # extracts the job id from the submit output
    job_array_regexp = ''     # detects an existing job-array directive
    default_template = ''     # fallback script with one %d slot for n

    def __init__(self, template_file, **kwargs):
        """Remember the user script path; '' means use default_template."""
        self.template_file = template_file

    def parse_job_id(self, output):
        """Extract, store, and return the job id from the submit output.

        Raises an Exception if no job id can be found.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise Exception("job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        """Print a submit failure's traceback and re-raise it."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a job array of n engines; returns a deferred job id."""
        log.msg("starting %d engines" % n)
        # The submitted script is staged in a NamedTemporaryFile kept on
        # self so it stays alive until the submit command has read it.
        self._temp_file = tempfile.NamedTemporaryFile()
        regex = re.compile(self.job_array_regexp)
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            # Fix: close the template file handle instead of leaking it.
            template = open(self.template_file, 'r')
            try:
                contents = template.read()
            finally:
                template.close()
            if not regex.search(contents):
                # User script lacks a job-array directive; prepend one.
                log.msg("adding job array settings to %s script" % self.name)
                contents = ("%s -t 1-%d\n" % (self.script_param_prefix, n)) + contents
            self._temp_file.write(contents)
        else:
            log.msg("using default ipengine %s script: \n%s" %
                    (self.name, (self.default_template % n)))
            self._temp_file.write(self.default_template % n)
        # Both branches submit the staged copy, flushed to disk first.
        self.template_file = self._temp_file.name
        self._temp_file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job; requires a prior successful start()."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
331 331
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted through PBS (qsub/qdel) as a -t job array."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    script_param_prefix = "#PBS"
    # Raw strings so regex escapes like \d are not treated as (invalid)
    # string escapes.
    job_id_regexp = r'\d+'
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    default_template="""#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
346 346
class SGEEngineSet(PBSEngineSet):
    """Engine set for SGE; reuses qsub/qdel from PBSEngineSet but with
    SGE's '#$' directive prefix and $SGE_TASK_ID array variable."""

    name = 'SGE'
    script_param_prefix = "#$"
    # Raw string so regex escapes like \$ and \d are not treated as
    # (invalid) string escapes.
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    default_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
358 358
class LSFEngineSet(PBSEngineSet):
    """Experimental engine set submitted through LSF (bsub/bkill)."""

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    script_param_prefix = "#BSUB"
    # Raw string so regex escapes like \w and \d are not treated as
    # (invalid) string escapes.
    job_array_regexp = r'#BSUB[ \t]+\w+\[\d+-\d+\]'
    # NOTE(review): bsub typically only honors #BSUB directives when the
    # script is fed on stdin ('bsub < script'), and job arrays are usually
    # declared via '-J name[1-n]'.  BatchEngineSet.start() invokes
    # 'bsub <scriptfile>' as an argument -- confirm this template is
    # actually picked up by LSF.
    default_template="""#BSUB ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
370 370
# Shell helper template pushed to remote hosts: runs its arguments detached
# in the background (output discarded) and prints the background pid.
# NOTE(review): '&>' is a bash-ism; a strict POSIX /bin/sh parses it
# differently -- confirm remote /bin/sh is bash-compatible.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell template that TERM-kills every ipengine owned by the current user
# on the remote host (the '[i]pengine' pattern keeps grep from matching
# its own process entry).
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
379 379
class SSHEngineSet(object):
    """Start and kill ipengine processes on remote hosts via ssh/scp."""

    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            # NOTE(review): assumes $USER is set in the environment.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = []
        for host, count in self.engine_hosts.items():
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally push the engine FURL first, then the sshx helper,
        # then launch the engines.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        """Copy the local engine FURL file to the remote host."""
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        # Expand '~' locally; the remote side resolves its own paths.
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        """Copy the sshx helper script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # Fix: removed a stray bare 'print' debug statement that emitted a
        # blank line before every copy.
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        """Launch `count` engines on `hostname` through the sshx helper."""
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill every remote ipengine on every configured host."""
        dlist = []
        for host in self.engine_hosts:
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Push the killer script, then execute it remotely.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        """Copy the engine killer script to the remote host."""
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        """Run the engine killer script on the remote host."""
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Errback helper: just log the failure.
        log.msg(r)
504 504
505 505 #-----------------------------------------------------------------------------
506 506 # Main functions for the different types of clusters
507 507 #-----------------------------------------------------------------------------
508 508
509 509 # TODO:
510 510 # The logic in these codes should be moved into classes like LocalCluster
511 511 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
512 512 # The main functions should then just parse the command line arguments, create
513 513 # the appropriate class and call a 'start' method.
514 514
515 515
def check_security(args, cont_args):
    """Check to see if we should run with SSL support.

    Returns False (after stopping the reactor) when security is requested
    but pyOpenSSL is unavailable; otherwise appends the -x/-y flags to
    cont_args and returns True.
    """
    security_wanted = not (args.x and args.y)
    if security_wanted and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
529 529
530 530
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    When -r is given, both --client-port and --engine-port must be set;
    otherwise the reactor is stopped and False is returned.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
544 544
545 545
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    reactor.stop()
550 550
551 551
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to create its FURL file, then start engines.

    When not reusing FURL files, any stale file is removed first so that
    we wait for the file written by the new controller.
    """
    if not reuse:
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    # Poll for the FURL file (0.2s interval, up to 50 tries).
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
562 562
563 563
def main_local(args):
    """Start a controller and args.n engines, all on the local machine."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Engine logfiles are tagged with the controller pid so several
        # clusters can share a log directory.
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    # Engines start only after the controller has written its FURL file.
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
597 597
598 598
def main_mpi(args):
    """Start a controller locally and engines through mpirun/mpiexec."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # args.cmd is 'mpirun' or 'mpiexec', set by the subparser defaults.
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        # One launcher runs the whole mpirun command, which in turn
        # spawns all of the engines.
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
636 636
637 637
def main_pbs(args):
    """Start a controller locally and engines on a PBS cluster."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail early if the user pointed us at a nonexistent script.
    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
666 670
def main_sge(args):
    """Start a controller locally and engines on an SGE cluster."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail early if the user pointed us at a nonexistent script.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript)
        def shutdown(signum, frame):
            # Consistency fix: capitalize the scheduler name like the
            # PBS and LSF shutdown messages do.
            log.msg('Stopping SGE cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
699 703
def main_lsf(args):
    """Start a controller locally and engines on an LSF cluster
    (experimental)."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail early if the user pointed us at a nonexistent script.
    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript)
        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = lsf_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
736
700 737
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    # Execute the user's cluster description into a plain dict namespace.
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # Idiom fix: use 'in' rather than the deprecated dict.has_key().
    if 'send_furl' not in clusterfile:
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
743 780
744 781
def get_args():
    """Build the ipcluster argument parser tree and parse sys.argv.

    A base parser carries the options shared by every cluster type; each
    cluster type gets its own subparser whose ``func`` default selects
    the matching main_* entry point.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )
    parser_lsf.add_argument(
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
903 940
def main():
    """Parse arguments, schedule the selected cluster starter, and run
    the Twisted reactor until shutdown."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now