##// END OF EJS Templates
add --queue to ipcluster's SGE/PBS/LSF subcmds
Justin Riley -
Show More
@@ -1,948 +1,998 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.iplib import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # General process handling code
59 59 #-----------------------------------------------------------------------------
60 60
61 61
class ProcessStateError(Exception):
    """Raised when an operation is attempted in an invalid process state."""
    pass
64 64
class UnknownStatus(Exception):
    """Raised when a child process reports an unrecognized exit status."""
    pass
67 67
class LauncherProcessProtocol(ProcessProtocol):
    """Relay child-process lifecycle events back to a ProcessLauncher.

    Twisted invokes these hooks; each forwards to the owning launcher so
    it can fire its start/stop deferreds.
    """
    def __init__(self, process_launcher):
        self.process_launcher = process_launcher

    def connectionMade(self):
        # The child is alive: hand its pid to the launcher.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit is reported as a plain zero.
            self.process_launcher.fire_stop_deferred(0)
            return
        if isinstance(reason, ProcessTerminated):
            # Abnormal exit: forward the full termination details.
            details = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
            }
            self.process_launcher.fire_stop_deferred(details)
            return
        raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Child stdout goes to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Child stderr goes to the Twisted error log.
        log.err(data)
97 97
class ProcessLauncher(object):
    """Start and stop an external process in an asynchronous manner.

    State changes are currently reported through deferreds, which is an
    awkward design and should eventually move to a formal
    NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Return to the pristine pre-launch state.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before'  # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        return self.state == 'running'

    def fire_start_deferred(self, pid):
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        if self.state != 'before':
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LauncherProcessProtocol(self)
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            self.cmd,
            self.args,
            env=os.environ
        )
        return self.start_deferred

    def get_stop_deferred(self):
        if self.state in ('running', 'before'):
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        return defer.fail(ProcessStateError('this process is already complete'))

    def fire_stop_deferred(self, exit_code):
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        # Ask politely with INT first, then force-kill after `delay` seconds.
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177 177
178 178
179 179 #-----------------------------------------------------------------------------
180 180 # Code for launching controller and engines
181 181 #-----------------------------------------------------------------------------
182 182
183 183
class ControllerLauncher(ProcessLauncher):
    """Launch an ipcontroller process, optionally with extra CLI arguments."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipcontroller script doesn't always get installed in the
            # same way or in the same location, so locate the module file
            # and run it with the current interpreter.  -u gives
            # unbuffered output, which Win32 needs to avoid weird
            # conflicts and problems with Twisted.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
204 204
205 205
class EngineLauncher(ProcessLauncher):
    """Launch an ipengine process, optionally with extra CLI arguments."""

    def __init__(self, extra_args=None):
        if sys.platform == 'win32':
            # The ipengine script doesn't always get installed in the
            # same way or in the same location, so locate the module file
            # and run it with the current interpreter.  -u gives
            # unbuffered output, which Win32 needs to avoid weird
            # conflicts and problems with Twisted.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)
        ProcessLauncher.__init__(self, args)
226 226
227 227
228 228 class LocalEngineSet(object):
229 229
230 230 def __init__(self, extra_args=None):
231 231 self.extra_args = extra_args
232 232 self.launchers = []
233 233
234 234 def start(self, n):
235 235 dlist = []
236 236 for i in range(n):
237 237 print "starting engine:", i
238 238 el = EngineLauncher(extra_args=self.extra_args)
239 239 d = el.start()
240 240 self.launchers.append(el)
241 241 dlist.append(d)
242 242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 243 dfinal.addCallback(self._handle_start)
244 244 return dfinal
245 245
246 246 def _handle_start(self, r):
247 247 log.msg('Engines started with pids: %r' % r)
248 248 return r
249 249
250 250 def _handle_stop(self, r):
251 251 log.msg('Engines received signal: %r' % r)
252 252 return r
253 253
254 254 def signal(self, sig):
255 255 dlist = []
256 256 for el in self.launchers:
257 257 d = el.get_stop_deferred()
258 258 dlist.append(d)
259 259 el.signal(sig)
260 260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 261 dfinal.addCallback(self._handle_stop)
262 262 return dfinal
263 263
264 264 def interrupt_then_kill(self, delay=1.0):
265 265 dlist = []
266 266 for el in self.launchers:
267 267 d = el.get_stop_deferred()
268 268 dlist.append(d)
269 269 el.interrupt_then_kill(delay)
270 270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 271 dfinal.addCallback(self._handle_stop)
272 272 return dfinal
273 273
class BatchEngineSet(object):
    """Base class for engine sets submitted through a batch scheduler.

    Subclasses must fill in the class attributes below; see
    PBSEngineSet/SGEEngineSet for examples.
    """

    # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
    name = ''
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''
    job_array_regexp = ''
    job_array_template = ''
    queue_regexp = ''
    queue_template = ''
    default_template = ''

    def __init__(self, template_file, queue, **kwargs):
        """Store the script template path and queue name.

        template_file may be empty (the default_template is used then);
        queue may be None to use the scheduler's default queue.
        """
        self.template_file = template_file
        self.queue = queue

    def parse_job_id(self, output):
        """Extract the scheduler job id from the submit command's output.

        Raises if job_id_regexp does not match, since without an id the
        job cannot be tracked or killed later.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def handle_error(self, f):
        # Log the failure's traceback, then re-raise to propagate it.
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job starting n engines; returns a deferred job id."""
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        if self.template_file:
            log.msg("Using %s script %s" % (self.name, self.template_file))
            contents = open(self.template_file, 'r').read()
            new_script = contents
            # Inject a job-array directive unless the script already has one.
            regex = re.compile(self.job_array_regexp)
            if not regex.search(contents):
                log.msg("adding job array settings to %s script" % self.name)
                new_script = self.job_array_template % n +'\n' + new_script
            # BUG FIX: removed two leftover debug `print` statements that
            # dumped the queue regexp and match object to stdout here.
            # Likewise inject the queue directive if a queue was requested.
            regex = re.compile(self.queue_regexp)
            if self.queue and not regex.search(contents):
                log.msg("adding queue settings to %s script" % self.name)
                new_script = self.queue_template % self.queue + '\n' + new_script
            if new_script != contents:
                # Submit the augmented copy instead of the user's file.
                self._temp_file.write(new_script)
                self.template_file = self._temp_file.name
        else:
            default_script = self.default_template % n
            if self.queue:
                default_script = self.queue_template % self.queue + \
                    '\n' + default_script
            log.msg("using default ipengine %s script: \n%s" %
                (self.name, default_script))
            self._temp_file.file.write(default_script)
            self.template_file = self._temp_file.name
        self._temp_file.file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Delete the submitted job via the scheduler's delete command."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id], env=os.environ)
        return d
