add experimental LSF support (needs testing)
Justin Riley
@@ -1,885 +1,911 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.iplib import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # General process handling code
59 59 #-----------------------------------------------------------------------------
60 60
61 61
62 62 class ProcessStateError(Exception):
63 63 pass
64 64
65 65 class UnknownStatus(Exception):
66 66 pass
67 67
68 68 class LauncherProcessProtocol(ProcessProtocol):
69 69 """
70 70 A ProcessProtocol to go with the ProcessLauncher.
71 71 """
72 72 def __init__(self, process_launcher):
73 73 self.process_launcher = process_launcher
74 74
75 75 def connectionMade(self):
76 76 self.process_launcher.fire_start_deferred(self.transport.pid)
77 77
78 78 def processEnded(self, status):
79 79 value = status.value
80 80 if isinstance(value, ProcessDone):
81 81 self.process_launcher.fire_stop_deferred(0)
82 82 elif isinstance(value, ProcessTerminated):
83 83 self.process_launcher.fire_stop_deferred(
84 84 {'exit_code':value.exitCode,
85 85 'signal':value.signal,
86 86 'status':value.status
87 87 }
88 88 )
89 89 else:
90 90 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
91 91
92 92 def outReceived(self, data):
93 93 log.msg(data)
94 94
95 95 def errReceived(self, data):
96 96 log.err(data)
97 97
98 98 class ProcessLauncher(object):
99 99 """
100 100 Start and stop an external process in an asynchronous manner.
101 101
102 102 Currently this uses deferreds to notify other parties of process state
103 103 changes. This is an awkward design that should eventually be replaced
104 104 with a formal NotificationCenter.
105 105 """
106 106 def __init__(self, cmd_and_args):
107 107 self.cmd = cmd_and_args[0]
108 108 self.args = cmd_and_args
109 109 self._reset()
110 110
111 111 def _reset(self):
112 112 self.process_protocol = None
113 113 self.pid = None
114 114 self.start_deferred = None
115 115 self.stop_deferreds = []
116 116 self.state = 'before' # before, running, or after
117 117
118 118 @property
119 119 def running(self):
120 120 if self.state == 'running':
121 121 return True
122 122 else:
123 123 return False
124 124
125 125 def fire_start_deferred(self, pid):
126 126 self.pid = pid
127 127 self.state = 'running'
128 128 log.msg('Process %r has started with pid=%i' % (self.args, pid))
129 129 self.start_deferred.callback(pid)
130 130
131 131 def start(self):
132 132 if self.state == 'before':
133 133 self.process_protocol = LauncherProcessProtocol(self)
134 134 self.start_deferred = defer.Deferred()
135 135 self.process_transport = reactor.spawnProcess(
136 136 self.process_protocol,
137 137 self.cmd,
138 138 self.args,
139 139 env=os.environ
140 140 )
141 141 return self.start_deferred
142 142 else:
143 143 s = 'the process has already been started and has state: %r' % \
144 144 self.state
145 145 return defer.fail(ProcessStateError(s))
146 146
147 147 def get_stop_deferred(self):
148 148 if self.state == 'running' or self.state == 'before':
149 149 d = defer.Deferred()
150 150 self.stop_deferreds.append(d)
151 151 return d
152 152 else:
153 153 s = 'this process is already complete'
154 154 return defer.fail(ProcessStateError(s))
155 155
156 156 def fire_stop_deferred(self, exit_code):
157 157 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
158 158 self.state = 'after'
159 159 for d in self.stop_deferreds:
160 160 d.callback(exit_code)
161 161
162 162 def signal(self, sig):
163 163 """
164 164 Send a signal to the process.
165 165
166 166 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
167 167 """
168 168 if self.state == 'running':
169 169 self.process_transport.signalProcess(sig)
170 170
171 171 # def __del__(self):
172 172 # self.signal('KILL')
173 173
174 174 def interrupt_then_kill(self, delay=1.0):
175 175 self.signal('INT')
176 176 reactor.callLater(delay, self.signal, 'KILL')
177 177
178 178
179 179 #-----------------------------------------------------------------------------
180 180 # Code for launching controller and engines
181 181 #-----------------------------------------------------------------------------
182 182
183 183
184 184 class ControllerLauncher(ProcessLauncher):
185 185
186 186 def __init__(self, extra_args=None):
187 187 if sys.platform == 'win32':
188 188 # This logic is needed because the ipcontroller script doesn't
189 189 # always get installed in the same way or in the same location.
190 190 from IPython.kernel.scripts import ipcontroller
191 191 script_location = ipcontroller.__file__.replace('.pyc', '.py')
192 192 # The -u option here turns on unbuffered output, which is required
193 193 # on Win32 to prevent weird conflicts and problems with Twisted.
194 194 # Also, use sys.executable to make sure we are picking up the
195 195 # right python exe.
196 196 args = [sys.executable, '-u', script_location]
197 197 else:
198 198 args = ['ipcontroller']
199 199 self.extra_args = extra_args
200 200 if extra_args is not None:
201 201 args.extend(extra_args)
202 202
203 203 ProcessLauncher.__init__(self, args)
204 204
205 205
206 206 class EngineLauncher(ProcessLauncher):
207 207
208 208 def __init__(self, extra_args=None):
209 209 if sys.platform == 'win32':
210 210 # This logic is needed because the ipengine script doesn't
211 211 # always get installed in the same way or in the same location.
212 212 from IPython.kernel.scripts import ipengine
213 213 script_location = ipengine.__file__.replace('.pyc', '.py')
214 214 # The -u option here turns on unbuffered output, which is required
215 215 # on Win32 to prevent weird conflicts and problems with Twisted.
216 216 # Also, use sys.executable to make sure we are picking up the
217 217 # right python exe.
218 218 args = [sys.executable, '-u', script_location]
219 219 else:
220 220 args = ['ipengine']
221 221 self.extra_args = extra_args
222 222 if extra_args is not None:
223 223 args.extend(extra_args)
224 224
225 225 ProcessLauncher.__init__(self, args)
226 226
227 227
228 228 class LocalEngineSet(object):
229 229
230 230 def __init__(self, extra_args=None):
231 231 self.extra_args = extra_args
232 232 self.launchers = []
233 233
234 234 def start(self, n):
235 235 dlist = []
236 236 for i in range(n):
237 237 print "starting engine:", i
238 238 el = EngineLauncher(extra_args=self.extra_args)
239 239 d = el.start()
240 240 self.launchers.append(el)
241 241 dlist.append(d)
242 242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 243 dfinal.addCallback(self._handle_start)
244 244 return dfinal
245 245
246 246 def _handle_start(self, r):
247 247 log.msg('Engines started with pids: %r' % r)
248 248 return r
249 249
250 250 def _handle_stop(self, r):
251 251 log.msg('Engines received signal: %r' % r)
252 252 return r
253 253
254 254 def signal(self, sig):
255 255 dlist = []
256 256 for el in self.launchers:
257 257 d = el.get_stop_deferred()
258 258 dlist.append(d)
259 259 el.signal(sig)
260 260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 261 dfinal.addCallback(self._handle_stop)
262 262 return dfinal
263 263
264 264 def interrupt_then_kill(self, delay=1.0):
265 265 dlist = []
266 266 for el in self.launchers:
267 267 d = el.get_stop_deferred()
268 268 dlist.append(d)
269 269 el.interrupt_then_kill(delay)
270 270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 271 dfinal.addCallback(self._handle_stop)
272 272 return dfinal
273 273
274 274 class BatchEngineSet(object):
275 275
276 276 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
277 277 name = ''
278 278 submit_command = ''
279 279 delete_command = ''
280 280 script_param_prefix = ''
281 281 job_id_regexp = ''
282 282 job_array_regexp = ''
283 283 default_template = ''
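    # How start()/kill() below use these: submit_command is run with the
    # (possibly rewritten) script as its argument and job_id_regexp is
    # searched against its output; job_array_regexp tests whether a
    # user-supplied script already declares a job array, with
    # script_param_prefix used to prepend "-t 1-n" when it does not;
    # default_template is used when no template file is given; kill() runs
    # delete_command with the captured job id.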
284 284
285 285 def __init__(self, template_file, **kwargs):
286 286 self.template_file = template_file
287 287
288 288 def parse_job_id(self, output):
289 289 m = re.search(self.job_id_regexp, output)
290 290 if m is not None:
291 291 job_id = m.group()
292 292 else:
293 293 raise Exception("job id couldn't be determined: %s" % output)
294 294 self.job_id = job_id
295 295 log.msg('Job started with job id: %r' % job_id)
296 296 return job_id
297 297
298 298 def handle_error(self, f):
299 299 f.printTraceback()
300 300 f.raiseException()
301 301
302 302 def start(self, n):
303 303 log.msg("starting %d engines" % n)
304 304 self._temp_file = tempfile.NamedTemporaryFile()
305 305 regex = re.compile(self.job_array_regexp)
306 306 if self.template_file:
307 307 log.msg("Using %s script %s" % (self.name, self.template_file))
308 308 contents = open(self.template_file, 'r').read()
309 309 if not regex.search(contents):
310 310 log.msg("adding job array settings to %s script" % self.name)
311 311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
312 312 self._temp_file.write(contents)
313 313 self.template_file = self._temp_file.name
314 314 else:
315 315 log.msg("using default ipengine %s script: \n%s" %
316 316 (self.name, (self.default_template % n)))
317 317 self._temp_file.file.write(self.default_template % n)
318 318 self.template_file = self._temp_file.name
319 319 self._temp_file.file.flush()
320 320 d = getProcessOutput(self.submit_command,
321 321 [self.template_file],
322 322 env=os.environ)
323 323 d.addCallback(self.parse_job_id)
324 324 d.addErrback(self.handle_error)
325 325 return d
326 326
327 327 def kill(self):
328 328 d = getProcessOutput(self.delete_command,
329 329 [self.job_id],env=os.environ)
330 330 return d
331 331
332 332 class PBSEngineSet(BatchEngineSet):
333 333
334 334 name = 'PBS'
335 335 submit_command = 'qsub'
336 336 delete_command = 'qdel'
337 337 script_param_prefix = "#PBS"
338 338 job_id_regexp = '\d+'
339 339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
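    # The default template below submits a job array of n tasks; each task
    # derives a zero-based engine id from $PBS_ARRAYID and starts one
    # ipengine with its own logfile.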
340 340 default_template="""#PBS -V
341 341 #PBS -t 1-%d
342 342 #PBS -N ipengine
343 343 eid=$(($PBS_ARRAYID - 1))
344 344 ipengine --logfile=ipengine${eid}.log
345 345 """
346 346
347 347 class SGEEngineSet(PBSEngineSet):
348 348
349 349 name = 'SGE'
350 350 script_param_prefix = "#$"
351 351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
352 352 default_template="""#$ -V
353 353 #$ -t 1-%d
354 354 #$ -N ipengine
355 355 eid=$(($SGE_TASK_ID - 1))
356 356 ipengine --logfile=ipengine${eid}.log
357 357 """
358 358
359 class LSFEngineSet(PBSEngineSet):
360
361 name = 'LSF'
362 submit_command = 'bsub'
363 delete_command = 'bkill'
364 script_param_prefix = "#BSUB"
365 job_array_regexp = '#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]'
366 default_template="""#BSUB -J ipengine[1-%d]
367 eid=$(($LSB_JOBINDEX - 1))
368 ipengine --logfile=ipengine${eid}.log
369 """
370
359 371 sshx_template="""#!/bin/sh
360 372 "$@" &> /dev/null &
361 373 echo $!
362 374 """
363 375
364 376 engine_killer_template="""#!/bin/sh
365 377 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
366 378 """
367 379
368 380 class SSHEngineSet(object):
369 381 sshx_template=sshx_template
370 382 engine_killer_template=engine_killer_template
371 383
372 384 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
373 385 """Start a controller on localhost and engines using ssh.
374 386
375 387 The engine_hosts argument is a dict with hostnames as keys and
376 388 the number of engines (int) as values. sshx is the name of a local
377 389 file that will be used to run remote commands. This file is used
378 390 to set up the environment properly.
379 391 """
380 392
381 393 self.temp_dir = tempfile.gettempdir()
382 394 if sshx is not None:
383 395 self.sshx = sshx
384 396 else:
385 397 # Write the sshx.sh file locally from our template.
386 398 self.sshx = os.path.join(
387 399 self.temp_dir,
388 400 '%s-main-sshx.sh' % os.environ['USER']
389 401 )
390 402 f = open(self.sshx, 'w')
391 403 f.writelines(self.sshx_template)
392 404 f.close()
393 405 self.engine_command = ipengine
394 406 self.engine_hosts = engine_hosts
395 407 # Write the engine killer script file locally from our template.
396 408 self.engine_killer = os.path.join(
397 409 self.temp_dir,
398 410 '%s-local-engine_killer.sh' % os.environ['USER']
399 411 )
400 412 f = open(self.engine_killer, 'w')
401 413 f.writelines(self.engine_killer_template)
402 414 f.close()
403 415
404 416 def start(self, send_furl=False):
405 417 dlist = []
406 418 for host in self.engine_hosts.keys():
407 419 count = self.engine_hosts[host]
408 420 d = self._start(host, count, send_furl)
409 421 dlist.append(d)
410 422 return gatherBoth(dlist, consumeErrors=True)
411 423
412 424 def _start(self, hostname, count=1, send_furl=False):
413 425 if send_furl:
414 426 d = self._scp_furl(hostname)
415 427 else:
416 428 d = defer.succeed(None)
417 429 d.addCallback(lambda r: self._scp_sshx(hostname))
418 430 d.addCallback(lambda r: self._ssh_engine(hostname, count))
419 431 return d
420 432
421 433 def _scp_furl(self, hostname):
422 434 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
423 435 cmd_list = scp_cmd.split()
424 436 cmd_list[1] = os.path.expanduser(cmd_list[1])
425 437 log.msg('Copying furl file: %s' % scp_cmd)
426 438 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
427 439 return d
428 440
429 441 def _scp_sshx(self, hostname):
430 442 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
431 443 self.sshx, hostname,
432 444 self.temp_dir, os.environ['USER']
433 445 )
435 447 log.msg("Copying sshx: %s" % scp_cmd)
436 448 sshx_scp = scp_cmd.split()
437 449 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
438 450 return d
439 451
440 452 def _ssh_engine(self, hostname, count):
441 453 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
442 454 hostname, self.temp_dir,
443 455 os.environ['USER'], self.engine_command
444 456 )
445 457 cmds = exec_engine.split()
446 458 dlist = []
447 459 log.msg("about to start engines...")
448 460 for i in range(count):
449 461 log.msg('Starting engines: %s' % exec_engine)
450 462 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
451 463 dlist.append(d)
452 464 return gatherBoth(dlist, consumeErrors=True)
453 465
454 466 def kill(self):
455 467 dlist = []
456 468 for host in self.engine_hosts.keys():
457 469 d = self._killall(host)
458 470 dlist.append(d)
459 471 return gatherBoth(dlist, consumeErrors=True)
460 472
461 473 def _killall(self, hostname):
462 474 d = self._scp_engine_killer(hostname)
463 475 d.addCallback(lambda r: self._ssh_kill(hostname))
464 476 # d.addErrback(self._exec_err)
465 477 return d
466 478
467 479 def _scp_engine_killer(self, hostname):
468 480 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
469 481 self.engine_killer,
470 482 hostname,
471 483 self.temp_dir,
472 484 os.environ['USER']
473 485 )
474 486 cmds = scp_cmd.split()
475 487 log.msg('Copying engine_killer: %s' % scp_cmd)
476 488 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
477 489 return d
478 490
479 491 def _ssh_kill(self, hostname):
480 492 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
481 493 hostname,
482 494 self.temp_dir,
483 495 os.environ['USER']
484 496 )
485 497 log.msg('Killing engine: %s' % kill_cmd)
486 498 kill_cmd = kill_cmd.split()
487 499 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
488 500 return d
489 501
490 502 def _exec_err(self, r):
491 503 log.msg(r)
492 504
493 505 #-----------------------------------------------------------------------------
494 506 # Main functions for the different types of clusters
495 507 #-----------------------------------------------------------------------------
496 508
497 509 # TODO:
498 510 # The logic in these functions should be moved into classes like LocalCluster,
499 511 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
500 512 # The main functions should then just parse the command line arguments, create
501 513 # the appropriate class and call a 'start' method.
502 514
503 515
504 516 def check_security(args, cont_args):
505 517 """Check to see if we should run with SSL support."""
506 518 if (not args.x or not args.y) and not have_crypto:
507 519 log.err("""
508 520 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
509 521 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
510 522 reactor.stop()
511 523 return False
512 524 if args.x:
513 525 cont_args.append('-x')
514 526 if args.y:
515 527 cont_args.append('-y')
516 528 return True
517 529
518 530
519 531 def check_reuse(args, cont_args):
520 532 """Check to see if we should try to resuse FURL files."""
521 533 if args.r:
522 534 cont_args.append('-r')
523 535 if args.client_port == 0 or args.engine_port == 0:
524 536 log.err("""
525 537 To reuse FURL files, you must also set the client and engine ports using
526 538 the --client-port and --engine-port options.""")
527 539 reactor.stop()
528 540 return False
529 541 cont_args.append('--client-port=%i' % args.client_port)
530 542 cont_args.append('--engine-port=%i' % args.engine_port)
531 543 return True
532 544
533 545
534 546 def _err_and_stop(f):
535 547 """Errback to log a failure and halt the reactor on a fatal error."""
536 548 log.err(f)
537 549 reactor.stop()
538 550
539 551
540 552 def _delay_start(cont_pid, start_engines, furl_file, reuse):
541 553 """Wait for controller to create FURL files and the start the engines."""
542 554 if not reuse:
543 555 if os.path.isfile(furl_file):
544 556 os.unlink(furl_file)
545 557 log.msg('Waiting for controller to finish starting...')
546 558 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
547 559 d.addCallback(lambda _: log.msg('Controller started'))
548 560 d.addCallback(lambda _: start_engines(cont_pid))
549 561 return d
550 562
551 563
552 564 def main_local(args):
553 565 cont_args = []
554 566 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
555 567
556 568 # Check security settings before proceeding
557 569 if not check_security(args, cont_args):
558 570 return
559 571
560 572 # See if we are reusing FURL files
561 573 if not check_reuse(args, cont_args):
562 574 return
563 575
564 576 cl = ControllerLauncher(extra_args=cont_args)
565 577 dstart = cl.start()
566 578 def start_engines(cont_pid):
567 579 engine_args = []
568 580 engine_args.append('--logfile=%s' % \
569 581 pjoin(args.logdir,'ipengine%s-' % cont_pid))
570 582 eset = LocalEngineSet(extra_args=engine_args)
571 583 def shutdown(signum, frame):
572 584 log.msg('Stopping local cluster')
573 585 # We are still playing with the times here, but these seem
574 586 # to be reliable in allowing everything to exit cleanly.
575 587 eset.interrupt_then_kill(0.5)
576 588 cl.interrupt_then_kill(0.5)
577 589 reactor.callLater(1.0, reactor.stop)
578 590 signal.signal(signal.SIGINT,shutdown)
579 591 d = eset.start(args.n)
580 592 return d
581 593 config = kernel_config_manager.get_config_obj()
582 594 furl_file = config['controller']['engine_furl_file']
583 595 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
584 596 dstart.addErrback(_err_and_stop)
585 597
586 598
587 599 def main_mpi(args):
588 600 cont_args = []
589 601 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
590 602
591 603 # Check security settings before proceeding
592 604 if not check_security(args, cont_args):
593 605 return
594 606
595 607 # See if we are reusing FURL files
596 608 if not check_reuse(args, cont_args):
597 609 return
598 610
599 611 cl = ControllerLauncher(extra_args=cont_args)
600 612 dstart = cl.start()
601 613 def start_engines(cont_pid):
602 614 raw_args = [args.cmd]
603 615 raw_args.extend(['-n',str(args.n)])
604 616 raw_args.append('ipengine')
605 617 raw_args.append('-l')
606 618 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
607 619 if args.mpi:
608 620 raw_args.append('--mpi=%s' % args.mpi)
609 621 eset = ProcessLauncher(raw_args)
610 622 def shutdown(signum, frame):
611 623 log.msg('Stopping local cluster')
612 624 # We are still playing with the times here, but these seem
613 625 # to be reliable in allowing everything to exit cleanly.
614 626 eset.interrupt_then_kill(1.0)
615 627 cl.interrupt_then_kill(1.0)
616 628 reactor.callLater(2.0, reactor.stop)
617 629 signal.signal(signal.SIGINT,shutdown)
618 630 d = eset.start()
619 631 return d
620 632 config = kernel_config_manager.get_config_obj()
621 633 furl_file = config['controller']['engine_furl_file']
622 634 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
623 635 dstart.addErrback(_err_and_stop)
624 636
625 637
626 638 def main_pbs(args):
627 639 cont_args = []
628 640 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
629 641
630 642 # Check security settings before proceeding
631 643 if not check_security(args, cont_args):
632 644 return
633 645
634 646 # See if we are reusing FURL files
635 647 if not check_reuse(args, cont_args):
636 648 return
637 649
638 650 cl = ControllerLauncher(extra_args=cont_args)
639 651 dstart = cl.start()
640 652 def start_engines(r):
641 653 pbs_set = PBSEngineSet(args.pbsscript)
642 654 def shutdown(signum, frame):
643 655 log.msg('Stopping pbs cluster')
644 656 d = pbs_set.kill()
645 657 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
646 658 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
647 659 signal.signal(signal.SIGINT,shutdown)
648 660 d = pbs_set.start(args.n)
649 661 return d
650 662 config = kernel_config_manager.get_config_obj()
651 663 furl_file = config['controller']['engine_furl_file']
652 664 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
653 665 dstart.addErrback(_err_and_stop)
654 666
655 667 def main_sge(args):
656 668 cont_args = []
657 669 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
658 670
659 671 # Check security settings before proceeding
660 672 if not check_security(args, cont_args):
661 673 return
662 674
663 675 # See if we are reusing FURL files
664 676 if not check_reuse(args, cont_args):
665 677 return
666 678
667 679 if args.sgescript and not os.path.isfile(args.sgescript):
668 680 log.err('SGE script does not exist: %s' % args.sgescript)
669 681 return
670 682
671 683 cl = ControllerLauncher(extra_args=cont_args)
672 684 dstart = cl.start()
673 685 def start_engines(r):
674 686 sge_set = SGEEngineSet(args.sgescript)
675 687 def shutdown(signum, frame):
676 688 log.msg('Stopping sge cluster')
677 689 d = sge_set.kill()
678 690 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
679 691 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
680 692 signal.signal(signal.SIGINT,shutdown)
681 693 d = sge_set.start(args.n)
682 694 return d
683 695 config = kernel_config_manager.get_config_obj()
684 696 furl_file = config['controller']['engine_furl_file']
685 697 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
686 698 dstart.addErrback(_err_and_stop)
687 699
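# A main_lsf mirroring main_sge (a sketch, untested like the rest of the
# LSF support); the lsf subparser in get_args() stores its template under
# args.lsfscript, so it needs its own entry point:

def main_lsf(args):
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript)
        def shutdown(signum, frame):
            log.msg('Stopping lsf cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = lsf_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)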
688 700
689 701 def main_ssh(args):
690 702 """Start a controller on localhost and engines using ssh.
691 703
692 704 Your clusterfile should look like::
693 705
694 706 send_furl = False # set to True to copy the engine furl file to each host
695 707 engines = {
696 708 'engine_host1' : engine_count,
697 709 'engine_host2' : engine_count2
698 710 }
699 711 """
700 712 clusterfile = {}
701 713 execfile(args.clusterfile, clusterfile)
702 714 if not clusterfile.has_key('send_furl'):
703 715 clusterfile['send_furl'] = False
704 716
705 717 cont_args = []
706 718 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
707 719
708 720 # Check security settings before proceeding
709 721 if not check_security(args, cont_args):
710 722 return
711 723
712 724 # See if we are reusing FURL files
713 725 if not check_reuse(args, cont_args):
714 726 return
715 727
716 728 cl = ControllerLauncher(extra_args=cont_args)
717 729 dstart = cl.start()
718 730 def start_engines(cont_pid):
719 731 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
720 732 def shutdown(signum, frame):
721 733 d = ssh_set.kill()
722 734 cl.interrupt_then_kill(1.0)
723 735 reactor.callLater(2.0, reactor.stop)
724 736 signal.signal(signal.SIGINT,shutdown)
725 737 d = ssh_set.start(clusterfile['send_furl'])
726 738 return d
727 739 config = kernel_config_manager.get_config_obj()
728 740 furl_file = config['controller']['engine_furl_file']
729 741 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
730 742 dstart.addErrback(_err_and_stop)
731 743
732 744
733 745 def get_args():
734 746 base_parser = argparse.ArgumentParser(add_help=False)
735 747 base_parser.add_argument(
736 748 '-r',
737 749 action='store_true',
738 750 dest='r',
739 751 help='try to reuse FURL files. Use with --client-port and --engine-port'
740 752 )
741 753 base_parser.add_argument(
742 754 '--client-port',
743 755 type=int,
744 756 dest='client_port',
745 757 help='the port the controller will listen on for client connections',
746 758 default=0
747 759 )
748 760 base_parser.add_argument(
749 761 '--engine-port',
750 762 type=int,
751 763 dest='engine_port',
752 764 help='the port the controller will listen on for engine connections',
753 765 default=0
754 766 )
755 767 base_parser.add_argument(
756 768 '-x',
757 769 action='store_true',
758 770 dest='x',
759 771 help='turn off client security'
760 772 )
761 773 base_parser.add_argument(
762 774 '-y',
763 775 action='store_true',
764 776 dest='y',
765 777 help='turn off engine security'
766 778 )
767 779 base_parser.add_argument(
768 780 "--logdir",
769 781 type=str,
770 782 dest="logdir",
771 783 help="directory to put log files (default=$IPYTHONDIR/log)",
772 784 default=pjoin(get_ipython_dir(),'log')
773 785 )
774 786 base_parser.add_argument(
775 787 "-n",
776 788 "--num",
777 789 type=int,
778 790 dest="n",
779 791 default=2,
780 792 help="the number of engines to start"
781 793 )
782 794
783 795 parser = argparse.ArgumentParser(
784 796 description='IPython cluster startup. This starts a controller and\
785 797 engines using various approaches. Use the IPYTHONDIR environment\
786 798 variable to change your IPython directory from the default of\
787 799 .ipython or _ipython. The log and security subdirectories of your\
788 800 IPython directory will be used by this script for log files and\
789 801 security files.'
790 802 )
791 803 subparsers = parser.add_subparsers(
792 804 help='available cluster types. For help, do "ipcluster TYPE --help"')
793 805
794 806 parser_local = subparsers.add_parser(
795 807 'local',
796 808 help='run a local cluster',
797 809 parents=[base_parser]
798 810 )
799 811 parser_local.set_defaults(func=main_local)
800 812
801 813 parser_mpirun = subparsers.add_parser(
802 814 'mpirun',
803 815 help='run a cluster using mpirun (mpiexec also works)',
804 816 parents=[base_parser]
805 817 )
806 818 parser_mpirun.add_argument(
807 819 "--mpi",
808 820 type=str,
809 821 dest="mpi", # Don't put a default here to allow no MPI support
810 822 help="how to call MPI_Init (default=mpi4py)"
811 823 )
812 824 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
813 825
814 826 parser_mpiexec = subparsers.add_parser(
815 827 'mpiexec',
816 828 help='run a cluster using mpiexec (mpirun also works)',
817 829 parents=[base_parser]
818 830 )
819 831 parser_mpiexec.add_argument(
820 832 "--mpi",
821 833 type=str,
822 834 dest="mpi", # Don't put a default here to allow no MPI support
823 835 help="how to call MPI_Init (default=mpi4py)"
824 836 )
825 837 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
826 838
827 839 parser_pbs = subparsers.add_parser(
828 840 'pbs',
829 841 help='run a pbs cluster',
830 842 parents=[base_parser]
831 843 )
832 844 parser_pbs.add_argument(
833 845 '--pbs-script',
834 846 type=str,
835 847 dest='pbsscript',
836 848 help='PBS script template',
837 849 default=''
838 850 )
839 851 parser_pbs.set_defaults(func=main_pbs)
840 852
841 853 parser_sge = subparsers.add_parser(
842 854 'sge',
843 855 help='run an sge cluster',
844 856 parents=[base_parser]
845 857 )
846 858 parser_sge.add_argument(
847 859 '--sge-script',
848 860 type=str,
849 861 dest='sgescript',
850 862 help='SGE script template',
851 863 default='' # SGEEngineSet will create one if not specified
852 864 )
853 865 parser_sge.set_defaults(func=main_sge)
866
867 parser_lsf = subparsers.add_parser(
868 'lsf',
869 help='run an lsf cluster',
870 parents=[base_parser]
871 )
872 parser_lsf.add_argument(
873 '--lsf-script',
874 type=str,
875 dest='lsfscript',
876 help='LSF script template',
877 default='' # LSFEngineSet will create one if not specified
878 )
879 parser_lsf.set_defaults(func=main_lsf)
854 880
855 881 parser_ssh = subparsers.add_parser(
856 882 'ssh',
857 883 help='run a cluster using ssh; you should have ssh keys set up',
858 884 parents=[base_parser]
859 885 )
860 886 parser_ssh.add_argument(
861 887 '--clusterfile',
862 888 type=str,
863 889 dest='clusterfile',
864 890 help='python file describing the cluster',
865 891 default='clusterfile.py',
866 892 )
867 893 parser_ssh.add_argument(
868 894 '--sshx',
869 895 type=str,
870 896 dest='sshx',
871 897 help='sshx launcher helper'
872 898 )
873 899 parser_ssh.set_defaults(func=main_ssh)
874 900
875 901 args = parser.parse_args()
876 902 return args
877 903
878 904 def main():
879 905 args = get_args()
880 906 reactor.callWhenRunning(args.func, args)
881 907 log.startLogging(sys.stdout)
882 908 reactor.run()
883 909
884 910 if __name__ == '__main__':
885 911 main()
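# Example invocation of the new subcommand (hypothetical script path):
#   ipcluster lsf -n 8 --lsf-script=/path/to/lsf.template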