force /bin/sh in default SGE/PBS/LSF templates
Justin Riley
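This commit pins the default batch-submission templates to /bin/sh. Previously the default PBS and LSF scripts had no interpreter line and the default SGE script did not request a shell, so the scheduler may hand the script to the submitting user's login shell; under csh-family shells the POSIX arithmetic `eid=$(($PBS_ARRAYID - 1))` used to number the engines fails. The fix adds `#!/bin/sh` to the PBS and LSF templates and `#$ -S /bin/sh` to the SGE template. As a sketch (not part of the commit), rendering the new default PBS script for four engines, mirroring `default_template % n` in `BatchEngineSet.start`:

# Illustrative only: render the new default PBS script for n = 4 engines,
# as BatchEngineSet.start does with `self.default_template % n`.
default_template = """#!/bin/sh
#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
print default_template % 4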
@@ -1,1001 +1,1003 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import stat
22 22 import tempfile
23 23 pjoin = os.path.join
24 24
25 25 from twisted.internet import reactor, defer
26 26 from twisted.internet.protocol import ProcessProtocol
27 27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 28 from twisted.internet.utils import getProcessOutput
29 29 from twisted.python import failure, log
30 30
31 31 from IPython.external import argparse
32 32 from IPython.external import Itpl
33 33 from IPython.genutils import (
34 34 get_ipython_dir,
35 35 get_log_dir,
36 36 get_security_dir,
37 37 num_cpus
38 38 )
39 39 from IPython.kernel.fcutil import have_crypto
40 40
41 41 # Create various ipython directories if they don't exist.
42 42 # This must be done before IPython.kernel.config is imported.
43 43 from IPython.iplib import user_setup
44 44 if os.name == 'posix':
45 45 rc_suffix = ''
46 46 else:
47 47 rc_suffix = '.ini'
48 48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
49 49 get_log_dir()
50 50 get_security_dir()
51 51
52 52 from IPython.kernel.config import config_manager as kernel_config_manager
53 53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 54 from IPython.kernel.fcutil import have_crypto
55 55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 56 from IPython.kernel.util import printer
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # General process handling code
60 60 #-----------------------------------------------------------------------------
61 61
62 62
63 63 class ProcessStateError(Exception):
64 64 pass
65 65
66 66 class UnknownStatus(Exception):
67 67 pass
68 68
69 69 class LauncherProcessProtocol(ProcessProtocol):
70 70 """
71 71 A ProcessProtocol to go with the ProcessLauncher.
72 72 """
73 73 def __init__(self, process_launcher):
74 74 self.process_launcher = process_launcher
75 75
76 76 def connectionMade(self):
77 77 self.process_launcher.fire_start_deferred(self.transport.pid)
78 78
79 79 def processEnded(self, status):
80 80 value = status.value
81 81 if isinstance(value, ProcessDone):
82 82 self.process_launcher.fire_stop_deferred(0)
83 83 elif isinstance(value, ProcessTerminated):
84 84 self.process_launcher.fire_stop_deferred(
85 85 {'exit_code':value.exitCode,
86 86 'signal':value.signal,
87 87 'status':value.status
88 88 }
89 89 )
90 90 else:
91 91 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
92 92
93 93 def outReceived(self, data):
94 94 log.msg(data)
95 95
96 96 def errReceived(self, data):
97 97 log.err(data)
98 98
99 99 class ProcessLauncher(object):
100 100 """
101 101 Start and stop an external process in an asynchronous manner.
102 102
103 103 Currently this uses deferreds to notify other parties of process state
104 104 changes. This is an awkward design and should be moved to using
105 105 a formal NotificationCenter.
106 106 """
107 107 def __init__(self, cmd_and_args):
108 108 self.cmd = cmd_and_args[0]
109 109 self.args = cmd_and_args
110 110 self._reset()
111 111
112 112 def _reset(self):
113 113 self.process_protocol = None
114 114 self.pid = None
115 115 self.start_deferred = None
116 116 self.stop_deferreds = []
117 117 self.state = 'before' # before, running, or after
118 118
119 119 @property
120 120 def running(self):
121 121 if self.state == 'running':
122 122 return True
123 123 else:
124 124 return False
125 125
126 126 def fire_start_deferred(self, pid):
127 127 self.pid = pid
128 128 self.state = 'running'
129 129 log.msg('Process %r has started with pid=%i' % (self.args, pid))
130 130 self.start_deferred.callback(pid)
131 131
132 132 def start(self):
133 133 if self.state == 'before':
134 134 self.process_protocol = LauncherProcessProtocol(self)
135 135 self.start_deferred = defer.Deferred()
136 136 self.process_transport = reactor.spawnProcess(
137 137 self.process_protocol,
138 138 self.cmd,
139 139 self.args,
140 140 env=os.environ
141 141 )
142 142 return self.start_deferred
143 143 else:
144 144 s = 'the process has already been started and has state: %r' % \
145 145 self.state
146 146 return defer.fail(ProcessStateError(s))
147 147
148 148 def get_stop_deferred(self):
149 149 if self.state == 'running' or self.state == 'before':
150 150 d = defer.Deferred()
151 151 self.stop_deferreds.append(d)
152 152 return d
153 153 else:
154 154 s = 'this process is already complete'
155 155 return defer.fail(ProcessStateError(s))
156 156
157 157 def fire_stop_deferred(self, exit_code):
158 158 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
159 159 self.state = 'after'
160 160 for d in self.stop_deferreds:
161 161 d.callback(exit_code)
162 162
163 163 def signal(self, sig):
164 164 """
165 165 Send a signal to the process.
166 166
167 167 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
168 168 """
169 169 if self.state == 'running':
170 170 self.process_transport.signalProcess(sig)
171 171
172 172 # def __del__(self):
173 173 # self.signal('KILL')
174 174
175 175 def interrupt_then_kill(self, delay=1.0):
176 176 self.signal('INT')
177 177 reactor.callLater(delay, self.signal, 'KILL')
178 178
179 179
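Note: ProcessLauncher exposes its lifecycle entirely through deferreds. A minimal usage sketch (assumes the reactor is running, as main() arranges below; the 'sleep' command is only an illustration):

launcher = ProcessLauncher(['sleep', '30'])
d = launcher.start()                       # fires with the child's pid
d.addCallback(lambda pid: log.msg('started pid %i' % pid))
stop_d = launcher.get_stop_deferred()      # fires with the exit code/status
stop_d.addCallback(lambda status: log.msg('exited: %r' % status))
reactor.callLater(5.0, launcher.interrupt_then_kill, 1.0)  # INT, then KILL after 1s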
180 180 #-----------------------------------------------------------------------------
181 181 # Code for launching controller and engines
182 182 #-----------------------------------------------------------------------------
183 183
184 184
185 185 class ControllerLauncher(ProcessLauncher):
186 186
187 187 def __init__(self, extra_args=None):
188 188 if sys.platform == 'win32':
189 189 # This logic is needed because the ipcontroller script doesn't
190 190 # always get installed in the same way or in the same location.
191 191 from IPython.kernel.scripts import ipcontroller
192 192 script_location = ipcontroller.__file__.replace('.pyc', '.py')
193 193 # The -u option here turns on unbuffered output, which is required
194 194 # on Win32 to prevent weird conflicts and problems with Twisted.
195 195 # Also, use sys.executable to make sure we are picking up the
196 196 # right python exe.
197 197 args = [sys.executable, '-u', script_location]
198 198 else:
199 199 args = ['ipcontroller']
200 200 self.extra_args = extra_args
201 201 if extra_args is not None:
202 202 args.extend(extra_args)
203 203
204 204 ProcessLauncher.__init__(self, args)
205 205
206 206
207 207 class EngineLauncher(ProcessLauncher):
208 208
209 209 def __init__(self, extra_args=None):
210 210 if sys.platform == 'win32':
211 211 # This logic is needed because the ipengine script doesn't
212 212 # always get installed in the same way or in the same location.
213 213 from IPython.kernel.scripts import ipengine
214 214 script_location = ipengine.__file__.replace('.pyc', '.py')
215 215 # The -u option here turns on unbuffered output, which is required
216 216 # on Win32 to prevent weird conflicts and problems with Twisted.
217 217 # Also, use sys.executable to make sure we are picking up the
218 218 # right python exe.
219 219 args = [sys.executable, '-u', script_location]
220 220 else:
221 221 args = ['ipengine']
222 222 self.extra_args = extra_args
223 223 if extra_args is not None:
224 224 args.extend(extra_args)
225 225
226 226 ProcessLauncher.__init__(self, args)
227 227
228 228
229 229 class LocalEngineSet(object):
230 230
231 231 def __init__(self, extra_args=None):
232 232 self.extra_args = extra_args
233 233 self.launchers = []
234 234
235 235 def start(self, n):
236 236 dlist = []
237 237 for i in range(n):
238 238 print "starting engine:", i
239 239 el = EngineLauncher(extra_args=self.extra_args)
240 240 d = el.start()
241 241 self.launchers.append(el)
242 242 dlist.append(d)
243 243 dfinal = gatherBoth(dlist, consumeErrors=True)
244 244 dfinal.addCallback(self._handle_start)
245 245 return dfinal
246 246
247 247 def _handle_start(self, r):
248 248 log.msg('Engines started with pids: %r' % r)
249 249 return r
250 250
251 251 def _handle_stop(self, r):
252 252 log.msg('Engines received signal: %r' % r)
253 253 return r
254 254
255 255 def signal(self, sig):
256 256 dlist = []
257 257 for el in self.launchers:
258 258 d = el.get_stop_deferred()
259 259 dlist.append(d)
260 260 el.signal(sig)
261 261 dfinal = gatherBoth(dlist, consumeErrors=True)
262 262 dfinal.addCallback(self._handle_stop)
263 263 return dfinal
264 264
265 265 def interrupt_then_kill(self, delay=1.0):
266 266 dlist = []
267 267 for el in self.launchers:
268 268 d = el.get_stop_deferred()
269 269 dlist.append(d)
270 270 el.interrupt_then_kill(delay)
271 271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 272 dfinal.addCallback(self._handle_stop)
273 273 return dfinal
274 274
275 275 class BatchEngineSet(object):
276 276
277 277 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
278 278 name = ''
279 279 submit_command = ''
280 280 delete_command = ''
281 281 job_id_regexp = ''
282 282 job_array_regexp = ''
283 283 job_array_template = ''
284 284 queue_regexp = ''
285 285 queue_template = ''
286 286 default_template = ''
287 287
288 288 def __init__(self, template_file, queue, **kwargs):
289 289 self.template_file = template_file
290 290 self.queue = queue
291 291
292 292 def parse_job_id(self, output):
293 293 m = re.search(self.job_id_regexp, output)
294 294 if m is not None:
295 295 job_id = m.group()
296 296 else:
297 297 raise Exception("job id couldn't be determined: %s" % output)
298 298 self.job_id = job_id
299 299 log.msg('Job started with job id: %r' % job_id)
300 300 return job_id
301 301
302 302 def handle_error(self, f):
303 303 f.printTraceback()
304 304 f.raiseException()
305 305
306 306 def start(self, n):
307 307 log.msg("starting %d engines" % n)
308 308 self._temp_file = tempfile.NamedTemporaryFile()
309 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
309 310 if self.template_file:
310 311 log.msg("Using %s script %s" % (self.name, self.template_file))
311 312 contents = open(self.template_file, 'r').read()
312 313 new_script = contents
313 314 regex = re.compile(self.job_array_regexp)
314 315 if not regex.search(contents):
315 316 log.msg("adding job array settings to %s script" % self.name)
316 317 new_script = self.job_array_template % n +'\n' + new_script
318 319 regex = re.compile(self.queue_regexp)
320 321 if self.queue and not regex.search(contents):
321 322 log.msg("adding queue settings to %s script" % self.name)
322 323 new_script = self.queue_template % self.queue + '\n' + new_script
323 324 if new_script != contents:
324 325 self._temp_file.write(new_script)
325 326 self.template_file = self._temp_file.name
326 327 else:
327 328 default_script = self.default_template % n
328 329 if self.queue:
329 330 default_script = self.queue_template % self.queue + \
330 331 '\n' + default_script
331 332 log.msg("using default ipengine %s script: \n%s" %
332 333 (self.name, default_script))
333 334 self._temp_file.file.write(default_script)
334 335 self.template_file = self._temp_file.name
335 self._temp_file.file.flush()
336 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
337 336 self._temp_file.file.close()
338 337 d = getProcessOutput(self.submit_command,
339 338 [self.template_file],
340 339 env=os.environ)
341 340 d.addCallback(self.parse_job_id)
342 341 d.addErrback(self.handle_error)
343 342 return d
344 343
345 344 def kill(self):
346 345 d = getProcessOutput(self.delete_command,
347 346 [self.job_id],env=os.environ)
348 347 return d
349 348
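Note: BatchEngineSet.start injects job-array and queue directives into a user-supplied template only when the corresponding regex finds none already present. A sketch of that injection step using the PBS patterns defined below (the script contents and n are illustrative):

import re
contents = "#!/bin/sh\nipengine --logfile=ipengine${PBS_ARRAYID}.log\n"
n = 8
if not re.search('#PBS[ \t]+-t[ \t]+\d+', contents):
    # Prepend an array directive when the user script lacks one.
    contents = '#PBS -t 1-%d' % n + '\n' + contents
print contents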
350 349 class PBSEngineSet(BatchEngineSet):
351 350
352 351 name = 'PBS'
353 352 submit_command = 'qsub'
354 353 delete_command = 'qdel'
355 354 job_id_regexp = '\d+'
356 355 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
357 356 job_array_template = '#PBS -t 1-%d'
358 357 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
359 358 queue_template = '#PBS -q %s'
360 default_template="""#PBS -V
359 default_template="""#!/bin/sh
360 #PBS -V
361 361 #PBS -t 1-%d
362 362 #PBS -N ipengine
363 363 eid=$(($PBS_ARRAYID - 1))
364 364 ipengine --logfile=ipengine${eid}.log
365 365 """
366 366
367 367 class SGEEngineSet(PBSEngineSet):
368 368
369 369 name = 'SGE'
370 370 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
371 371 job_array_template = '#$ -t 1-%d'
372 372 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
373 373 queue_template = '#$ -q %s'
374 374 default_template="""#$ -V
375 #$ -S /bin/sh
375 376 #$ -t 1-%d
376 377 #$ -N ipengine
377 378 eid=$(($SGE_TASK_ID - 1))
378 379 ipengine --logfile=ipengine${eid}.log
379 380 """
380 381
381 382 class LSFEngineSet(PBSEngineSet):
382 383
383 384 name = 'LSF'
384 385 submit_command = 'bsub'
385 386 delete_command = 'bkill'
386 387 job_array_regexp = '#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]'
387 388 job_array_template = '#BSUB -J ipengine[1-%d]'
388 389 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
389 390 queue_template = '#BSUB -q %s'
390 default_template="""#BSUB -J ipengine[1-%d]
391 default_template="""#!/bin/sh
392 #BSUB -J ipengine[1-%d]
391 393 eid=$(($LSB_JOBINDEX - 1))
392 394 ipengine --logfile=ipengine${eid}.log
393 395 """
394 396
395 397 sshx_template="""#!/bin/sh
396 398 "$@" &> /dev/null &
397 399 echo $!
398 400 """
399 401
400 402 engine_killer_template="""#!/bin/sh
401 403 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
402 404 """
403 405
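Note: the sshx helper backgrounds the remote command and echoes its pid so the blocking ssh call can return immediately, and `grep '[i]pengine'` in the killer script keeps grep from matching its own process entry. (One caveat: `&>` is a bash extension; a strictly POSIX /bin/sh parses it as two separate commands, so the redirection may not behave as intended there.) A sketch of the command line _ssh_engine builds, with hypothetical host and user names in place of real values:

# 'node01' and 'alice' are hypothetical stand-ins for a real host
# and os.environ['USER'].
temp_dir = '/tmp'  # tempfile.gettempdir() on most systems
exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
    'node01', temp_dir, 'alice', 'ipengine'
)
print exec_engine  # -> ssh node01 sh /tmp/alice-sshx.sh ipengine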
404 406 class SSHEngineSet(object):
405 407 sshx_template=sshx_template
406 408 engine_killer_template=engine_killer_template
407 409
408 410 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
409 411 """Start a controller on localhost and engines using ssh.
410 412
411 413 The engine_hosts argument is a dict with hostnames as keys and
412 414 the number of engines (int) as values. sshx is the name of a local
413 415 file that will be used to run remote commands. This file is used
414 416 to set up the environment properly.
415 417 """
416 418
417 419 self.temp_dir = tempfile.gettempdir()
418 420 if sshx is not None:
419 421 self.sshx = sshx
420 422 else:
421 423 # Write the sshx.sh file locally from our template.
422 424 self.sshx = os.path.join(
423 425 self.temp_dir,
424 426 '%s-main-sshx.sh' % os.environ['USER']
425 427 )
426 428 f = open(self.sshx, 'w')
427 429 f.writelines(self.sshx_template)
428 430 f.close()
429 431 self.engine_command = ipengine
430 432 self.engine_hosts = engine_hosts
431 433 # Write the engine killer script file locally from our template.
432 434 self.engine_killer = os.path.join(
433 435 self.temp_dir,
434 436 '%s-local-engine_killer.sh' % os.environ['USER']
435 437 )
436 438 f = open(self.engine_killer, 'w')
437 439 f.writelines(self.engine_killer_template)
438 440 f.close()
439 441
440 442 def start(self, send_furl=False):
441 443 dlist = []
442 444 for host in self.engine_hosts.keys():
443 445 count = self.engine_hosts[host]
444 446 d = self._start(host, count, send_furl)
445 447 dlist.append(d)
446 448 return gatherBoth(dlist, consumeErrors=True)
447 449
448 450 def _start(self, hostname, count=1, send_furl=False):
449 451 if send_furl:
450 452 d = self._scp_furl(hostname)
451 453 else:
452 454 d = defer.succeed(None)
453 455 d.addCallback(lambda r: self._scp_sshx(hostname))
454 456 d.addCallback(lambda r: self._ssh_engine(hostname, count))
455 457 return d
456 458
457 459 def _scp_furl(self, hostname):
458 460 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
459 461 cmd_list = scp_cmd.split()
460 462 cmd_list[1] = os.path.expanduser(cmd_list[1])
461 463 log.msg('Copying furl file: %s' % scp_cmd)
462 464 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
463 465 return d
464 466
465 467 def _scp_sshx(self, hostname):
466 468 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
467 469 self.sshx, hostname,
468 470 self.temp_dir, os.environ['USER']
469 471 )
471 473 log.msg("Copying sshx: %s" % scp_cmd)
472 474 sshx_scp = scp_cmd.split()
473 475 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
474 476 return d
475 477
476 478 def _ssh_engine(self, hostname, count):
477 479 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
478 480 hostname, self.temp_dir,
479 481 os.environ['USER'], self.engine_command
480 482 )
481 483 cmds = exec_engine.split()
482 484 dlist = []
483 485 log.msg("about to start engines...")
484 486 for i in range(count):
485 487 log.msg('Starting engines: %s' % exec_engine)
486 488 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
487 489 dlist.append(d)
488 490 return gatherBoth(dlist, consumeErrors=True)
489 491
490 492 def kill(self):
491 493 dlist = []
492 494 for host in self.engine_hosts.keys():
493 495 d = self._killall(host)
494 496 dlist.append(d)
495 497 return gatherBoth(dlist, consumeErrors=True)
496 498
497 499 def _killall(self, hostname):
498 500 d = self._scp_engine_killer(hostname)
499 501 d.addCallback(lambda r: self._ssh_kill(hostname))
500 502 # d.addErrback(self._exec_err)
501 503 return d
502 504
503 505 def _scp_engine_killer(self, hostname):
504 506 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
505 507 self.engine_killer,
506 508 hostname,
507 509 self.temp_dir,
508 510 os.environ['USER']
509 511 )
510 512 cmds = scp_cmd.split()
511 513 log.msg('Copying engine_killer: %s' % scp_cmd)
512 514 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
513 515 return d
514 516
515 517 def _ssh_kill(self, hostname):
516 518 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
517 519 hostname,
518 520 self.temp_dir,
519 521 os.environ['USER']
520 522 )
521 523 log.msg('Killing engine: %s' % kill_cmd)
522 524 kill_cmd = kill_cmd.split()
523 525 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
524 526 return d
525 527
526 528 def _exec_err(self, r):
527 529 log.msg(r)
528 530
529 531 #-----------------------------------------------------------------------------
530 532 # Main functions for the different types of clusters
531 533 #-----------------------------------------------------------------------------
532 534
533 535 # TODO:
534 536 # The logic in these functions should be moved into classes like LocalCluster,
535 537 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
536 538 # The main functions should then just parse the command line arguments, create
537 539 # the appropriate class and call a 'start' method.
538 540
539 541
540 542 def check_security(args, cont_args):
541 543 """Check to see if we should run with SSL support."""
542 544 if (not args.x or not args.y) and not have_crypto:
543 545 log.err("""
544 546 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
545 547 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
546 548 reactor.stop()
547 549 return False
548 550 if args.x:
549 551 cont_args.append('-x')
550 552 if args.y:
551 553 cont_args.append('-y')
552 554 return True
553 555
554 556
555 557 def check_reuse(args, cont_args):
556 558 """Check to see if we should try to resuse FURL files."""
557 559 if args.r:
558 560 cont_args.append('-r')
559 561 if args.client_port == 0 or args.engine_port == 0:
560 562 log.err("""
561 563 To reuse FURL files, you must also set the client and engine ports using
562 564 the --client-port and --engine-port options.""")
563 565 reactor.stop()
564 566 return False
565 567 cont_args.append('--client-port=%i' % args.client_port)
566 568 cont_args.append('--engine-port=%i' % args.engine_port)
567 569 return True
568 570
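For example, a reuse-enabled local cluster might be started with `ipcluster local -r --client-port=10101 --engine-port=10102 -n 4`; the port numbers here are arbitrary illustrations.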
569 571
570 572 def _err_and_stop(f):
571 573 """Errback to log a failure and halt the reactor on a fatal error."""
572 574 log.err(f)
573 575 reactor.stop()
574 576
575 577
576 578 def _delay_start(cont_pid, start_engines, furl_file, reuse):
577 579 """Wait for controller to create FURL files and the start the engines."""
578 580 if not reuse:
579 581 if os.path.isfile(furl_file):
580 582 os.unlink(furl_file)
581 583 log.msg('Waiting for controller to finish starting...')
582 584 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
583 585 d.addCallback(lambda _: log.msg('Controller started'))
584 586 d.addCallback(lambda _: start_engines(cont_pid))
585 587 return d
586 588
587 589
588 590 def main_local(args):
589 591 cont_args = []
590 592 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
591 593
592 594 # Check security settings before proceeding
593 595 if not check_security(args, cont_args):
594 596 return
595 597
596 598 # See if we are reusing FURL files
597 599 if not check_reuse(args, cont_args):
598 600 return
599 601
600 602 cl = ControllerLauncher(extra_args=cont_args)
601 603 dstart = cl.start()
602 604 def start_engines(cont_pid):
603 605 engine_args = []
604 606 engine_args.append('--logfile=%s' % \
605 607 pjoin(args.logdir,'ipengine%s-' % cont_pid))
606 608 eset = LocalEngineSet(extra_args=engine_args)
607 609 def shutdown(signum, frame):
608 610 log.msg('Stopping local cluster')
609 611 # We are still playing with the times here, but these seem
610 612 # to be reliable in allowing everything to exit cleanly.
611 613 eset.interrupt_then_kill(0.5)
612 614 cl.interrupt_then_kill(0.5)
613 615 reactor.callLater(1.0, reactor.stop)
614 616 signal.signal(signal.SIGINT,shutdown)
615 617 d = eset.start(args.n)
616 618 return d
617 619 config = kernel_config_manager.get_config_obj()
618 620 furl_file = config['controller']['engine_furl_file']
619 621 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
620 622 dstart.addErrback(_err_and_stop)
621 623
622 624
623 625 def main_mpi(args):
624 626 cont_args = []
625 627 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
626 628
627 629 # Check security settings before proceeding
628 630 if not check_security(args, cont_args):
629 631 return
630 632
631 633 # See if we are reusing FURL files
632 634 if not check_reuse(args, cont_args):
633 635 return
634 636
635 637 cl = ControllerLauncher(extra_args=cont_args)
636 638 dstart = cl.start()
637 639 def start_engines(cont_pid):
638 640 raw_args = [args.cmd]
639 641 raw_args.extend(['-n',str(args.n)])
640 642 raw_args.append('ipengine')
641 643 raw_args.append('-l')
642 644 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
643 645 if args.mpi:
644 646 raw_args.append('--mpi=%s' % args.mpi)
645 647 eset = ProcessLauncher(raw_args)
646 648 def shutdown(signum, frame):
647 649 log.msg('Stopping local cluster')
648 650 # We are still playing with the times here, but these seem
649 651 # to be reliable in allowing everything to exit cleanly.
650 652 eset.interrupt_then_kill(1.0)
651 653 cl.interrupt_then_kill(1.0)
652 654 reactor.callLater(2.0, reactor.stop)
653 655 signal.signal(signal.SIGINT,shutdown)
654 656 d = eset.start()
655 657 return d
656 658 config = kernel_config_manager.get_config_obj()
657 659 furl_file = config['controller']['engine_furl_file']
658 660 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
659 661 dstart.addErrback(_err_and_stop)
660 662
661 663
662 664 def main_pbs(args):
663 665 cont_args = []
664 666 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
665 667
666 668 # Check security settings before proceeding
667 669 if not check_security(args, cont_args):
668 670 return
669 671
670 672 # See if we are reusing FURL files
671 673 if not check_reuse(args, cont_args):
672 674 return
673 675
674 676 if args.pbsscript and not os.path.isfile(args.pbsscript):
675 677 log.err('PBS script does not exist: %s' % args.pbsscript)
676 678 return
677 679
678 680 cl = ControllerLauncher(extra_args=cont_args)
679 681 dstart = cl.start()
680 682 def start_engines(r):
681 683 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
682 684 def shutdown(signum, frame):
683 685 log.msg('Stopping PBS cluster')
684 686 d = pbs_set.kill()
685 687 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
686 688 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
687 689 signal.signal(signal.SIGINT,shutdown)
688 690 d = pbs_set.start(args.n)
689 691 return d
690 692 config = kernel_config_manager.get_config_obj()
691 693 furl_file = config['controller']['engine_furl_file']
692 694 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
693 695 dstart.addErrback(_err_and_stop)
694 696
695 697 def main_sge(args):
696 698 cont_args = []
697 699 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
698 700
699 701 # Check security settings before proceeding
700 702 if not check_security(args, cont_args):
701 703 return
702 704
703 705 # See if we are reusing FURL files
704 706 if not check_reuse(args, cont_args):
705 707 return
706 708
707 709 if args.sgescript and not os.path.isfile(args.sgescript):
708 710 log.err('SGE script does not exist: %s' % args.sgescript)
709 711 return
710 712
711 713 cl = ControllerLauncher(extra_args=cont_args)
712 714 dstart = cl.start()
713 715 def start_engines(r):
714 716 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
715 717 def shutdown(signum, frame):
716 718 log.msg('Stopping sge cluster')
717 719 d = sge_set.kill()
718 720 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
719 721 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
720 722 signal.signal(signal.SIGINT,shutdown)
721 723 d = sge_set.start(args.n)
722 724 return d
723 725 config = kernel_config_manager.get_config_obj()
724 726 furl_file = config['controller']['engine_furl_file']
725 727 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
726 728 dstart.addErrback(_err_and_stop)
727 729
728 730 def main_lsf(args):
729 731 cont_args = []
730 732 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
731 733
732 734 # Check security settings before proceeding
733 735 if not check_security(args, cont_args):
734 736 return
735 737
736 738 # See if we are reusing FURL files
737 739 if not check_reuse(args, cont_args):
738 740 return
739 741
740 742 if args.lsfscript and not os.path.isfile(args.lsfscript):
741 743 log.err('LSF script does not exist: %s' % args.lsfscript)
742 744 return
743 745
744 746 cl = ControllerLauncher(extra_args=cont_args)
745 747 dstart = cl.start()
746 748 def start_engines(r):
747 749 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
748 750 def shutdown(signum, frame):
749 751 log.msg('Stopping LSF cluster')
750 752 d = lsf_set.kill()
751 753 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
752 754 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
753 755 signal.signal(signal.SIGINT,shutdown)
754 756 d = lsf_set.start(args.n)
755 757 return d
756 758 config = kernel_config_manager.get_config_obj()
757 759 furl_file = config['controller']['engine_furl_file']
758 760 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
759 761 dstart.addErrback(_err_and_stop)
760 762
761 763
762 764 def main_ssh(args):
763 765 """Start a controller on localhost and engines using ssh.
764 766
765 767 Your clusterfile should look like::
766 768
767 769 send_furl = False # True, if you want the engine FURL copied to each host
768 770 engines = {
769 771 'engine_host1' : engine_count,
770 772 'engine_host2' : engine_count2
771 773 }
772 774 """
773 775 clusterfile = {}
774 776 execfile(args.clusterfile, clusterfile)
775 777 if not clusterfile.has_key('send_furl'):
776 778 clusterfile['send_furl'] = False
777 779
778 780 cont_args = []
779 781 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
780 782
781 783 # Check security settings before proceeding
782 784 if not check_security(args, cont_args):
783 785 return
784 786
785 787 # See if we are reusing FURL files
786 788 if not check_reuse(args, cont_args):
787 789 return
788 790
789 791 cl = ControllerLauncher(extra_args=cont_args)
790 792 dstart = cl.start()
791 793 def start_engines(cont_pid):
792 794 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
793 795 def shutdown(signum, frame):
794 796 d = ssh_set.kill()
795 797 cl.interrupt_then_kill(1.0)
796 798 reactor.callLater(2.0, reactor.stop)
797 799 signal.signal(signal.SIGINT,shutdown)
798 800 d = ssh_set.start(clusterfile['send_furl'])
799 801 return d
800 802 config = kernel_config_manager.get_config_obj()
801 803 furl_file = config['controller']['engine_furl_file']
802 804 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
803 805 dstart.addErrback(_err_and_stop)
804 806
805 807
806 808 def get_args():
807 809 base_parser = argparse.ArgumentParser(add_help=False)
808 810 base_parser.add_argument(
809 811 '-r',
810 812 action='store_true',
811 813 dest='r',
812 814 help='try to reuse FURL files. Use with --client-port and --engine-port'
813 815 )
814 816 base_parser.add_argument(
815 817 '--client-port',
816 818 type=int,
817 819 dest='client_port',
818 820 help='the port the controller will listen on for client connections',
819 821 default=0
820 822 )
821 823 base_parser.add_argument(
822 824 '--engine-port',
823 825 type=int,
824 826 dest='engine_port',
825 827 help='the port the controller will listen on for engine connections',
826 828 default=0
827 829 )
828 830 base_parser.add_argument(
829 831 '-x',
830 832 action='store_true',
831 833 dest='x',
832 834 help='turn off client security'
833 835 )
834 836 base_parser.add_argument(
835 837 '-y',
836 838 action='store_true',
837 839 dest='y',
838 840 help='turn off engine security'
839 841 )
840 842 base_parser.add_argument(
841 843 "--logdir",
842 844 type=str,
843 845 dest="logdir",
844 846 help="directory to put log files (default=$IPYTHONDIR/log)",
845 847 default=pjoin(get_ipython_dir(),'log')
846 848 )
847 849 base_parser.add_argument(
848 850 "-n",
849 851 "--num",
850 852 type=int,
851 853 dest="n",
852 854 default=2,
853 855 help="the number of engines to start"
854 856 )
855 857
856 858 parser = argparse.ArgumentParser(
857 859 description='IPython cluster startup. This starts a controller and\
858 860 engines using various approaches. Use the IPYTHONDIR environment\
859 861 variable to change your IPython directory from the default of\
860 862 .ipython or _ipython. The log and security subdirectories of your\
861 863 IPython directory will be used by this script for log files and\
862 864 security files.'
863 865 )
864 866 subparsers = parser.add_subparsers(
865 867 help='available cluster types. For help, do "ipcluster TYPE --help"')
866 868
867 869 parser_local = subparsers.add_parser(
868 870 'local',
869 871 help='run a local cluster',
870 872 parents=[base_parser]
871 873 )
872 874 parser_local.set_defaults(func=main_local)
873 875
874 876 parser_mpirun = subparsers.add_parser(
875 877 'mpirun',
876 878 help='run a cluster using mpirun (mpiexec also works)',
877 879 parents=[base_parser]
878 880 )
879 881 parser_mpirun.add_argument(
880 882 "--mpi",
881 883 type=str,
882 884 dest="mpi", # Don't put a default here to allow no MPI support
883 885 help="how to call MPI_Init (default=mpi4py)"
884 886 )
885 887 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
886 888
887 889 parser_mpiexec = subparsers.add_parser(
888 890 'mpiexec',
889 891 help='run a cluster using mpiexec (mpirun also works)',
890 892 parents=[base_parser]
891 893 )
892 894 parser_mpiexec.add_argument(
893 895 "--mpi",
894 896 type=str,
895 897 dest="mpi", # Don't put a default here to allow no MPI support
896 898 help="how to call MPI_Init (default=mpi4py)"
897 899 )
898 900 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
899 901
900 902 parser_pbs = subparsers.add_parser(
901 903 'pbs',
902 904 help='run a pbs cluster',
903 905 parents=[base_parser]
904 906 )
905 907 parser_pbs.add_argument(
906 908 '-s',
907 909 '--pbs-script',
908 910 type=str,
909 911 dest='pbsscript',
910 912 help='PBS script template',
911 913 default=''
912 914 )
913 915 parser_pbs.add_argument(
914 916 '-q',
915 917 '--queue',
916 918 type=str,
917 919 dest='pbsqueue',
918 920 help='PBS queue to use when starting the engines',
919 921 default=None,
920 922 )
921 923 parser_pbs.set_defaults(func=main_pbs)
922 924
923 925 parser_sge = subparsers.add_parser(
924 926 'sge',
925 927 help='run an sge cluster',
926 928 parents=[base_parser]
927 929 )
928 930 parser_sge.add_argument(
929 931 '-s',
930 932 '--sge-script',
931 933 type=str,
932 934 dest='sgescript',
933 935 help='SGE script template',
934 936 default='' # SGEEngineSet will create one if not specified
935 937 )
936 938 parser_sge.add_argument(
937 939 '-q',
938 940 '--queue',
939 941 type=str,
940 942 dest='sgequeue',
941 943 help='SGE queue to use when starting the engines',
942 944 default=None,
943 945 )
944 946 parser_sge.set_defaults(func=main_sge)
945 947
946 948 parser_lsf = subparsers.add_parser(
947 949 'lsf',
948 950 help='run an lsf cluster',
949 951 parents=[base_parser]
950 952 )
951 953
952 954 parser_lsf.add_argument(
953 955 '-s',
954 956 '--lsf-script',
955 957 type=str,
956 958 dest='lsfscript',
957 959 help='LSF script template',
958 960 default='' # LSFEngineSet will create one if not specified
959 961 )
960 962
961 963 parser_lsf.add_argument(
962 964 '-q',
963 965 '--queue',
964 966 type=str,
965 967 dest='lsfqueue',
966 968 help='LSF queue to use when starting the engines',
967 969 default=None,
968 970 )
969 971 parser_lsf.set_defaults(func=main_lsf)
970 972
971 973 parser_ssh = subparsers.add_parser(
972 974 'ssh',
973 975 help='run a cluster using ssh; requires ssh keys to be set up',
974 976 parents=[base_parser]
975 977 )
976 978 parser_ssh.add_argument(
977 979 '--clusterfile',
978 980 type=str,
979 981 dest='clusterfile',
980 982 help='python file describing the cluster',
981 983 default='clusterfile.py',
982 984 )
983 985 parser_ssh.add_argument(
984 986 '--sshx',
985 987 type=str,
986 988 dest='sshx',
987 989 help='sshx launcher helper'
988 990 )
989 991 parser_ssh.set_defaults(func=main_ssh)
990 992
991 993 args = parser.parse_args()
992 994 return args
993 995
994 996 def main():
995 997 args = get_args()
996 998 reactor.callWhenRunning(args.func, args)
997 999 log.startLogging(sys.stdout)
998 1000 reactor.run()
999 1001
1000 1002 if __name__ == '__main__':
1001 1003 main()
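With this change the default scripts submitted by `ipcluster pbs`, `ipcluster sge`, and `ipcluster lsf` run under /bin/sh regardless of the submitting user's login shell. A template passed via `-s`/`--pbs-script` (or the SGE/LSF equivalents) is still used as-is apart from the injected directives, so custom templates should declare their own interpreter line.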