331 346
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted through PBS (qsub/qdel)."""

    name = 'PBS'
    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw strings so the regex escapes survive Python's string parsing.
    job_id_regexp = r'\d+'
    job_array_regexp = r'#PBS[ \t]+-t[ \t]+\d+'
    job_array_template = '#PBS -t 1-%d'
    queue_regexp = r'#PBS[ \t]+-q[ \t]+\w+'
    queue_template = '#PBS -q %s'
    # Default script: a job array of n engines, each logging separately.
    default_template = """#PBS -V
#PBS -t 1-%d
#PBS -N ipengine
eid=$(($PBS_ARRAYID - 1))
ipengine --logfile=ipengine${eid}.log
"""
346 363
class SGEEngineSet(PBSEngineSet):
    """Engine set submitted through SGE; inherits qsub/qdel from PBS."""

    name = 'SGE'
    # Raw strings so the regex escapes survive Python's string parsing.
    job_array_regexp = r'#\$[ \t]+-t[ \t]+\d+'
    job_array_template = '#$ -t 1-%d'
    queue_regexp = r'#\$[ \t]+-q[ \t]+\w+'
    queue_template = '#$ -q %s'
    # Default script: a job array of n engines, each logging separately.
    default_template = """#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
358 377
class LSFEngineSet(PBSEngineSet):
    """Engine set submitted through LSF (bsub/bkill)."""

    name = 'LSF'
    submit_command = 'bsub'
    delete_command = 'bkill'
    # BUG FIX: the pattern previously read '#BSUB[ \t]-J+\w+\[\d+-\d+\]',
    # which allowed only a single whitespace char after #BSUB, matched
    # '-' followed by one or more 'J', and permitted no space before the
    # job name -- so it could never match a real '#BSUB -J name[1-4]'
    # directive (including this class's own job_array_template output).
    job_array_regexp = r'#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]'
    job_array_template = '#BSUB -J ipengine[1-%d]'
    queue_regexp = r'#BSUB[ \t]+-q[ \t]+\w+'
    queue_template = '#BSUB -q %s'
    # Default script: a job array of n engines, each logging separately.
    default_template = """#BSUB -J ipengine[1-%d]
eid=$(($LSB_JOBINDEX - 1))
ipengine --logfile=ipengine${eid}.log
"""
370 391
# Shell helper pushed to each remote host: runs the given command in the
# background with output discarded, and echoes its PID.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Shell helper that TERM-kills every ipengine process owned by the
# invoking user on the remote host.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
379 400
class SSHEngineSet(object):
    """Launch and kill engines on remote hosts over ssh.

    A small sshx helper script is copied to each host and used to run
    the engine command detached; a second helper kills the engines.
    """
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        # Fan out to every host; the result fires when all have launched.
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally push the FURL file, then the sshx helper, then launch.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        # Copy the engine FURL into the remote ~/.ipython/security dir.
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        # Copy the sshx helper script to the remote temp dir.
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        # BUG FIX: removed a stray bare `print` statement (debug leftover)
        # that emitted a blank line to stdout on every copy.
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        # Start `count` engines on the host via the sshx helper.
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        # Kill the engines on every host.
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Push the killer script, then execute it remotely.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        log.msg(r)
504 525
505 526 #-----------------------------------------------------------------------------
506 527 # Main functions for the different types of clusters
507 528 #-----------------------------------------------------------------------------
508 529
509 530 # TODO:
510 531 # The logic in these codes should be moved into classes like LocalCluster
511 532 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
512 533 # The main functions should then just parse the command line arguments, create
513 534 # the appropriate class and call a 'start' method.
514 535
515 536
def check_security(args, cont_args):
    """Check to see if we should run with SSL support.

    Returns False (and stops the reactor) when security is requested but
    OpenSSL/pyOpenSSL is missing; otherwise appends -x/-y to cont_args
    as requested and returns True.
    """
    security_wanted = not (args.x and args.y)
    if security_wanted and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    if args.x:
        cont_args.append('-x')
    if args.y:
        cont_args.append('-y')
    return True
