Remove bash-ism and use standard POSIX redirect in sshx.sh script.
Fernando Perez
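The change replaces the bash-only redirection shorthand with its portable POSIX equivalent; both forms send stdout and stderr to /dev/null:

    "$@" &> /dev/null &        # bash-only shorthand (removed)
    "$@" > /dev/null 2>&1 &    # POSIX sh equivalent (added)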
@@ -1,1041 +1,1041 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import stat
22 22 import tempfile
23 23 pjoin = os.path.join
24 24
25 25 from twisted.internet import reactor, defer
26 26 from twisted.internet.protocol import ProcessProtocol
27 27 from twisted.internet.error import ProcessDone, ProcessTerminated
28 28 from twisted.internet.utils import getProcessOutput
29 29 from twisted.python import failure, log
30 30
31 31 from IPython.external import argparse
32 32 from IPython.external import Itpl
33 33 from IPython.genutils import (
34 34 get_ipython_dir,
35 35 get_log_dir,
36 36 get_security_dir,
37 37 num_cpus
38 38 )
40 40
41 41 # Create various ipython directories if they don't exist.
42 42 # This must be done before IPython.kernel.config is imported.
43 43 from IPython.iplib import user_setup
44 44 if os.name == 'posix':
45 45 rc_suffix = ''
46 46 else:
47 47 rc_suffix = '.ini'
48 48 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
49 49 get_log_dir()
50 50 get_security_dir()
51 51
52 52 from IPython.kernel.config import config_manager as kernel_config_manager
53 53 from IPython.kernel.error import SecurityError, FileTimeoutError
54 54 from IPython.kernel.fcutil import have_crypto
55 55 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
56 56 from IPython.kernel.util import printer
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # General process handling code
60 60 #-----------------------------------------------------------------------------
61 61
62 62
63 63 class ProcessStateError(Exception):
64 64 pass
65 65
66 66 class UnknownStatus(Exception):
67 67 pass
68 68
69 69 class LauncherProcessProtocol(ProcessProtocol):
70 70 """
71 71 A ProcessProtocol to go with the ProcessLauncher.
72 72 """
73 73 def __init__(self, process_launcher):
74 74 self.process_launcher = process_launcher
75 75
76 76 def connectionMade(self):
77 77 self.process_launcher.fire_start_deferred(self.transport.pid)
78 78
79 79 def processEnded(self, status):
80 80 value = status.value
81 81 if isinstance(value, ProcessDone):
82 82 self.process_launcher.fire_stop_deferred(0)
83 83 elif isinstance(value, ProcessTerminated):
84 84 self.process_launcher.fire_stop_deferred(
85 85 {'exit_code':value.exitCode,
86 86 'signal':value.signal,
87 87 'status':value.status
88 88 }
89 89 )
90 90 else:
91 91 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
92 92
93 93 def outReceived(self, data):
94 94 log.msg(data)
95 95
96 96 def errReceived(self, data):
97 97 log.err(data)
98 98
99 99 class ProcessLauncher(object):
100 100 """
101 101 Start and stop an external process in an asynchronous manner.
102 102
103 103 Currently this uses deferreds to notify other parties of process state
104 104 changes. This is an awkward design and should be moved to using
105 105 a formal NotificationCenter.
106 106 """
107 107 def __init__(self, cmd_and_args):
108 108 self.cmd = cmd_and_args[0]
109 109 self.args = cmd_and_args
110 110 self._reset()
111 111
112 112 def _reset(self):
113 113 self.process_protocol = None
114 114 self.pid = None
115 115 self.start_deferred = None
116 116 self.stop_deferreds = []
117 117 self.state = 'before' # before, running, or after
118 118
119 119 @property
120 120 def running(self):
121 121 if self.state == 'running':
122 122 return True
123 123 else:
124 124 return False
125 125
126 126 def fire_start_deferred(self, pid):
127 127 self.pid = pid
128 128 self.state = 'running'
129 129 log.msg('Process %r has started with pid=%i' % (self.args, pid))
130 130 self.start_deferred.callback(pid)
131 131
132 132 def start(self):
133 133 if self.state == 'before':
134 134 self.process_protocol = LauncherProcessProtocol(self)
135 135 self.start_deferred = defer.Deferred()
136 136 self.process_transport = reactor.spawnProcess(
137 137 self.process_protocol,
138 138 self.cmd,
139 139 self.args,
140 140 env=os.environ
141 141 )
142 142 return self.start_deferred
143 143 else:
144 144 s = 'the process has already been started and has state: %r' % \
145 145 self.state
146 146 return defer.fail(ProcessStateError(s))
147 147
148 148 def get_stop_deferred(self):
149 149 if self.state == 'running' or self.state == 'before':
150 150 d = defer.Deferred()
151 151 self.stop_deferreds.append(d)
152 152 return d
153 153 else:
154 154 s = 'this process is already complete'
155 155 return defer.fail(ProcessStateError(s))
156 156
157 157 def fire_stop_deferred(self, exit_code):
158 158 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
159 159 self.state = 'after'
160 160 for d in self.stop_deferreds:
161 161 d.callback(exit_code)
162 162
163 163 def signal(self, sig):
164 164 """
165 165 Send a signal to the process.
166 166
167 167 The argument sig can be a name like 'KILL' or 'INT', or any signal number.
168 168 """
169 169 if self.state == 'running':
170 170 self.process_transport.signalProcess(sig)
171 171
172 172 # def __del__(self):
173 173 # self.signal('KILL')
174 174
175 175 def interrupt_then_kill(self, delay=1.0):
176 176 self.signal('INT')
177 177 reactor.callLater(delay, self.signal, 'KILL')
178 178
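# A minimal usage sketch for ProcessLauncher (illustrative only; the command
# and callbacks below are hypothetical, not part of this module):
#
#   pl = ProcessLauncher(['sleep', '10'])
#   d = pl.start()                  # deferred fires with the child's pid
#   d.addCallback(lambda pid: log.msg('started with pid %i' % pid))
#   pl.get_stop_deferred().addCallback(
#       lambda code: log.msg('stopped with %r' % code))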
179 179
180 180 #-----------------------------------------------------------------------------
181 181 # Code for launching controller and engines
182 182 #-----------------------------------------------------------------------------
183 183
184 184
185 185 class ControllerLauncher(ProcessLauncher):
186 186
187 187 def __init__(self, extra_args=None):
188 188 if sys.platform == 'win32':
189 189 # This logic is needed because the ipcontroller script doesn't
190 190 # always get installed in the same way or in the same location.
191 191 from IPython.kernel.scripts import ipcontroller
192 192 script_location = ipcontroller.__file__.replace('.pyc', '.py')
193 193 # The -u option here turns on unbuffered output, which is required
194 194 # on Win32 to prevent weird conflicts and problems with Twisted.
195 195 # Also, use sys.executable to make sure we are picking up the
196 196 # right python exe.
197 197 args = [sys.executable, '-u', script_location]
198 198 else:
199 199 args = ['ipcontroller']
200 200 self.extra_args = extra_args
201 201 if extra_args is not None:
202 202 args.extend(extra_args)
203 203
204 204 ProcessLauncher.__init__(self, args)
205 205
206 206
207 207 class EngineLauncher(ProcessLauncher):
208 208
209 209 def __init__(self, extra_args=None):
210 210 if sys.platform == 'win32':
211 211 # This logic is needed because the ipcontroller script doesn't
212 212 # always get installed in the same way or in the same location.
213 213 from IPython.kernel.scripts import ipengine
214 214 script_location = ipengine.__file__.replace('.pyc', '.py')
215 215 # The -u option here turns on unbuffered output, which is required
216 216 # on Win32 to prevent weird conflicts and problems with Twisted.
217 217 # Also, use sys.executable to make sure we are picking up the
218 218 # right python exe.
219 219 args = [sys.executable, '-u', script_location]
220 220 else:
221 221 args = ['ipengine']
222 222 self.extra_args = extra_args
223 223 if extra_args is not None:
224 224 args.extend(extra_args)
225 225
226 226 ProcessLauncher.__init__(self, args)
227 227
228 228
229 229 class LocalEngineSet(object):
230 230
231 231 def __init__(self, extra_args=None):
232 232 self.extra_args = extra_args
233 233 self.launchers = []
234 234
235 235 def start(self, n):
236 236 dlist = []
237 237 for i in range(n):
238 238 print "starting engine:", i
239 239 el = EngineLauncher(extra_args=self.extra_args)
240 240 d = el.start()
241 241 self.launchers.append(el)
242 242 dlist.append(d)
243 243 dfinal = gatherBoth(dlist, consumeErrors=True)
244 244 dfinal.addCallback(self._handle_start)
245 245 return dfinal
246 246
247 247 def _handle_start(self, r):
248 248 log.msg('Engines started with pids: %r' % r)
249 249 return r
250 250
251 251 def _handle_stop(self, r):
252 252 log.msg('Engines received signal: %r' % r)
253 253 return r
254 254
255 255 def signal(self, sig):
256 256 dlist = []
257 257 for el in self.launchers:
258 258 d = el.get_stop_deferred()
259 259 dlist.append(d)
260 260 el.signal(sig)
261 261 dfinal = gatherBoth(dlist, consumeErrors=True)
262 262 dfinal.addCallback(self._handle_stop)
263 263 return dfinal
264 264
265 265 def interrupt_then_kill(self, delay=1.0):
266 266 dlist = []
267 267 for el in self.launchers:
268 268 d = el.get_stop_deferred()
269 269 dlist.append(d)
270 270 el.interrupt_then_kill(delay)
271 271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 272 dfinal.addCallback(self._handle_stop)
273 273 return dfinal
274 274
275 275 class BatchEngineSet(object):
276 276
277 277 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
278 278 name = ''
279 279 submit_command = ''
280 280 delete_command = ''
281 281 job_id_regexp = ''
282 282 job_array_regexp = ''
283 283 job_array_template = ''
284 284 queue_regexp = ''
285 285 queue_template = ''
286 286 default_template = ''
287 287
288 288 def __init__(self, template_file, queue, **kwargs):
289 289 self.template_file = template_file
290 290 self.queue = queue
291 291
292 292 def parse_job_id(self, output):
293 293 m = re.search(self.job_id_regexp, output)
294 294 if m is not None:
295 295 job_id = m.group()
296 296 else:
297 297 raise Exception("job id couldn't be determined: %s" % output)
298 298 self.job_id = job_id
299 299 log.msg('Job started with job id: %r' % job_id)
300 300 return job_id
301 301
302 302 def handle_error(self, f):
303 303 f.printTraceback()
304 304 f.raiseException()
305 305
306 306 def start(self, n):
307 307 log.msg("starting %d engines" % n)
308 308 self._temp_file = tempfile.NamedTemporaryFile()
309 309 os.chmod(self._temp_file.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
310 310 if self.template_file:
311 311 log.msg("Using %s script %s" % (self.name, self.template_file))
312 312 contents = open(self.template_file, 'r').read()
313 313 new_script = contents
314 314 regex = re.compile(self.job_array_regexp)
315 315 if not regex.search(contents):
316 316 log.msg("adding job array settings to %s script" % self.name)
317 317 new_script = self.job_array_template % n +'\n' + new_script
319 319 regex = re.compile(self.queue_regexp)
321 321 if self.queue and not regex.search(contents):
322 322 log.msg("adding queue settings to %s script" % self.name)
323 323 new_script = self.queue_template % self.queue + '\n' + new_script
324 324 if new_script != contents:
325 325 self._temp_file.write(new_script)
326 326 self.template_file = self._temp_file.name
327 327 else:
328 328 default_script = self.default_template % n
329 329 if self.queue:
330 330 default_script = self.queue_template % self.queue + \
331 331 '\n' + default_script
332 332 log.msg("using default ipengine %s script: \n%s" %
333 333 (self.name, default_script))
334 334 self._temp_file.file.write(default_script)
335 335 self.template_file = self._temp_file.name
336 336 self._temp_file.file.close()
337 337 d = getProcessOutput(self.submit_command,
338 338 [self.template_file],
339 339 env=os.environ)
340 340 d.addCallback(self.parse_job_id)
341 341 d.addErrback(self.handle_error)
342 342 return d
343 343
344 344 def kill(self):
345 345 d = getProcessOutput(self.delete_command,
346 346 [self.job_id],env=os.environ)
347 347 return d
348 348
349 349 class PBSEngineSet(BatchEngineSet):
350 350
351 351 name = 'PBS'
352 352 submit_command = 'qsub'
353 353 delete_command = 'qdel'
354 354 job_id_regexp = '\d+'
355 355 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
356 356 job_array_template = '#PBS -t 1-%d'
357 357 queue_regexp = '#PBS[ \t]+-q[ \t]+\w+'
358 358 queue_template = '#PBS -q %s'
359 359 default_template="""#!/bin/sh
360 360 #PBS -V
361 361 #PBS -t 1-%d
362 362 #PBS -N ipengine
363 363 eid=$(($PBS_ARRAYID - 1))
364 364 ipengine --logfile=ipengine${eid}.log
365 365 """
366 366
367 367 class SGEEngineSet(PBSEngineSet):
368 368
369 369 name = 'SGE'
370 370 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
371 371 job_array_template = '#$ -t 1-%d'
372 372 queue_regexp = '#\$[ \t]+-q[ \t]+\w+'
373 373 queue_template = '#$ -q %s'
374 374 default_template="""#$ -V
375 375 #$ -S /bin/sh
376 376 #$ -t 1-%d
377 377 #$ -N ipengine
378 378 eid=$(($SGE_TASK_ID - 1))
379 379 ipengine --logfile=ipengine${eid}.log
380 380 """
381 381
382 382 class LSFEngineSet(PBSEngineSet):
383 383
384 384 name = 'LSF'
385 385 submit_command = 'bsub'
386 386 delete_command = 'bkill'
387 387 job_array_regexp = '#BSUB[ \t]+-J[ \t]+\w+\[\d+-\d+\]'
388 388 job_array_template = '#BSUB -J ipengine[1-%d]'
389 389 queue_regexp = '#BSUB[ \t]+-q[ \t]+\w+'
390 390 queue_template = '#BSUB -q %s'
391 391 default_template="""#!/bin/sh
392 392 #BSUB -J ipengine[1-%d]
393 393 eid=$(($LSB_JOBINDEX - 1))
394 394 ipengine --logfile=ipengine${eid}.log
395 395 """
396 396 bsub_wrapper="""#!/bin/sh
397 397 bsub < $1
398 398 """
399 399
400 400 def __init__(self, template_file, queue, **kwargs):
401 401 self._bsub_wrapper = self._make_bsub_wrapper()
402 402 self.submit_command = self._bsub_wrapper.name
403 403 PBSEngineSet.__init__(self,template_file, queue, **kwargs)
404 404
405 405 def _make_bsub_wrapper(self):
406 406 bsub_wrapper = tempfile.NamedTemporaryFile()
407 407 bsub_wrapper.write(self.bsub_wrapper)
408 408 bsub_wrapper.file.close()
409 409 os.chmod(bsub_wrapper.name, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
410 410 return bsub_wrapper
411 411
412 412 sshx_template_prefix="""#!/bin/sh
413 413 """
414     - sshx_template_suffix=""""$@" &> /dev/null &
    414 + sshx_template_suffix=""""$@" > /dev/null 2>&1 &
415 415 echo $!
416 416 """
417 417
418 418 engine_killer_template="""#!/bin/sh
419 419 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
420 420 """
421 421
422 422 def escape_strings(val):
423 423 val = val.replace('(','\(')
424 424 val = val.replace(')','\)')
425 425 if ' ' in val:
426 426 val = '"%s"'%val
427 427 return val
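# For example (illustrative input): escape_strings('my (work) dir') returns
# '"my \(work\) dir"' -- the parentheses are escaped and the whole value is
# quoted because it contains a space.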
428 428
429 429 class SSHEngineSet(object):
430 430 sshx_template_prefix=sshx_template_prefix
431 431 sshx_template_suffix=sshx_template_suffix
432 432 engine_killer_template=engine_killer_template
433 433
434 434 def __init__(self, engine_hosts, sshx=None, copyenvs=None, ipengine="ipengine"):
435 435 """Start a controller on localhost and engines using ssh.
436 436
437 437 The engine_hosts argument is a dict with hostnames as keys and
438 438 the number of engines (int) as values. sshx is the name of a local
439 439 file that will be used to run remote commands. This file is used
440 440 to set up the environment properly.
441 441 """
442 442
443 443 self.temp_dir = tempfile.gettempdir()
444 444 if sshx is not None:
445 445 self.sshx = sshx
446 446 else:
447 447 # Write the sshx.sh file locally from our template.
448 448 self.sshx = os.path.join(
449 449 self.temp_dir,
450 450 '%s-main-sshx.sh' % os.environ['USER']
451 451 )
452 452 f = open(self.sshx, 'w')
453 453 f.writelines(self.sshx_template_prefix)
454 454 if copyenvs:
455 455 for key, val in sorted(os.environ.items()):
456 456 newval = escape_strings(val)
457 457 f.writelines('export %s=%s\n'%(key,newval))
458 458 f.writelines(self.sshx_template_suffix)
459 459 f.close()
460 460 self.engine_command = ipengine
461 461 self.engine_hosts = engine_hosts
462 462 # Write the engine killer script file locally from our template.
463 463 self.engine_killer = os.path.join(
464 464 self.temp_dir,
465 465 '%s-local-engine_killer.sh' % os.environ['USER']
466 466 )
467 467 f = open(self.engine_killer, 'w')
468 468 f.writelines(self.engine_killer_template)
469 469 f.close()
470 470
471 471 def start(self, send_furl=False):
472 472 dlist = []
473 473 for host in self.engine_hosts.keys():
474 474 count = self.engine_hosts[host]
475 475 d = self._start(host, count, send_furl)
476 476 dlist.append(d)
477 477 return gatherBoth(dlist, consumeErrors=True)
478 478
479 479 def _start(self, hostname, count=1, send_furl=False):
480 480 if send_furl:
481 481 d = self._scp_furl(hostname)
482 482 else:
483 483 d = defer.succeed(None)
484 484 d.addCallback(lambda r: self._scp_sshx(hostname))
485 485 d.addCallback(lambda r: self._ssh_engine(hostname, count))
486 486 return d
487 487
488 488 def _scp_furl(self, hostname):
489 489 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
490 490 cmd_list = scp_cmd.split()
491 491 cmd_list[1] = os.path.expanduser(cmd_list[1])
492 492 log.msg('Copying furl file: %s' % scp_cmd)
493 493 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
494 494 return d
495 495
496 496 def _scp_sshx(self, hostname):
497 497 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
498 498 self.sshx, hostname,
499 499 self.temp_dir, os.environ['USER']
500 500 )
502 502 log.msg("Copying sshx: %s" % scp_cmd)
503 503 sshx_scp = scp_cmd.split()
504 504 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
505 505 return d
506 506
507 507 def _ssh_engine(self, hostname, count):
508 508 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
509 509 hostname, self.temp_dir,
510 510 os.environ['USER'], self.engine_command
511 511 )
512 512 cmds = exec_engine.split()
513 513 dlist = []
514 514 log.msg("about to start engines...")
515 515 for i in range(count):
516 516 log.msg('Starting engines: %s' % exec_engine)
517 517 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
518 518 dlist.append(d)
519 519 return gatherBoth(dlist, consumeErrors=True)
520 520
521 521 def kill(self):
522 522 dlist = []
523 523 for host in self.engine_hosts.keys():
524 524 d = self._killall(host)
525 525 dlist.append(d)
526 526 return gatherBoth(dlist, consumeErrors=True)
527 527
528 528 def _killall(self, hostname):
529 529 d = self._scp_engine_killer(hostname)
530 530 d.addCallback(lambda r: self._ssh_kill(hostname))
531 531 # d.addErrback(self._exec_err)
532 532 return d
533 533
534 534 def _scp_engine_killer(self, hostname):
535 535 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
536 536 self.engine_killer,
537 537 hostname,
538 538 self.temp_dir,
539 539 os.environ['USER']
540 540 )
541 541 cmds = scp_cmd.split()
542 542 log.msg('Copying engine_killer: %s' % scp_cmd)
543 543 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
544 544 return d
545 545
546 546 def _ssh_kill(self, hostname):
547 547 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
548 548 hostname,
549 549 self.temp_dir,
550 550 os.environ['USER']
551 551 )
552 552 log.msg('Killing engine: %s' % kill_cmd)
553 553 kill_cmd = kill_cmd.split()
554 554 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
555 555 return d
556 556
557 557 def _exec_err(self, r):
558 558 log.msg(r)
559 559
560 560 #-----------------------------------------------------------------------------
561 561 # Main functions for the different types of clusters
562 562 #-----------------------------------------------------------------------------
563 563
564 564 # TODO:
565 565 # The logic in these functions should be moved into classes like LocalCluster,
566 566 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
567 567 # The main functions should then just parse the command line arguments, create
568 568 # the appropriate class and call a 'start' method.
569 569
570 570
571 571 def check_security(args, cont_args):
572 572 """Check to see if we should run with SSL support."""
573 573 if (not args.x or not args.y) and not have_crypto:
574 574 log.err("""
575 575 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
576 576 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
577 577 reactor.stop()
578 578 return False
579 579 if args.x:
580 580 cont_args.append('-x')
581 581 if args.y:
582 582 cont_args.append('-y')
583 583 return True
584 584
585 585
586 586 def check_reuse(args, cont_args):
587 587 """Check to see if we should try to resuse FURL files."""
588 588 if args.r:
589 589 cont_args.append('-r')
590 590 if args.client_port == 0 or args.engine_port == 0:
591 591 log.err("""
592 592 To reuse FURL files, you must also set the client and engine ports using
593 593 the --client-port and --engine-port options.""")
594 594 reactor.stop()
595 595 return False
596 596 cont_args.append('--client-port=%i' % args.client_port)
597 597 cont_args.append('--engine-port=%i' % args.engine_port)
598 598 return True
599 599
600 600
601 601 def _err_and_stop(f):
602 602 """Errback to log a failure and halt the reactor on a fatal error."""
603 603 log.err(f)
604 604 reactor.stop()
605 605
606 606
607 607 def _delay_start(cont_pid, start_engines, furl_file, reuse):
608 608 """Wait for controller to create FURL files and the start the engines."""
609 609 if not reuse:
610 610 if os.path.isfile(furl_file):
611 611 os.unlink(furl_file)
612 612 log.msg('Waiting for controller to finish starting...')
613 613 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
614 614 d.addCallback(lambda _: log.msg('Controller started'))
615 615 d.addCallback(lambda _: start_engines(cont_pid))
616 616 return d
617 617
618 618
619 619 def main_local(args):
620 620 cont_args = []
621 621 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
622 622
623 623 # Check security settings before proceeding
624 624 if not check_security(args, cont_args):
625 625 return
626 626
627 627 # See if we are reusing FURL files
628 628 if not check_reuse(args, cont_args):
629 629 return
630 630
631 631 cl = ControllerLauncher(extra_args=cont_args)
632 632 dstart = cl.start()
633 633 def start_engines(cont_pid):
634 634 engine_args = []
635 635 engine_args.append('--logfile=%s' % \
636 636 pjoin(args.logdir,'ipengine%s-' % cont_pid))
637 637 eset = LocalEngineSet(extra_args=engine_args)
638 638 def shutdown(signum, frame):
639 639 log.msg('Stopping local cluster')
640 640 # We are still playing with the times here, but these seem
641 641 # to be reliable in allowing everything to exit cleanly.
642 642 eset.interrupt_then_kill(0.5)
643 643 cl.interrupt_then_kill(0.5)
644 644 reactor.callLater(1.0, reactor.stop)
645 645 signal.signal(signal.SIGINT,shutdown)
646 646 d = eset.start(args.n)
647 647 return d
648 648 config = kernel_config_manager.get_config_obj()
649 649 furl_file = config['controller']['engine_furl_file']
650 650 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
651 651 dstart.addErrback(_err_and_stop)
652 652
653 653
654 654 def main_mpi(args):
655 655 cont_args = []
656 656 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
657 657
658 658 # Check security settings before proceeding
659 659 if not check_security(args, cont_args):
660 660 return
661 661
662 662 # See if we are reusing FURL files
663 663 if not check_reuse(args, cont_args):
664 664 return
665 665
666 666 cl = ControllerLauncher(extra_args=cont_args)
667 667 dstart = cl.start()
668 668 def start_engines(cont_pid):
669 669 raw_args = [args.cmd]
670 670 raw_args.extend(['-n',str(args.n)])
671 671 raw_args.append('ipengine')
672 672 raw_args.append('-l')
673 673 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
674 674 if args.mpi:
675 675 raw_args.append('--mpi=%s' % args.mpi)
676 676 eset = ProcessLauncher(raw_args)
677 677 def shutdown(signum, frame):
678 678 log.msg('Stopping local cluster')
679 679 # We are still playing with the times here, but these seem
680 680 # to be reliable in allowing everything to exit cleanly.
681 681 eset.interrupt_then_kill(1.0)
682 682 cl.interrupt_then_kill(1.0)
683 683 reactor.callLater(2.0, reactor.stop)
684 684 signal.signal(signal.SIGINT,shutdown)
685 685 d = eset.start()
686 686 return d
687 687 config = kernel_config_manager.get_config_obj()
688 688 furl_file = config['controller']['engine_furl_file']
689 689 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
690 690 dstart.addErrback(_err_and_stop)
691 691
692 692
693 693 def main_pbs(args):
694 694 cont_args = []
695 695 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
696 696
697 697 # Check security settings before proceeding
698 698 if not check_security(args, cont_args):
699 699 return
700 700
701 701 # See if we are reusing FURL files
702 702 if not check_reuse(args, cont_args):
703 703 return
704 704
705 705 if args.pbsscript and not os.path.isfile(args.pbsscript):
706 706 log.err('PBS script does not exist: %s' % args.pbsscript)
707 707 return
708 708
709 709 cl = ControllerLauncher(extra_args=cont_args)
710 710 dstart = cl.start()
711 711 def start_engines(r):
712 712 pbs_set = PBSEngineSet(args.pbsscript, args.pbsqueue)
713 713 def shutdown(signum, frame):
714 714 log.msg('Stopping PBS cluster')
715 715 d = pbs_set.kill()
716 716 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
717 717 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
718 718 signal.signal(signal.SIGINT,shutdown)
719 719 d = pbs_set.start(args.n)
720 720 return d
721 721 config = kernel_config_manager.get_config_obj()
722 722 furl_file = config['controller']['engine_furl_file']
723 723 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
724 724 dstart.addErrback(_err_and_stop)
725 725
726 726 def main_sge(args):
727 727 cont_args = []
728 728 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
729 729
730 730 # Check security settings before proceeding
731 731 if not check_security(args, cont_args):
732 732 return
733 733
734 734 # See if we are reusing FURL files
735 735 if not check_reuse(args, cont_args):
736 736 return
737 737
738 738 if args.sgescript and not os.path.isfile(args.sgescript):
739 739 log.err('SGE script does not exist: %s' % args.sgescript)
740 740 return
741 741
742 742 cl = ControllerLauncher(extra_args=cont_args)
743 743 dstart = cl.start()
744 744 def start_engines(r):
745 745 sge_set = SGEEngineSet(args.sgescript, args.sgequeue)
746 746 def shutdown(signum, frame):
747 747 log.msg('Stopping sge cluster')
748 748 d = sge_set.kill()
749 749 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
750 750 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
751 751 signal.signal(signal.SIGINT,shutdown)
752 752 d = sge_set.start(args.n)
753 753 return d
754 754 config = kernel_config_manager.get_config_obj()
755 755 furl_file = config['controller']['engine_furl_file']
756 756 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
757 757 dstart.addErrback(_err_and_stop)
758 758
759 759 def main_lsf(args):
760 760 cont_args = []
761 761 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
762 762
763 763 # Check security settings before proceeding
764 764 if not check_security(args, cont_args):
765 765 return
766 766
767 767 # See if we are reusing FURL files
768 768 if not check_reuse(args, cont_args):
769 769 return
770 770
771 771 if args.lsfscript and not os.path.isfile(args.lsfscript):
772 772 log.err('LSF script does not exist: %s' % args.lsfscript)
773 773 return
774 774
775 775 cl = ControllerLauncher(extra_args=cont_args)
776 776 dstart = cl.start()
777 777 def start_engines(r):
778 778 lsf_set = LSFEngineSet(args.lsfscript, args.lsfqueue)
779 779 def shutdown(signum, frame):
780 780 log.msg('Stopping LSF cluster')
781 781 d = lsf_set.kill()
782 782 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
783 783 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
784 784 signal.signal(signal.SIGINT,shutdown)
785 785 d = lsf_set.start(args.n)
786 786 return d
787 787 config = kernel_config_manager.get_config_obj()
788 788 furl_file = config['controller']['engine_furl_file']
789 789 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
790 790 dstart.addErrback(_err_and_stop)
791 791
792 792
793 793 def main_ssh(args):
794 794 """Start a controller on localhost and engines using ssh.
795 795
796 796 Your clusterfile should look like::
797 797
798 798 send_furl = False # set True to copy the engine FURL file to each host
799 799 engines = {
800 800 'engine_host1' : engine_count,
801 801 'engine_host2' : engine_count2
802 802 }
803 803 """
804 804 clusterfile = {}
805 805 execfile(args.clusterfile, clusterfile)
806 806 if not clusterfile.has_key('send_furl'):
807 807 clusterfile['send_furl'] = False
808 808
809 809 cont_args = []
810 810 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
811 811
812 812 # Check security settings before proceeding
813 813 if not check_security(args, cont_args):
814 814 return
815 815
816 816 # See if we are reusing FURL files
817 817 if not check_reuse(args, cont_args):
818 818 return
819 819
820 820 cl = ControllerLauncher(extra_args=cont_args)
821 821 dstart = cl.start()
822 822 def start_engines(cont_pid):
823 823 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx,
824 824 copyenvs=args.copyenvs)
825 825 def shutdown(signum, frame):
826 826 d = ssh_set.kill()
827 827 cl.interrupt_then_kill(1.0)
828 828 reactor.callLater(2.0, reactor.stop)
829 829 signal.signal(signal.SIGINT,shutdown)
830 830 d = ssh_set.start(clusterfile['send_furl'])
831 831 return d
832 832 config = kernel_config_manager.get_config_obj()
833 833 furl_file = config['controller']['engine_furl_file']
834 834 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
835 835 dstart.addErrback(_err_and_stop)
836 836
837 837
838 838 def get_args():
839 839 base_parser = argparse.ArgumentParser(add_help=False)
840 840 base_parser.add_argument(
841 841 '-r',
842 842 action='store_true',
843 843 dest='r',
844 844 help='try to reuse FURL files. Use with --client-port and --engine-port'
845 845 )
846 846 base_parser.add_argument(
847 847 '--client-port',
848 848 type=int,
849 849 dest='client_port',
850 850 help='the port the controller will listen on for client connections',
851 851 default=0
852 852 )
853 853 base_parser.add_argument(
854 854 '--engine-port',
855 855 type=int,
856 856 dest='engine_port',
857 857 help='the port the controller will listen on for engine connections',
858 858 default=0
859 859 )
860 860 base_parser.add_argument(
861 861 '-x',
862 862 action='store_true',
863 863 dest='x',
864 864 help='turn off client security'
865 865 )
866 866 base_parser.add_argument(
867 867 '-y',
868 868 action='store_true',
869 869 dest='y',
870 870 help='turn off engine security'
871 871 )
872 872 base_parser.add_argument(
873 873 "--logdir",
874 874 type=str,
875 875 dest="logdir",
876 876 help="directory to put log files (default=$IPYTHONDIR/log)",
877 877 default=pjoin(get_ipython_dir(),'log')
878 878 )
879 879 base_parser.add_argument(
880 880 "-n",
881 881 "--num",
882 882 type=int,
883 883 dest="n",
884 884 default=2,
885 885 help="the number of engines to start"
886 886 )
887 887
888 888 parser = argparse.ArgumentParser(
889 889 description='IPython cluster startup. This starts a controller and\
890 890 engines using various approaches. Use the IPYTHONDIR environment\
891 891 variable to change your IPython directory from the default of\
892 892 .ipython or _ipython. The log and security subdirectories of your\
893 893 IPython directory will be used by this script for log files and\
894 894 security files.'
895 895 )
896 896 subparsers = parser.add_subparsers(
897 897 help='available cluster types. For help, do "ipcluster TYPE --help"')
898 898
899 899 parser_local = subparsers.add_parser(
900 900 'local',
901 901 help='run a local cluster',
902 902 parents=[base_parser]
903 903 )
904 904 parser_local.set_defaults(func=main_local)
905 905
906 906 parser_mpirun = subparsers.add_parser(
907 907 'mpirun',
908 908 help='run a cluster using mpirun (mpiexec also works)',
909 909 parents=[base_parser]
910 910 )
911 911 parser_mpirun.add_argument(
912 912 "--mpi",
913 913 type=str,
914 914 dest="mpi", # Don't put a default here to allow no MPI support
915 915 help="how to call MPI_Init (default=mpi4py)"
916 916 )
917 917 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
918 918
919 919 parser_mpiexec = subparsers.add_parser(
920 920 'mpiexec',
921 921 help='run a cluster using mpiexec (mpirun also works)',
922 922 parents=[base_parser]
923 923 )
924 924 parser_mpiexec.add_argument(
925 925 "--mpi",
926 926 type=str,
927 927 dest="mpi", # Don't put a default here to allow no MPI support
928 928 help="how to call MPI_Init (default=mpi4py)"
929 929 )
930 930 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
931 931
932 932 parser_pbs = subparsers.add_parser(
933 933 'pbs',
934 934 help='run a pbs cluster',
935 935 parents=[base_parser]
936 936 )
937 937 parser_pbs.add_argument(
938 938 '-s',
939 939 '--pbs-script',
940 940 type=str,
941 941 dest='pbsscript',
942 942 help='PBS script template',
943 943 default=''
944 944 )
945 945 parser_pbs.add_argument(
946 946 '-q',
947 947 '--queue',
948 948 type=str,
949 949 dest='pbsqueue',
950 950 help='PBS queue to use when starting the engines',
951 951 default=None,
952 952 )
953 953 parser_pbs.set_defaults(func=main_pbs)
954 954
955 955 parser_sge = subparsers.add_parser(
956 956 'sge',
957 957 help='run an sge cluster',
958 958 parents=[base_parser]
959 959 )
960 960 parser_sge.add_argument(
961 961 '-s',
962 962 '--sge-script',
963 963 type=str,
964 964 dest='sgescript',
965 965 help='SGE script template',
966 966 default='' # SGEEngineSet will create one if not specified
967 967 )
968 968 parser_sge.add_argument(
969 969 '-q',
970 970 '--queue',
971 971 type=str,
972 972 dest='sgequeue',
973 973 help='SGE queue to use when starting the engines',
974 974 default=None,
975 975 )
976 976 parser_sge.set_defaults(func=main_sge)
977 977
978 978 parser_lsf = subparsers.add_parser(
979 979 'lsf',
980 980 help='run an lsf cluster',
981 981 parents=[base_parser]
982 982 )
983 983
984 984 parser_lsf.add_argument(
985 985 '-s',
986 986 '--lsf-script',
987 987 type=str,
988 988 dest='lsfscript',
989 989 help='LSF script template',
990 990 default='' # LSFEngineSet will create one if not specified
991 991 )
992 992
993 993 parser_lsf.add_argument(
994 994 '-q',
995 995 '--queue',
996 996 type=str,
997 997 dest='lsfqueue',
998 998 help='LSF queue to use when starting the engines',
999 999 default=None,
1000 1000 )
1001 1001 parser_lsf.set_defaults(func=main_lsf)
1002 1002
1003 1003 parser_ssh = subparsers.add_parser(
1004 1004 'ssh',
1005 1005 help='run a cluster using ssh, should have ssh-keys setup',
1006 1006 parents=[base_parser]
1007 1007 )
1008 1008 parser_ssh.add_argument(
1009 1009 '-e',
1010 1010 '--copyenvs',
1011 1011 action='store_true',
1012 1012 dest='copyenvs',
1013 1013 help='Copy current shell environment to remote location',
1014 1014 default=False,
1015 1015 )
1016 1016 parser_ssh.add_argument(
1017 1017 '--clusterfile',
1018 1018 type=str,
1019 1019 dest='clusterfile',
1020 1020 help='python file describing the cluster',
1021 1021 default='clusterfile.py',
1022 1022 )
1023 1023 parser_ssh.add_argument(
1024 1024 '--sshx',
1025 1025 type=str,
1026 1026 dest='sshx',
1027 1027 help='sshx launcher helper'
1028 1028 )
1029 1029 parser_ssh.set_defaults(func=main_ssh)
1030 1030
1031 1031 args = parser.parse_args()
1032 1032 return args
1033 1033
1034 1034 def main():
1035 1035 args = get_args()
1036 1036 reactor.callWhenRunning(args.func, args)
1037 1037 log.startLogging(sys.stdout)
1038 1038 reactor.run()
1039 1039
1040 1040 if __name__ == '__main__':
1041 1041 main()