add bsub wrapper for LSFEngineSet...
Justin Riley
@@ -1,1003 +1,1018 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import stat
22 22 import tempfile
23 23 pjoin = os.path.join
24 24
25 25 from twisted.internet import reactor, defer
26 26 from twisted.internet.protocol import ProcessProtocol
27 27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 28 from twisted.internet.utils import getProcessOutput
29 29 from twisted.python import failure, log
30 30
31 31 from IPython.external import argparse
32 32 from IPython.external import Itpl
33 33 from IPython.genutils import (
34 34 get_ipython_dir,
35 35 get_log_dir,
36 36 get_security_dir,
37 37 num_cpus
38 38 )
40 40
41 41 # Create various ipython directories if they don't exist.
42 42 # This must be done before IPython.kernel.config is imported.
43 43 from IPython.iplib import user_setup
44 44 if os.name == 'posix':
45 45 rc_suffix = ''
46 46 else:
47 47 rc_suffix = '.ini'
48 48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
49 49 get_log_dir()
50 50 get_security_dir()
51 51
52 52 from IPython.kernel.config import config_manager as kernel_config_manager
53 53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 54 from IPython.kernel.fcutil import have_crypto
55 55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 56 from IPython.kernel.util import printer
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # General process handling code
60 60 #-----------------------------------------------------------------------------
61 61
62 62
63 63 class ProcessStateError(Exception):
64 64 pass
65 65
66 66 class UnknownStatus(Exception):
67 67 pass
68 68
69 69 class LauncherProcessProtocol(ProcessProtocol):
70 70 """
71 71 A ProcessProtocol to go with the ProcessLauncher.
72 72 """
73 73 def __init__(self, process_launcher):
74 74 self.process_launcher = process_launcher
75 75
76 76 def connectionMade(self):
77 77 self.process_launcher.fire_start_deferred(self.transport.pid)
78 78
79 79 def processEnded(self, status):
80 80 value = status.value
81 81 if isinstance(value, ProcessDone):
82 82 self.process_launcher.fire_stop_deferred(0)
83 83 elif isinstance(value, ProcessTerminated):
84 84 self.process_launcher.fire_stop_deferred(
85 85 {'exit_code':value.exitCode,
86 86 'signal':value.signal,
87 87 'status':value.status
88 88 }
89 89 )
90 90 else:
91 91 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
92 92
93 93 def outReceived(self, data):
94 94 log.msg(data)
95 95
96 96 def errReceived(self, data):
97 97 log.err(data)
98 98
99 99 class ProcessLauncher(object):
100 100 """
101 101 Start and stop an external process in an asynchronous manner.
102 102
103 103 Currently this uses deferreds to notify other parties of process state
104 104 changes. This is an awkward design and should be moved to using
105 105 a formal NotificationCenter.
106 106 """
107 107 def __init__(self, cmd_and_args):
108 108 self.cmd = cmd_and_args[0]
109 109 self.args = cmd_and_args
110 110 self._reset()
111 111
112 112 def _reset(self):
113 113 self.process_protocol = None
114 114 self.pid = None
115 115 self.start_deferred = None
116 116 self.stop_deferreds = []
117 117 self.state = 'before' # before, running, or after
118 118
119 119 @property
120 120 def running(self):
121 121 if self.state == 'running':
122 122 return True
123 123 else:
124 124 return False
125 125
126 126 def fire_start_deferred(self, pid):
127 127 self.pid = pid
128 128 self.state = 'running'
129 129 log.msg('Process %r has started with pid=%i' % (self.args, pid))
130 130 self.start_deferred.callback(pid)
131 131
132 132 def start(self):
133 133 if self.state == 'before':
134 134 self.process_protocol = LauncherProcessProtocol(self)
135 135 self.start_deferred = defer.Deferred()
136 136 self.process_transport = reactor.spawnProcess(
137 137 self.process_protocol,
138 138 self.cmd,
139 139 self.args,
140 140 env=os.environ
141 141 )
142 142 return self.start_deferred
143 143 else:
144 144 s = 'the process has already been started and has state: %r' % \
145 145 self.state
146 146 return defer.fail(ProcessStateError(s))
147 147
148 148 def get_stop_deferred(self):
149 149 if self.state == 'running' or self.state == 'before':
150 150 d = defer.Deferred()
151 151 self.stop_deferreds.append(d)
152 152 return d
153 153 else:
154 154 s = 'this process is already complete'
155 155 return defer.fail(ProcessStateError(s))
156 156
157 157 def fire_stop_deferred(self, exit_code):
158 158 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
159 159 self.state = 'after'
160 160 for d in self.stop_deferreds:
161 161 d.callback(exit_code)
162 162
163 163 def signal(self, sig):
164 164 """
165 165 Send a signal to the process.
166 166
167 167 The argument sig can be a signal name ('KILL', 'INT', etc.) or any signal number.
168 168 """
169 169 if self.state == 'running':
170 170 self.process_transport.signalProcess(sig)
171 171
172 172 # def __del__(self):
173 173 # self.signal('KILL')
174 174
175 175 def interrupt_then_kill(self, delay=1.0):
176 176 self.signal('INT')
177 177 reactor.callLater(delay, self.signal, 'KILL')
178 178
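# The following is a hedged usage sketch, not part of the original module,
# showing how ProcessLauncher's deferreds are meant to be consumed. It
# assumes a running Twisted reactor; the 'sleep' command and the 2-second
# delay are arbitrary illustrations.
def _demo_process_launcher():
    pl = ProcessLauncher(['sleep', '10'])
    # start() fires with the pid once the child process is spawned.
    d = pl.start()
    d.addCallback(lambda pid: log.msg('demo process started, pid=%i' % pid))
    # get_stop_deferred() fires with the exit status when the process ends.
    stop_d = pl.get_stop_deferred()
    stop_d.addCallback(lambda status: log.msg('demo process exited: %r' % status))
    # Interrupt politely after two seconds, escalating to KILL one second later.
    reactor.callLater(2.0, pl.interrupt_then_kill, 1.0)
    return stop_d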
179 179
180 180 #-----------------------------------------------------------------------------
181 181 # Code for launching controller and engines
182 182 #-----------------------------------------------------------------------------
183 183
184 184
185 185 class ControllerLauncher(ProcessLauncher):
186 186
187 187 def __init__(self, extra_args=None):
188 188 if sys.platform == 'win32':
189 189 # This logic is needed because the ipcontroller script doesn't
190 190 # always get installed in the same way or in the same location.
191 191 from IPython.kernel.scripts import ipcontroller
192 192 script_location = ipcontroller.__file__.replace('.pyc', '.py')
193 193 # The -u option here turns on unbuffered output, which is required
194 194 # on Win32 to prevent weird conflicts and problems with Twisted.
195 195 # Also, use sys.executable to make sure we are picking up the
196 196 # right python exe.
197 197 args = [sys.executable, '-u', script_location]
198 198 else:
199 199 args = ['ipcontroller']
200 200 self.extra_args = extra_args
201 201 if extra_args is not None:
202 202 args.extend(extra_args)
203 203
204 204 ProcessLauncher.__init__(self, args)
205 205
206 206
207 207 class EngineLauncher(ProcessLauncher):
208 208
209 209 def __init__(self, extra_args=None):
210 210 if sys.platform == 'win32':
211 211 # This logic is needed because the ipengine script doesn't
212 212 # always get installed in the same way or in the same location.
213 213 from IPython.kernel.scripts import ipengine
214 214 script_location = ipengine.__file__.replace('.pyc', '.py')
215 215 # The -u option here turns on unbuffered output, which is required
216 216 # on Win32 to prevent weird conflicts and problems with Twisted.
217 217 # Also, use sys.executable to make sure we are picking up the
218 218 # right python exe.
219 219 args = [sys.executable, '-u', script_location]
220 220 else:
221 221 args = ['ipengine']
222 222 self.extra_args = extra_args
223 223 if extra_args is not None:
224 224 args.extend(extra_args)
225 225
226 226 ProcessLauncher.__init__(self, args)
227 227
228 228
229 229 class LocalEngineSet(object):
230 230
231 231 def __init__(self, extra_args=None):
232 232 self.extra_args = extra_args
233 233 self.launchers = []
234 234
235 235 def start(self, n):
236 236 dlist = []
237 237 for i in range(n):
238 238 log.msg("starting engine: %s" % i)
239 239 el = EngineLauncher(extra_args=self.extra_args)
240 240 d = el.start()
241 241 self.launchers.append(el)
242 242 dlist.append(d)
243 243 dfinal = gatherBoth(dlist, consumeErrors=True)
244 244 dfinal.addCallback(self._handle_start)
245 245 return dfinal
246 246
247 247 def _handle_start(self, r):
248 248 log.msg('Engines started with pids: %r' % r)
249 249 return r
250 250
251 251 def _handle_stop(self, r):
252 252 log.msg('Engines received signal: %r' % r)
253 253 return r
254 254
255 255 def signal(self, sig):
256 256 dlist = []
257 257 for el in self.launchers:
258 258 d = el.get_stop_deferred()
259 259 dlist.append(d)
260 260 el.signal(sig)
261 261 dfinal = gatherBoth(dlist, consumeErrors=True)
262 262 dfinal.addCallback(self._handle_stop)
263 263 return dfinal
264 264
265 265 def interrupt_then_kill(self, delay=1.0):
266 266 dlist = []
267 267 for el in self.launchers:
268 268 d = el.get_stop_deferred()
269 269 dlist.append(d)
270 270 el.interrupt_then_kill(delay)
271 271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 272 dfinal.addCallback(self._handle_stop)
273 273 return dfinal
274 274
275 275 class BatchEngineSet(object):
276 276
277 277 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
278 278 name = ''
279 279 submit_command = ''
280 280 delete_command = ''
281 281 job_id_regexp = ''
282 282 job_array_regexp = ''
283 283 job_array_template = ''
284 284 queue_regexp = ''
285 285 queue_template = ''
286 286 default_template = ''
287 287
288 288 def __init__(self, template_file, queue, **kwargs):
289 289 self.template_file = template_file
290 290 self.queue = queue
291 291
292 292 def parse_job_id(self, output):
293 293 m = re.search(self.job_id_regexp, output)
294 294 if m is not None:
295 295 job_id = m.group()
296 296 else:
297 297 raise Exception("job id couldn't be determined: %s" % output)
298 298 self.job_id = job_id
299 299 log.msg('Job started with job id: %r' % job_id)
300 300 return job_id
301 301
302 302 def handle_error(self, f):
303 303 f.printTraceback()
304 304 f.raiseException()
305 305
306 306 def start(self, n):
307 307 log.msg("starting %d engines" % n)
308 308 self._temp_file = tempfile.NamedTemporaryFile()
309 309 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
310 310 if self.template_file:
311 311 log.msg("Using %s script %s" % (self.name, self.template_file))
312 312 contents = open(self.template_file, 'r').read()
313 313 new_script = contents
314 314 regex = re.compile(self.job_array_regexp)
315 315 if not regex.search(contents):
316 316 log.msg("adding job array settings to %s script" % self.name)
317 317 new_script = self.job_array_template % n + '\n' + new_script
319 319 regex = re.compile(self.queue_regexp)
321 321 if self.queue and not regex.search(contents):
322 322 log.msg("adding queue settings to %s script" % self.name)
323 323 new_script = self.queue_template % self.queue + '\n' + new_script
324 324 if new_script != contents:
325 325 self._temp_file.write(new_script)
326 326 self.template_file = self._temp_file.name
327 327 else:
328 328 default_script = self.default_template % n
329 329 if self.queue:
330 330 default_script = self.queue_template % self.queue + \
331 331 '\n' + default_script
332 332 log.msg("using default ipengine %s script: \n%s" %
333 333 (self.name, default_script))
334 334 self._temp_file.file.write(default_script)
335 335 self.template_file = self._temp_file.name
336 336 self._temp_file.file.close()
337 337 d = getProcessOutput(self.submit_command,
338 338 [self.template_file],
339 339 env=os.environ)
340 340 d.addCallback(self.parse_job_id)
341 341 d.addErrback(self.handle_error)
342 342 return d
343 343
344 344 def kill(self):
345 345 d = getProcessOutput(self.delete_command,
346 346 [self.job_id],env=os.environ)
347 347 return d
348 348
349 349 class PBSEngineSet(BatchEngineSet):
350 350
351 351 name = 'PBS'
352 352 submit_command = 'qsub'
353 353 delete_command = 'qdel'
354 354 job_id_regexp = '\d+'
355 355 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
356 356 job_array_template = '#PBS -t 1-%d'
357 357 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
358 358 queue_template = '#PBS -q %s'
359 359 default_template="""#!/bin/sh
360 360 #PBS -V
361 361 #PBS -t 1-%d
362 362 #PBS -N ipengine
363 363 eid=$(($PBS_ARRAYID - 1))
364 364 ipengine --logfile=ipengine${eid}.log
365 365 """
366 366
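# Hedged sketch, not in the original file: how BatchEngineSet.parse_job_id
# picks the job id out of the submit command's output. The sample qsub
# output string below is illustrative only; template_file/queue are unused
# by parse_job_id, so None placeholders suffice here.
def _demo_parse_pbs_job_id():
    pbs = PBSEngineSet(None, None)
    # job_id_regexp is '\d+', so the leading run of digits is extracted.
    return pbs.parse_job_id('12345.headnode.example.org')   # -> '12345'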
367 367 class SGEEngineSet(PBSEngineSet):
368 368
369 369 name = 'SGE'
370 370 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
371 371 job_array_template = '#$ -t 1-%d'
372 372 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
373 373 queue_template = '#$ -q %s'
374 374 default_template="""#$ -V
375 375 #$ -S /bin/sh
376 376 #$ -t 1-%d
377 377 #$ -N ipengine
378 378 eid=$(($SGE_TASK_ID - 1))
379 379 ipengine --logfile=ipengine${eid}.log
380 380 """
381 381
382 382 class LSFEngineSet(PBSEngineSet):
383 383
384 384 name = 'LSF'
385 385 submit_command = 'bsub'
386 386 delete_command = 'bkill'
387 387 job_array_regexp = '#BSUB[ \t]+-J[ \t]*\w+\[\d+-\d+\]'
388 388 job_array_template = '#BSUB -J ipengine[1-%d]'
389 389 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
390 390 queue_template = '#BSUB -q %s'
391 391 default_template="""#!/bin/sh
392 392 #BSUB -J ipengine[1-%d]
393 393 eid=$(($LSB_JOBINDEX - 1))
394 394 ipengine --logfile=ipengine${eid}.log
395 395 """
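# Explanatory note, not in the original commit: qsub and qdel accept the
# job script path as a command-line argument, which is how getProcessOutput()
# invokes submit_command. bsub, by contrast, only parses embedded #BSUB
# directives when the script arrives on stdin (bsub < script); handed a path
# argument it would not honor them. The small shell wrapper below bridges
# that difference so LSF submission fits the same submit_command interface.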
396 bsub_wrapper="""#!/bin/sh
397 bsub < $1
398 """
399
400 def __init__(self, template_file, queue, **kwargs):
401 self._bsub_wrapper = self._make_bsub_wrapper()
402 self.submit_command = self._bsub_wrapper.name
403 PBSEngineSet.__init__(self, template_file, queue, **kwargs)
404
405 def _make_bsub_wrapper(self):
406 bsub_wrapper = tempfile.NamedTemporaryFile()
407 bsub_wrapper.write(self.bsub_wrapper)
408 bsub_wrapper.file.close()
409 os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
410 return bsub_wrapper
396 411
397 412 sshx_template="""#!/bin/sh
398 413 "$@" &> /dev/null &
399 414 echo $!
400 415 """
401 416
402 417 engine_killer_template="""#!/bin/sh
403 418 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
404 419 """
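# Explanatory note, not in the original file: the '[i]pengine' pattern in the
# killer script is a standard trick -- the character class keeps the grep
# process from matching its own command line in the ps output, so xargs only
# signals the real engine processes.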
405 420
406 421 class SSHEngineSet(object):
407 422 sshx_template=sshx_template
408 423 engine_killer_template=engine_killer_template
409 424
410 425 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
411 426 """Start a controller on localhost and engines using ssh.
412 427
413 428 The engine_hosts argument is a dict with hostnames as keys and
414 429 the number of engines (int) as values. sshx is the name of a local
415 430 file that will be used to run remote commands. This file is used
416 431 to set up the environment properly.
417 432 """
418 433
419 434 self.temp_dir = tempfile.gettempdir()
420 435 if sshx is not None:
421 436 self.sshx = sshx
422 437 else:
423 438 # Write the sshx.sh file locally from our template.
424 439 self.sshx = os.path.join(
425 440 self.temp_dir,
426 441 '%s-main-sshx.sh' % os.environ['USER']
427 442 )
428 443 f = open(self.sshx, 'w')
429 444 f.writelines(self.sshx_template)
430 445 f.close()
431 446 self.engine_command = ipengine
432 447 self.engine_hosts = engine_hosts
433 448 # Write the engine killer script file locally from our template.
434 449 self.engine_killer = os.path.join(
435 450 self.temp_dir,
436 451 '%s-local-engine_killer.sh' % os.environ['USER']
437 452 )
438 453 f = open(self.engine_killer, 'w')
439 454 f.writelines(self.engine_killer_template)
440 455 f.close()
441 456
442 457 def start(self, send_furl=False):
443 458 dlist = []
444 459 for host in self.engine_hosts.keys():
445 460 count = self.engine_hosts[host]
446 461 d = self._start(host, count, send_furl)
447 462 dlist.append(d)
448 463 return gatherBoth(dlist, consumeErrors=True)
449 464
450 465 def _start(self, hostname, count=1, send_furl=False):
451 466 if send_furl:
452 467 d = self._scp_furl(hostname)
453 468 else:
454 469 d = defer.succeed(None)
455 470 d.addCallback(lambda r: self._scp_sshx(hostname))
456 471 d.addCallback(lambda r: self._ssh_engine(hostname, count))
457 472 return d
458 473
459 474 def _scp_furl(self, hostname):
460 475 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
461 476 cmd_list = scp_cmd.split()
462 477 cmd_list[1] = os.path.expanduser(cmd_list[1])
463 478 log.msg('Copying furl file: %s' % scp_cmd)
464 479 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
465 480 return d
466 481
467 482 def _scp_sshx(self, hostname):
468 483 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
469 484 self.sshx, hostname,
470 485 self.temp_dir, os.environ['USER']
471 486 )
473 488 log.msg("Copying sshx: %s" % scp_cmd)
474 489 sshx_scp = scp_cmd.split()
475 490 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
476 491 return d
477 492
478 493 def _ssh_engine(self, hostname, count):
479 494 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
480 495 hostname, self.temp_dir,
481 496 os.environ['USER'], self.engine_command
482 497 )
483 498 cmds = exec_engine.split()
484 499 dlist = []
485 500 log.msg("about to start engines...")
486 501 for i in range(count):
487 502 log.msg('Starting engines: %s' % exec_engine)
488 503 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
489 504 dlist.append(d)
490 505 return gatherBoth(dlist, consumeErrors=True)
491 506
492 507 def kill(self):
493 508 dlist = []
494 509 for host in self.engine_hosts.keys():
495 510 d = self._killall(host)
496 511 dlist.append(d)
497 512 return gatherBoth(dlist, consumeErrors=True)
498 513
499 514 def _killall(self, hostname):
500 515 d = self._scp_engine_killer(hostname)
501 516 d.addCallback(lambda r: self._ssh_kill(hostname))
502 517 # d.addErrback(self._exec_err)
503 518 return d
504 519
505 520 def _scp_engine_killer(self, hostname):
506 521 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
507 522 self.engine_killer,
508 523 hostname,
509 524 self.temp_dir,
510 525 os.environ['USER']
511 526 )
512 527 cmds = scp_cmd.split()
513 528 log.msg('Copying engine_killer: %s' % scp_cmd)
514 529 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
515 530 return d
516 531
517 532 def _ssh_kill(self, hostname):
518 533 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
519 534 hostname,
520 535 self.temp_dir,
521 536 os.environ['USER']
522 537 )
523 538 log.msg('Killing engine: %s' % kill_cmd)
524 539 kill_cmd = kill_cmd.split()
525 540 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
526 541 return d
527 542
528 543 def _exec_err(self, r):
529 544 log.msg(r)
530 545
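# Hedged usage sketch, not part of the original module: starting engines on
# two illustrative hosts via SSHEngineSet. Assumes a running reactor and
# passwordless ssh to the named hosts.
def _demo_ssh_engine_set():
    sset = SSHEngineSet({'node1.example.org': 2, 'node2.example.org': 4})
    d = sset.start(send_furl=False)
    d.addCallback(lambda r: log.msg('ssh engines launched: %r' % r))
    return d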
531 546 #-----------------------------------------------------------------------------
532 547 # Main functions for the different types of clusters
533 548 #-----------------------------------------------------------------------------
534 549
535 550 # TODO:
536 551 # The logic in these functions should be moved into classes like LocalCluster,
537 552 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
538 553 # The main functions should then just parse the command line arguments, create
539 554 # the appropriate class and call a 'start' method.
540 555
541 556
542 557 def check_security(args, cont_args):
543 558 """Check to see if we should run with SSL support."""
544 559 if (not args.x or not args.y) and not have_crypto:
545 560 log.err("""
546 561 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
547 562 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
548 563 reactor.stop()
549 564 return False
550 565 if args.x:
551 566 cont_args.append('-x')
552 567 if args.y:
553 568 cont_args.append('-y')
554 569 return True
555 570
556 571
557 572 def check_reuse(args, cont_args):
558 573 """Check to see if we should try to reuse FURL files."""
559 574 if args.r:
560 575 cont_args.append('-r')
561 576 if args.client_port == 0 or args.engine_port == 0:
562 577 log.err("""
563 578 To reuse FURL files, you must also set the client and engine ports using
564 579 the --client-port and --engine-port options.""")
565 580 reactor.stop()
566 581 return False
567 582 cont_args.append('--client-port=%i' % args.client_port)
568 583 cont_args.append('--engine-port=%i' % args.engine_port)
569 584 return True
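# Hedged example, not in the original file: an invocation that satisfies
# check_reuse, which requires explicit, nonzero ports (port numbers are
# illustrative):
#
#   ipcluster local -n 4 -r --client-port=10101 --engine-port=10102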
570 585
571 586
572 587 def _err_and_stop(f):
573 588 """Errback to log a failure and halt the reactor on a fatal error."""
574 589 log.err(f)
575 590 reactor.stop()
576 591
577 592
578 593 def _delay_start(cont_pid, start_engines, furl_file, reuse):
579 594 """Wait for the controller to create its FURL file and then start the engines."""
580 595 if not reuse:
581 596 if os.path.isfile(furl_file):
582 597 os.unlink(furl_file)
583 598 log.msg('Waiting for controller to finish starting...')
584 599 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
585 600 d.addCallback(lambda _: log.msg('Controller started'))
586 601 d.addCallback(lambda _: start_engines(cont_pid))
587 602 return d
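# Explanatory note, not in the original file: wait_for_file polls for the
# controller's FURL file every 0.2 seconds, up to 50 tries (about ten
# seconds total), so engine startup fails cleanly if the controller never
# comes up.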
588 603
589 604
590 605 def main_local(args):
591 606 cont_args = []
592 607 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
593 608
594 609 # Check security settings before proceeding
595 610 if not check_security(args, cont_args):
596 611 return
597 612
598 613 # See if we are reusing FURL files
599 614 if not check_reuse(args, cont_args):
600 615 return
601 616
602 617 cl = ControllerLauncher(extra_args=cont_args)
603 618 dstart = cl.start()
604 619 def start_engines(cont_pid):
605 620 engine_args = []
606 621 engine_args.append('--logfile=%s' % \
607 622 pjoin(args.logdir,'ipengine%s-' % cont_pid))
608 623 eset = LocalEngineSet(extra_args=engine_args)
609 624 def shutdown(signum, frame):
610 625 log.msg('Stopping local cluster')
611 626 # We are still playing with the times here, but these seem
612 627 # to be reliable in allowing everything to exit cleanly.
613 628 eset.interrupt_then_kill(0.5)
614 629 cl.interrupt_then_kill(0.5)
615 630 reactor.callLater(1.0, reactor.stop)
616 631 signal.signal(signal.SIGINT,shutdown)
617 632 d = eset.start(args.n)
618 633 return d
619 634 config = kernel_config_manager.get_config_obj()
620 635 furl_file = config['controller']['engine_furl_file']
621 636 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
622 637 dstart.addErrback(_err_and_stop)
623 638
624 639
625 640 def main_mpi(args):
626 641 cont_args = []
627 642 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
628 643
629 644 # Check security settings before proceeding
630 645 if not check_security(args, cont_args):
631 646 return
632 647
633 648 # See if we are reusing FURL files
634 649 if not check_reuse(args, cont_args):
635 650 return
636 651
637 652 cl = ControllerLauncher(extra_args=cont_args)
638 653 dstart = cl.start()
639 654 def start_engines(cont_pid):
640 655 raw_args = [args.cmd]
641 656 raw_args.extend(['-n',str(args.n)])
642 657 raw_args.append('ipengine')
643 658 raw_args.append('-l')
644 659 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
645 660 if args.mpi:
646 661 raw_args.append('--mpi=%s' % args.mpi)
647 662 eset = ProcessLauncher(raw_args)
648 663 def shutdown(signum, frame):
649 664 log.msg('Stopping local cluster')
650 665 # We are still playing with the times here, but these seem
651 666 # to be reliable in allowing everything to exit cleanly.
652 667 eset.interrupt_then_kill(1.0)
653 668 cl.interrupt_then_kill(1.0)
654 669 reactor.callLater(2.0, reactor.stop)
655 670 signal.signal(signal.SIGINT,shutdown)
656 671 d = eset.start()
657 672 return d
658 673 config = kernel_config_manager.get_config_obj()
659 674 furl_file = config['controller']['engine_furl_file']
660 675 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
661 676 dstart.addErrback(_err_and_stop)
662 677
663 678
664 679 def main_pbs(args):
665 680 cont_args = []
666 681 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
667 682
668 683 # Check security settings before proceeding
669 684 if not check_security(args, cont_args):
670 685 return
671 686
672 687 # See if we are reusing FURL files
673 688 if not check_reuse(args, cont_args):
674 689 return
675 690
676 691 if args.pbsscript and not os.path.isfile(args.pbsscript):
677 692 log.err('PBS script does not exist: %s' % args.pbsscript)
678 693 return
679 694
680 695 cl = ControllerLauncher(extra_args=cont_args)
681 696 dstart = cl.start()
682 697 def start_engines(r):
683 698 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
684 699 def shutdown(signum, frame):
685 700 log.msg('Stopping PBS cluster')
686 701 d = pbs_set.kill()
687 702 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
688 703 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
689 704 signal.signal(signal.SIGINT,shutdown)
690 705 d = pbs_set.start(args.n)
691 706 return d
692 707 config = kernel_config_manager.get_config_obj()
693 708 furl_file = config['controller']['engine_furl_file']
694 709 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
695 710 dstart.addErrback(_err_and_stop)
696 711
697 712 def main_sge(args):
698 713 cont_args = []
699 714 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
700 715
701 716 # Check security settings before proceeding
702 717 if not check_security(args, cont_args):
703 718 return
704 719
705 720 # See if we are reusing FURL files
706 721 if not check_reuse(args, cont_args):
707 722 return
708 723
709 724 if args.sgescript and not os.path.isfile(args.sgescript):
710 725 log.err('SGE script does not exist: %s' % args.sgescript)
711 726 return
712 727
713 728 cl = ControllerLauncher(extra_args=cont_args)
714 729 dstart = cl.start()
715 730 def start_engines(r):
716 731 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
717 732 def shutdown(signum, frame):
718 733 log.msg('Stopping SGE cluster')
719 734 d = sge_set.kill()
720 735 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
721 736 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
722 737 signal.signal(signal.SIGINT,shutdown)
723 738 d = sge_set.start(args.n)
724 739 return d
725 740 config = kernel_config_manager.get_config_obj()
726 741 furl_file = config['controller']['engine_furl_file']
727 742 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
728 743 dstart.addErrback(_err_and_stop)
729 744
730 745 def main_lsf(args):
731 746 cont_args = []
732 747 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
733 748
734 749 # Check security settings before proceeding
735 750 if not check_security(args, cont_args):
736 751 return
737 752
738 753 # See if we are reusing FURL files
739 754 if not check_reuse(args, cont_args):
740 755 return
741 756
742 757 if args.lsfscript and not os.path.isfile(args.lsfscript):
743 758 log.err('LSF script does not exist: %s' % args.lsfscript)
744 759 return
745 760
746 761 cl = ControllerLauncher(extra_args=cont_args)
747 762 dstart = cl.start()
748 763 def start_engines(r):
749 764 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
750 765 def shutdown(signum, frame):
751 766 log.msg('Stopping LSF cluster')
752 767 d = lsf_set.kill()
753 768 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
754 769 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
755 770 signal.signal(signal.SIGINT,shutdown)
756 771 d = lsf_set.start(args.n)
757 772 return d
758 773 config = kernel_config_manager.get_config_obj()
759 774 furl_file = config['controller']['engine_furl_file']
760 775 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
761 776 dstart.addErrback(_err_and_stop)
762 777
763 778
764 779 def main_ssh(args):
765 780 """Start a controller on localhost and engines using ssh.
766 781
767 782 Your clusterfile should look like::
768 783
769 784 send_furl = False # set to True to copy the engine FURL to each host
770 785 engines = {
771 786 'engine_host1' : engine_count,
772 787 'engine_host2' : engine_count2
773 788 }
774 789 """
775 790 clusterfile = {}
776 791 execfile(args.clusterfile, clusterfile)
777 792 if not clusterfile.has_key('send_furl'):
778 793 clusterfile['send_furl'] = False
779 794
780 795 cont_args = []
781 796 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
782 797
783 798 # Check security settings before proceeding
784 799 if not check_security(args, cont_args):
785 800 return
786 801
787 802 # See if we are reusing FURL files
788 803 if not check_reuse(args, cont_args):
789 804 return
790 805
791 806 cl = ControllerLauncher(extra_args=cont_args)
792 807 dstart = cl.start()
793 808 def start_engines(cont_pid):
794 809 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
795 810 def shutdown(signum, frame):
796 811 d = ssh_set.kill()
797 812 cl.interrupt_then_kill(1.0)
798 813 reactor.callLater(2.0, reactor.stop)
799 814 signal.signal(signal.SIGINT,shutdown)
800 815 d = ssh_set.start(clusterfile['send_furl'])
801 816 return d
802 817 config = kernel_config_manager.get_config_obj()
803 818 furl_file = config['controller']['engine_furl_file']
804 819 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
805 820 dstart.addErrback(_err_and_stop)
806 821
807 822
808 823 def get_args():
809 824 base_parser = argparse.ArgumentParser(add_help=False)
810 825 base_parser.add_argument(
811 826 '-r',
812 827 action='store_true',
813 828 dest='r',
814 829 help='try to reuse FURL files. Use with --client-port and --engine-port'
815 830 )
816 831 base_parser.add_argument(
817 832 '--client-port',
818 833 type=int,
819 834 dest='client_port',
820 835 help='the port the controller will listen on for client connections',
821 836 default=0
822 837 )
823 838 base_parser.add_argument(
824 839 '--engine-port',
825 840 type=int,
826 841 dest='engine_port',
827 842 help='the port the controller will listen on for engine connections',
828 843 default=0
829 844 )
830 845 base_parser.add_argument(
831 846 '-x',
832 847 action='store_true',
833 848 dest='x',
834 849 help='turn off client security'
835 850 )
836 851 base_parser.add_argument(
837 852 '-y',
838 853 action='store_true',
839 854 dest='y',
840 855 help='turn off engine security'
841 856 )
842 857 base_parser.add_argument(
843 858 "--logdir",
844 859 type=str,
845 860 dest="logdir",
846 861 help="directory to put log files (default=$IPYTHONDIR/log)",
847 862 default=pjoin(get_ipython_dir(),'log')
848 863 )
849 864 base_parser.add_argument(
850 865 "-n",
851 866 "--num",
852 867 type=int,
853 868 dest="n",
854 869 default=2,
855 870 help="the number of engines to start"
856 871 )
857 872
858 873 parser = argparse.ArgumentParser(
859 874 description='IPython cluster startup. This starts a controller and\
860 875 engines using various approaches. Use the IPYTHONDIR environment\
861 876 variable to change your IPython directory from the default of\
862 877 .ipython or _ipython. The log and security subdirectories of your\
863 878 IPython directory will be used by this script for log files and\
864 879 security files.'
865 880 )
866 881 subparsers = parser.add_subparsers(
867 882 help='available cluster types. For help, do "ipcluster TYPE --help"')
868 883
869 884 parser_local = subparsers.add_parser(
870 885 'local',
871 886 help='run a local cluster',
872 887 parents=[base_parser]
873 888 )
874 889 parser_local.set_defaults(func=main_local)
875 890
876 891 parser_mpirun = subparsers.add_parser(
877 892 'mpirun',
878 893 help='run a cluster using mpirun (mpiexec also works)',
879 894 parents=[base_parser]
880 895 )
881 896 parser_mpirun.add_argument(
882 897 "--mpi",
883 898 type=str,
884 899 dest="mpi", # Don't put a default here to allow no MPI support
885 900 help="how to call MPI_Init (default=mpi4py)"
886 901 )
887 902 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
888 903
889 904 parser_mpiexec = subparsers.add_parser(
890 905 'mpiexec',
891 906 help='run a cluster using mpiexec (mpirun also works)',
892 907 parents=[base_parser]
893 908 )
894 909 parser_mpiexec.add_argument(
895 910 "--mpi",
896 911 type=str,
897 912 dest="mpi", # Don't put a default here to allow no MPI support
898 913 help="how to call MPI_Init (default=mpi4py)"
899 914 )
900 915 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
901 916
902 917 parser_pbs = subparsers.add_parser(
903 918 'pbs',
904 919 help='run a pbs cluster',
905 920 parents=[base_parser]
906 921 )
907 922 parser_pbs.add_argument(
908 923 '-s',
909 924 '--pbs-script',
910 925 type=str,
911 926 dest='pbsscript',
912 927 help='PBS script template',
913 928 default=''
914 929 )
915 930 parser_pbs.add_argument(
916 931 '-q',
917 932 '--queue',
918 933 type=str,
919 934 dest='pbsqueue',
920 935 help='PBS queue to use when starting the engines',
921 936 default=None,
922 937 )
923 938 parser_pbs.set_defaults(func=main_pbs)
924 939
925 940 parser_sge = subparsers.add_parser(
926 941 'sge',
927 942 help='run an sge cluster',
928 943 parents=[base_parser]
929 944 )
930 945 parser_sge.add_argument(
931 946 '-s',
932 947 '--sge-script',
933 948 type=str,
934 949 dest='sgescript',
935 950 help='SGE script template',
936 951 default='' # SGEEngineSet will create one if not specified
937 952 )
938 953 parser_sge.add_argument(
939 954 '-q',
940 955 '--queue',
941 956 type=str,
942 957 dest='sgequeue',
943 958 help='SGE queue to use when starting the engines',
944 959 default=None,
945 960 )
946 961 parser_sge.set_defaults(func=main_sge)
947 962
948 963 parser_lsf = subparsers.add_parser(
949 964 'lsf',
950 965 help='run an lsf cluster',
951 966 parents=[base_parser]
952 967 )
953 968
954 969 parser_lsf.add_argument(
955 970 '-s',
956 971 '--lsf-script',
957 972 type=str,
958 973 dest='lsfscript',
959 974 help='LSF script template',
960 975 default='' # LSFEngineSet will create one if not specified
961 976 )
962 977
963 978 parser_lsf.add_argument(
964 979 '-q',
965 980 '--queue',
966 981 type=str,
967 982 dest='lsfqueue',
968 983 help='LSF queue to use when starting the engines',
969 984 default=None,
970 985 )
971 986 parser_lsf.set_defaults(func=main_lsf)
972 987
973 988 parser_ssh = subparsers.add_parser(
974 989 'ssh',
975 990 help='run a cluster using ssh; requires ssh keys to be set up',
976 991 parents=[base_parser]
977 992 )
978 993 parser_ssh.add_argument(
979 994 '--clusterfile',
980 995 type=str,
981 996 dest='clusterfile',
982 997 help='python file describing the cluster',
983 998 default='clusterfile.py',
984 999 )
985 1000 parser_ssh.add_argument(
986 1001 '--sshx',
987 1002 type=str,
988 1003 dest='sshx',
989 1004 help='sshx launcher helper'
990 1005 )
991 1006 parser_ssh.set_defaults(func=main_ssh)
992 1007
993 1008 args = parser.parse_args()
994 1009 return args
995 1010
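# Hedged examples, not in the original file, of command lines accepted by
# the parser above; queue, script, and file names are illustrative:
#
#   ipcluster local -n 4
#   ipcluster mpiexec -n 8 --mpi=mpi4py
#   ipcluster pbs -n 16 -q parallel -s pbs.template
#   ipcluster sge -n 16 -q all.q
#   ipcluster lsf -n 16 -q normal
#   ipcluster ssh --clusterfile=clusterfile.py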
996 1011 def main():
997 1012 args = get_args()
998 1013 reactor.callWhenRunning(args.func, args)
999 1014 log.startLogging(sys.stdout)
1000 1015 reactor.run()
1001 1016
1002 1017 if __name__ == '__main__':
1003 1018 main()