529 550
530 551
def check_reuse(args, cont_args):
    """Check to see if we should try to resuse FURL files.

    Returns False (and stops the reactor) when -r was given without
    explicit ports; otherwise appends the reuse flags to cont_args and
    returns True.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if args.client_port == 0 or args.engine_port == 0:
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.append('--client-port=%i' % args.client_port)
    cont_args.append('--engine-port=%i' % args.engine_port)
    return True
544 565
545 566
def _err_and_stop(f):
    """Errback: log the failure, then halt the reactor (fatal error)."""
    log.err(f)
    reactor.stop()
550 571
551 572
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the controller to write its FURL file, then start engines.

    Unless FURL reuse was requested, any stale FURL file is removed first
    so we are guaranteed to wait for a freshly written one.
    """
    if not reuse and os.path.isfile(furl_file):
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
562 583
563 584
def main_local(args):
    """Start a controller plus locally spawned engines."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        engine_args = [
            '--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        eset = LocalEngineSet(extra_args=engine_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
597 618
598 619
def main_mpi(args):
    """Start a controller and launch engines via mpirun/mpiexec."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        raw_args = [
            args.cmd,
            '-n', str(args.n),
            'ipengine',
            '-l', pjoin(args.logdir, 'ipengine%s-' % cont_pid),
        ]
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)

        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return eset.start()

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
636 657
637 658
def main_pbs(args):
    """Start a controller and submit the engines as a PBS job array."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.pbsscript and not os.path.isfile(args.pbsscript):
        log.err('PBS script does not exist: %s' % args.pbsscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)

        def shutdown(signum, frame):
            log.msg('Stopping PBS cluster')
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return pbs_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
670 691
def main_sge(args):
    """Start a controller and submit the engines as an SGE job array."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        sge_set = SGEEngineSet(args.sgescript, args.sgequeue)

        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return sge_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
703 724
def main_lsf(args):
    """Start a controller and submit the engines as an LSF job array."""
    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    if args.lsfscript and not os.path.isfile(args.lsfscript):
        log.err('LSF script does not exist: %s' % args.lsfscript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(r):
        lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)

        def shutdown(signum, frame):
            log.msg('Stopping LSF cluster')
            d = lsf_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))

        signal.signal(signal.SIGINT, shutdown)
        return lsf_set.start(args.n)

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
736 757
737 758
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    execfile(args.clusterfile, clusterfile)
    # Default to not sending the FURL unless the clusterfile says so.
    clusterfile.setdefault('send_furl', False)

    cont_args = ['--logfile=%s' % pjoin(args.logdir, 'ipcontroller')]

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()

    def start_engines(cont_pid):
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)

        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)

        signal.signal(signal.SIGINT, shutdown)
        return ssh_set.start(clusterfile['send_furl'])

    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
780 801
781 802
def get_args():
    """Build the ipcluster argument parser and parse sys.argv.

    A shared base parser supplies the options common to every cluster
    type (-r/--client-port/--engine-port, -x/-y, --logdir, -n); each
    subcommand (local, mpirun, mpiexec, pbs, sge, lsf, ssh) adds its own
    options and binds its main_* handler via set_defaults(func=...).
    """
    # Options shared by every subcommand (parents=[base_parser] below).
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '-s',
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default=''
    )
    parser_pbs.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='pbsqueue',
        help='PBS queue to use when starting the engines',
        default=None,
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '-s',
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='sgequeue',
        help='SGE queue to use when starting the engines',
        default=None,
    )
    parser_sge.set_defaults(func=main_sge)

    parser_lsf = subparsers.add_parser(
        'lsf',
        help='run an lsf cluster',
        parents=[base_parser]
    )

    parser_lsf.add_argument(
        '-s',
        '--lsf-script',
        type=str,
        dest='lsfscript',
        help='LSF script template',
        default='' # LSFEngineSet will create one if not specified
    )

    parser_lsf.add_argument(
        '-q',
        '--queue',
        type=str,
        dest='lsfqueue',
        help='LSF queue to use when starting the engines',
        default=None,
    )
    parser_lsf.set_defaults(func=main_lsf)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
940 990
def main():
    """Parse arguments, schedule the selected cluster starter, and run
    the reactor with logging directed to stdout."""
    args = get_args()
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()


if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now