update PBSEngineSet to use job arrays...
Justin Riley
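Review note: the core of this change is replacing one-submission-per-engine with a single scheduler job array. When the user's batch template lacks a `-t` directive, the engine set now prepends one so that a single qsub starts all n engines. A minimal sketch of that idea in isolation (the template string and n here are made-up examples):

    import re

    n = 8
    template = "#PBS -N ipengine\nipengine --logfile=ipengine${eid}.log\n"
    # Mirror of the new start() logic below: add a job-array directive
    # only if the template does not already carry one.
    if not re.search(r'#PBS[ \t]+-t[ \t]+\d+', template):
        template = ("#PBS -t 1-%d\n" % n) + template
    print template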
@@ -1,908 +1,885 @@
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.iplib import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # General process handling code
59 59 #-----------------------------------------------------------------------------
60 60
61 61
62 62 class ProcessStateError(Exception):
63 63 pass
64 64
65 65 class UnknownStatus(Exception):
66 66 pass
67 67
68 68 class LauncherProcessProtocol(ProcessProtocol):
69 69 """
70 70 A ProcessProtocol to go with the ProcessLauncher.
71 71 """
72 72 def __init__(self, process_launcher):
73 73 self.process_launcher = process_launcher
74 74
75 75 def connectionMade(self):
76 76 self.process_launcher.fire_start_deferred(self.transport.pid)
77 77
78 78 def processEnded(self, status):
79 79 value = status.value
80 80 if isinstance(value, ProcessDone):
81 81 self.process_launcher.fire_stop_deferred(0)
82 82 elif isinstance(value, ProcessTerminated):
83 83 self.process_launcher.fire_stop_deferred(
84 84 {'exit_code':value.exitCode,
85 85 'signal':value.signal,
86 86 'status':value.status
87 87 }
88 88 )
89 89 else:
90 90 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
91 91
92 92 def outReceived(self, data):
93 93 log.msg(data)
94 94
95 95 def errReceived(self, data):
96 96 log.err(data)
97 97
98 98 class ProcessLauncher(object):
99 99 """
100 100 Start and stop an external process in an asynchronous manner.
101 101
102 102 Currently this uses deferreds to notify other parties of process state
103 103 changes. This is an awkward design and should be moved to using
104 104 a formal NotificationCenter.
105 105 """
106 106 def __init__(self, cmd_and_args):
107 107 self.cmd = cmd_and_args[0]
108 108 self.args = cmd_and_args
109 109 self._reset()
110 110
111 111 def _reset(self):
112 112 self.process_protocol = None
113 113 self.pid = None
114 114 self.start_deferred = None
115 115 self.stop_deferreds = []
116 116 self.state = 'before' # before, running, or after
117 117
118 118 @property
119 119 def running(self):
120 120 if self.state == 'running':
121 121 return True
122 122 else:
123 123 return False
124 124
125 125 def fire_start_deferred(self, pid):
126 126 self.pid = pid
127 127 self.state = 'running'
128 128 log.msg('Process %r has started with pid=%i' % (self.args, pid))
129 129 self.start_deferred.callback(pid)
130 130
131 131 def start(self):
132 132 if self.state == 'before':
133 133 self.process_protocol = LauncherProcessProtocol(self)
134 134 self.start_deferred = defer.Deferred()
135 135 self.process_transport = reactor.spawnProcess(
136 136 self.process_protocol,
137 137 self.cmd,
138 138 self.args,
139 139 env=os.environ
140 140 )
141 141 return self.start_deferred
142 142 else:
143 143 s = 'the process has already been started and has state: %r' % \
144 144 self.state
145 145 return defer.fail(ProcessStateError(s))
146 146
147 147 def get_stop_deferred(self):
148 148 if self.state == 'running' or self.state == 'before':
149 149 d = defer.Deferred()
150 150 self.stop_deferreds.append(d)
151 151 return d
152 152 else:
153 153 s = 'this process is already complete'
154 154 return defer.fail(ProcessStateError(s))
155 155
156 156 def fire_stop_deferred(self, exit_code):
157 157 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
158 158 self.state = 'after'
159 159 for d in self.stop_deferreds:
160 160 d.callback(exit_code)
161 161
162 162 def signal(self, sig):
163 163 """
164 164 Send a signal to the process.
165 165
166 166 The argument sig can be a signal name ('KILL', 'INT', etc.) or a signal number.
167 167 """
168 168 if self.state == 'running':
169 169 self.process_transport.signalProcess(sig)
170 170
171 171 # def __del__(self):
172 172 # self.signal('KILL')
173 173
174 174 def interrupt_then_kill(self, delay=1.0):
175 175 self.signal('INT')
176 176 reactor.callLater(delay, self.signal, 'KILL')
177 177
178 178
179 179 #-----------------------------------------------------------------------------
180 180 # Code for launching controller and engines
181 181 #-----------------------------------------------------------------------------
182 182
183 183
184 184 class ControllerLauncher(ProcessLauncher):
185 185
186 186 def __init__(self, extra_args=None):
187 187 if sys.platform == 'win32':
188 188 # This logic is needed because the ipcontroller script doesn't
189 189 # always get installed in the same way or in the same location.
190 190 from IPython.kernel.scripts import ipcontroller
191 191 script_location = ipcontroller.__file__.replace('.pyc', '.py')
192 192 # The -u option here turns on unbuffered output, which is required
193 193 # on Win32 to prevent weird conflicts and problems with Twisted.
194 194 # Also, use sys.executable to make sure we are picking up the
195 195 # right python exe.
196 196 args = [sys.executable, '-u', script_location]
197 197 else:
198 198 args = ['ipcontroller']
199 199 self.extra_args = extra_args
200 200 if extra_args is not None:
201 201 args.extend(extra_args)
202 202
203 203 ProcessLauncher.__init__(self, args)
204 204
205 205
206 206 class EngineLauncher(ProcessLauncher):
207 207
208 208 def __init__(self, extra_args=None):
209 209 if sys.platform == 'win32':
210 210 # This logic is needed because the ipengine script doesn't
211 211 # always get installed in the same way or in the same location.
212 212 from IPython.kernel.scripts import ipengine
213 213 script_location = ipengine.__file__.replace('.pyc', '.py')
214 214 # The -u option here turns on unbuffered output, which is required
215 215 # on Win32 to prevent weird conflicts and problems with Twisted.
216 216 # Also, use sys.executable to make sure we are picking up the
217 217 # right python exe.
218 218 args = [sys.executable, '-u', script_location]
219 219 else:
220 220 args = ['ipengine']
221 221 self.extra_args = extra_args
222 222 if extra_args is not None:
223 223 args.extend(extra_args)
224 224
225 225 ProcessLauncher.__init__(self, args)
226 226
227 227
228 228 class LocalEngineSet(object):
229 229
230 230 def __init__(self, extra_args=None):
231 231 self.extra_args = extra_args
232 232 self.launchers = []
233 233
234 234 def start(self, n):
235 235 dlist = []
236 236 for i in range(n):
237 237 print "starting engine:", i
238 238 el = EngineLauncher(extra_args=self.extra_args)
239 239 d = el.start()
240 240 self.launchers.append(el)
241 241 dlist.append(d)
242 242 dfinal = gatherBoth(dlist, consumeErrors=True)
243 243 dfinal.addCallback(self._handle_start)
244 244 return dfinal
245 245
246 246 def _handle_start(self, r):
247 247 log.msg('Engines started with pids: %r' % r)
248 248 return r
249 249
250 250 def _handle_stop(self, r):
251 251 log.msg('Engines received signal: %r' % r)
252 252 return r
253 253
254 254 def signal(self, sig):
255 255 dlist = []
256 256 for el in self.launchers:
257 257 d = el.get_stop_deferred()
258 258 dlist.append(d)
259 259 el.signal(sig)
260 260 dfinal = gatherBoth(dlist, consumeErrors=True)
261 261 dfinal.addCallback(self._handle_stop)
262 262 return dfinal
263 263
264 264 def interrupt_then_kill(self, delay=1.0):
265 265 dlist = []
266 266 for el in self.launchers:
267 267 d = el.get_stop_deferred()
268 268 dlist.append(d)
269 269 el.interrupt_then_kill(delay)
270 270 dfinal = gatherBoth(dlist, consumeErrors=True)
271 271 dfinal.addCallback(self._handle_stop)
272 272 return dfinal
273 273
274
275 274 class BatchEngineSet(object):
276
277 # Subclasses must fill these in. See PBSEngineSet
275
276 # Subclasses must fill these in. See PBSEngineSet/SGEEngineSet
277 name = ''
278 278 submit_command = ''
279 279 delete_command = ''
280 script_param_prefix = ''
280 281 job_id_regexp = ''
281
282 job_array_regexp = ''
283 default_template = ''
284
282 285 def __init__(self, template_file, **kwargs):
283 286 self.template_file = template_file
284 self.context = {}
285 self.context.update(kwargs)
286 self.batch_file = self.template_file+'-run'
287
287
288 288 def parse_job_id(self, output):
289 m = re.match(self.job_id_regexp, output)
289 m = re.search(self.job_id_regexp, output)
290 290 if m is not None:
291 291 job_id = m.group()
292 292 else:
293 293 raise Exception("job id couldn't be determined: %s" % output)
294 294 self.job_id = job_id
295 295 log.msg('Job started with job id: %r' % job_id)
296 296 return job_id
297
298 def write_batch_script(self, n):
299 self.context['n'] = n
300 template = open(self.template_file, 'r').read()
301 log.msg('Using template for batch script: %s' % self.template_file)
302 script_as_string = Itpl.itplns(template, self.context)
303 log.msg('Writing instantiated batch script: %s' % self.batch_file)
304 f = open(self.batch_file,'w')
305 f.write(script_as_string)
306 f.close()
307
297
308 298 def handle_error(self, f):
309 299 f.printTraceback()
310 300 f.raiseException()
311
312 def start(self, n):
313 self.write_batch_script(n)
314 d = getProcessOutput(self.submit_command,
315 [self.batch_file],env=os.environ)
316 d.addCallback(self.parse_job_id)
317 d.addErrback(self.handle_error)
318 return d
319
320 def kill(self):
321 d = getProcessOutput(self.delete_command,
322 [self.job_id],env=os.environ)
323 return d
324
325 class PBSEngineSet(BatchEngineSet):
326
327 submit_command = 'qsub'
328 delete_command = 'qdel'
329 job_id_regexp = '\d+'
330
331 def __init__(self, template_file, **kwargs):
332 BatchEngineSet.__init__(self, template_file, **kwargs)
333
334 class SGEEngineSet(PBSEngineSet):
335
336 def __init__(self, template_file, **kwargs):
337 BatchEngineSet.__init__(self, template_file, **kwargs)
338 self._temp_file = None
339
340 def parse_job_id(self, output):
341 m = re.search(self.job_id_regexp, output)
342 if m is not None:
343 job_id = m.group()
344 else:
345 raise Exception("job id couldn't be determined: %s" % output)
346 self.job_id = job_id
347 log.msg('job started with job id: %r' % job_id)
348 return job_id
349 301
350 302 def start(self, n):
351 303 log.msg("starting %d engines" % n)
352 304 self._temp_file = tempfile.NamedTemporaryFile()
353 regex = re.compile('#\$[ \t]+-t[ \t]+\d+')
305 regex = re.compile(self.job_array_regexp)
354 306 if self.template_file:
355 log.msg("Using sge script %s" % self.template_file)
307 log.msg("Using %s script %s" % (self.name, self.template_file))
356 308 contents = open(self.template_file, 'r').read()
357 309 if not regex.search(contents):
358 log.msg("adding job array settings to sge script")
359 contents = ("#$ -t 1-%d\n" % n) + contents
310 log.msg("adding job array settings to %s script" % self.name)
311 contents = ("%s -t 1-%d\n" % (self.script_param_prefix,n)) + contents
360 312 self._temp_file.write(contents)
361 313 self.template_file = self._temp_file.name
362 314 else:
363 log.msg("using default ipengine sge script: \n%s" %
364 (sge_template % n))
365 self._temp_file.file.write(sge_template % n)
315 log.msg("using default ipengine %s script: \n%s" %
316 (self.name, (self.default_template % n)))
317 self._temp_file.file.write(self.default_template % n)
366 318 self.template_file = self._temp_file.name
367 319 self._temp_file.file.flush()
368 320 d = getProcessOutput(self.submit_command,
369 321 [self.template_file],
370 322 env=os.environ)
371 323 d.addCallback(self.parse_job_id)
372 324 d.addErrback(self.handle_error)
373 325 return d
374 326
375 sge_template="""#$ -V
327 def kill(self):
328 d = getProcessOutput(self.delete_command,
329 [self.job_id],env=os.environ)
330 return d
331
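Review note: parse_job_id() simply takes the first run of digits in the scheduler's reply, which is why one regexp now serves both backends. The sample replies below are illustrative, not captured from a real cluster:

    import re

    print re.search(r'\d+', '1234.pbs-master.example.org').group()
    # -> '1234' (PBS-style qsub reply)
    print re.search(r'\d+',
        'Your job-array 567.1-8:1 ("ipengine") has been submitted').group()
    # -> '567' (SGE-style qsub reply)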
332 class PBSEngineSet(BatchEngineSet):
333
334 name = 'PBS'
335 submit_command = 'qsub'
336 delete_command = 'qdel'
337 script_param_prefix = "#PBS"
338 job_id_regexp = '\d+'
339 job_array_regexp = '#PBS[ \t]+-t[ \t]+\d+'
340 default_template="""#PBS -V
341 #PBS -t 1-%d
342 #PBS -N ipengine
343 eid=$(($PBS_ARRAYID - 1))
344 ipengine --logfile=ipengine${eid}.log
345 """
346
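Review note: as a sanity check, the built-in PBS template expands by plain %-substitution, and each array task derives a zero-based engine id from $PBS_ARRAYID. For four engines:

    # What BatchEngineSet.start() submits when no template file is given:
    print PBSEngineSet.default_template % 4
    # #PBS -V
    # #PBS -t 1-4
    # #PBS -N ipengine
    # eid=$(($PBS_ARRAYID - 1))
    # ipengine --logfile=ipengine${eid}.log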
347 class SGEEngineSet(PBSEngineSet):
348
349 name = 'SGE'
350 script_param_prefix = "#$"
351 job_array_regexp = '#\$[ \t]+-t[ \t]+\d+'
352 default_template="""#$ -V
376 353 #$ -t 1-%d
377 354 #$ -N ipengine
378 355 eid=$(($SGE_TASK_ID - 1))
379 356 ipengine --logfile=ipengine${eid}.log
380 357 """
381 358
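Review note: the SGE variant differs from PBS only in the directive prefix ("#$" vs "#PBS") and the task-id variable ($SGE_TASK_ID vs $PBS_ARRAYID); both subtract 1 so that array tasks 1..n map to zero-based engine ids, giving log files ipengine0.log through ipengine(n-1).log.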
382 359 sshx_template="""#!/bin/sh
383 360 "$@" &> /dev/null &
384 361 echo $!
385 362 """
386 363
387 364 engine_killer_template="""#!/bin/sh
388 365 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
389 366 """
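Review note: the '[i]pengine' pattern in the killer script is the usual ps-grep idiom; the character class keeps the grep command itself from matching its own entry in the process list.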
390 367
391 368 class SSHEngineSet(object):
392 369 sshx_template=sshx_template
393 370 engine_killer_template=engine_killer_template
394 371
395 372 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
396 373 """Start a controller on localhost and engines using ssh.
397 374
398 375 The engine_hosts argument is a dict with hostnames as keys and
399 376 the number of engines (int) as values. sshx is the name of a local
400 377 file that will be used to run remote commands. This file is used
401 378 to set up the environment properly.
402 379 """
403 380
404 381 self.temp_dir = tempfile.gettempdir()
405 382 if sshx is not None:
406 383 self.sshx = sshx
407 384 else:
408 385 # Write the sshx.sh file locally from our template.
409 386 self.sshx = os.path.join(
410 387 self.temp_dir,
411 388 '%s-main-sshx.sh' % os.environ['USER']
412 389 )
413 390 f = open(self.sshx, 'w')
414 391 f.writelines(self.sshx_template)
415 392 f.close()
416 393 self.engine_command = ipengine
417 394 self.engine_hosts = engine_hosts
418 395 # Write the engine killer script file locally from our template.
419 396 self.engine_killer = os.path.join(
420 397 self.temp_dir,
421 398 '%s-local-engine_killer.sh' % os.environ['USER']
422 399 )
423 400 f = open(self.engine_killer, 'w')
424 401 f.writelines(self.engine_killer_template)
425 402 f.close()
426 403
427 404 def start(self, send_furl=False):
428 405 dlist = []
429 406 for host in self.engine_hosts.keys():
430 407 count = self.engine_hosts[host]
431 408 d = self._start(host, count, send_furl)
432 409 dlist.append(d)
433 410 return gatherBoth(dlist, consumeErrors=True)
434 411
435 412 def _start(self, hostname, count=1, send_furl=False):
436 413 if send_furl:
437 414 d = self._scp_furl(hostname)
438 415 else:
439 416 d = defer.succeed(None)
440 417 d.addCallback(lambda r: self._scp_sshx(hostname))
441 418 d.addCallback(lambda r: self._ssh_engine(hostname, count))
442 419 return d
443 420
444 421 def _scp_furl(self, hostname):
445 422 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
446 423 cmd_list = scp_cmd.split()
447 424 cmd_list[1] = os.path.expanduser(cmd_list[1])
448 425 log.msg('Copying furl file: %s' % scp_cmd)
449 426 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
450 427 return d
451 428
452 429 def _scp_sshx(self, hostname):
453 430 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
454 431 self.sshx, hostname,
455 432 self.temp_dir, os.environ['USER']
456 433 )
457 434 print
458 435 log.msg("Copying sshx: %s" % scp_cmd)
459 436 sshx_scp = scp_cmd.split()
460 437 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
461 438 return d
462 439
463 440 def _ssh_engine(self, hostname, count):
464 441 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
465 442 hostname, self.temp_dir,
466 443 os.environ['USER'], self.engine_command
467 444 )
468 445 cmds = exec_engine.split()
469 446 dlist = []
470 447 log.msg("about to start engines...")
471 448 for i in range(count):
472 449 log.msg('Starting engines: %s' % exec_engine)
473 450 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
474 451 dlist.append(d)
475 452 return gatherBoth(dlist, consumeErrors=True)
476 453
477 454 def kill(self):
478 455 dlist = []
479 456 for host in self.engine_hosts.keys():
480 457 d = self._killall(host)
481 458 dlist.append(d)
482 459 return gatherBoth(dlist, consumeErrors=True)
483 460
484 461 def _killall(self, hostname):
485 462 d = self._scp_engine_killer(hostname)
486 463 d.addCallback(lambda r: self._ssh_kill(hostname))
487 464 # d.addErrback(self._exec_err)
488 465 return d
489 466
490 467 def _scp_engine_killer(self, hostname):
491 468 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
492 469 self.engine_killer,
493 470 hostname,
494 471 self.temp_dir,
495 472 os.environ['USER']
496 473 )
497 474 cmds = scp_cmd.split()
498 475 log.msg('Copying engine_killer: %s' % scp_cmd)
499 476 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
500 477 return d
501 478
502 479 def _ssh_kill(self, hostname):
503 480 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
504 481 hostname,
505 482 self.temp_dir,
506 483 os.environ['USER']
507 484 )
508 485 log.msg('Killing engine: %s' % kill_cmd)
509 486 kill_cmd = kill_cmd.split()
510 487 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
511 488 return d
512 489
513 490 def _exec_err(self, r):
514 491 log.msg(r)
515 492
516 493 #-----------------------------------------------------------------------------
517 494 # Main functions for the different types of clusters
518 495 #-----------------------------------------------------------------------------
519 496
520 497 # TODO:
521 498 # The logic in these codes should be moved into classes like LocalCluster
522 499 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
523 500 # The main functions should then just parse the command line arguments, create
524 501 # the appropriate class and call a 'start' method.
525 502
526 503
527 504 def check_security(args, cont_args):
528 505 """Check to see if we should run with SSL support."""
529 506 if (not args.x or not args.y) and not have_crypto:
530 507 log.err("""
531 508 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
532 509 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
533 510 reactor.stop()
534 511 return False
535 512 if args.x:
536 513 cont_args.append('-x')
537 514 if args.y:
538 515 cont_args.append('-y')
539 516 return True
540 517
541 518
542 519 def check_reuse(args, cont_args):
543 520 """Check to see if we should try to resuse FURL files."""
544 521 if args.r:
545 522 cont_args.append('-r')
546 523 if args.client_port == 0 or args.engine_port == 0:
547 524 log.err("""
548 525 To reuse FURL files, you must also set the client and engine ports using
549 526 the --client-port and --engine-port options.""")
550 527 reactor.stop()
551 528 return False
552 529 cont_args.append('--client-port=%i' % args.client_port)
553 530 cont_args.append('--engine-port=%i' % args.engine_port)
554 531 return True
555 532
556 533
557 534 def _err_and_stop(f):
558 535 """Errback to log a failure and halt the reactor on a fatal error."""
559 536 log.err(f)
560 537 reactor.stop()
561 538
562 539
563 540 def _delay_start(cont_pid, start_engines, furl_file, reuse):
564 541 """Wait for controller to create FURL files and the start the engines."""
565 542 if not reuse:
566 543 if os.path.isfile(furl_file):
567 544 os.unlink(furl_file)
568 545 log.msg('Waiting for controller to finish starting...')
569 546 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
570 547 d.addCallback(lambda _: log.msg('Controller started'))
571 548 d.addCallback(lambda _: start_engines(cont_pid))
572 549 return d
573 550
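Review note: with delay=0.2 and max_tries=50, wait_for_file gives the controller roughly 0.2 s × 50 = 10 s to write its FURL file before the failure path (_err_and_stop) runs.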
574 551
575 552 def main_local(args):
576 553 cont_args = []
577 554 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
578 555
579 556 # Check security settings before proceeding
580 557 if not check_security(args, cont_args):
581 558 return
582 559
583 560 # See if we are reusing FURL files
584 561 if not check_reuse(args, cont_args):
585 562 return
586 563
587 564 cl = ControllerLauncher(extra_args=cont_args)
588 565 dstart = cl.start()
589 566 def start_engines(cont_pid):
590 567 engine_args = []
591 568 engine_args.append('--logfile=%s' % \
592 569 pjoin(args.logdir,'ipengine%s-' % cont_pid))
593 570 eset = LocalEngineSet(extra_args=engine_args)
594 571 def shutdown(signum, frame):
595 572 log.msg('Stopping local cluster')
596 573 # We are still playing with the times here, but these seem
597 574 # to be reliable in allowing everything to exit cleanly.
598 575 eset.interrupt_then_kill(0.5)
599 576 cl.interrupt_then_kill(0.5)
600 577 reactor.callLater(1.0, reactor.stop)
601 578 signal.signal(signal.SIGINT,shutdown)
602 579 d = eset.start(args.n)
603 580 return d
604 581 config = kernel_config_manager.get_config_obj()
605 582 furl_file = config['controller']['engine_furl_file']
606 583 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
607 584 dstart.addErrback(_err_and_stop)
608 585
609 586
610 587 def main_mpi(args):
611 588 cont_args = []
612 589 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
613 590
614 591 # Check security settings before proceeding
615 592 if not check_security(args, cont_args):
616 593 return
617 594
618 595 # See if we are reusing FURL files
619 596 if not check_reuse(args, cont_args):
620 597 return
621 598
622 599 cl = ControllerLauncher(extra_args=cont_args)
623 600 dstart = cl.start()
624 601 def start_engines(cont_pid):
625 602 raw_args = [args.cmd]
626 603 raw_args.extend(['-n',str(args.n)])
627 604 raw_args.append('ipengine')
628 605 raw_args.append('-l')
629 606 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
630 607 if args.mpi:
631 608 raw_args.append('--mpi=%s' % args.mpi)
632 609 eset = ProcessLauncher(raw_args)
633 610 def shutdown(signum, frame):
634 611 log.msg('Stopping local cluster')
635 612 # We are still playing with the times here, but these seem
636 613 # to be reliable in allowing everything to exit cleanly.
637 614 eset.interrupt_then_kill(1.0)
638 615 cl.interrupt_then_kill(1.0)
639 616 reactor.callLater(2.0, reactor.stop)
640 617 signal.signal(signal.SIGINT,shutdown)
641 618 d = eset.start()
642 619 return d
643 620 config = kernel_config_manager.get_config_obj()
644 621 furl_file = config['controller']['engine_furl_file']
645 622 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
646 623 dstart.addErrback(_err_and_stop)
647 624
648 625
649 626 def main_pbs(args):
650 627 cont_args = []
651 628 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
652 629
653 630 # Check security settings before proceeding
654 631 if not check_security(args, cont_args):
655 632 return
656 633
657 634 # See if we are reusing FURL files
658 635 if not check_reuse(args, cont_args):
659 636 return
660 637
661 638 cl = ControllerLauncher(extra_args=cont_args)
662 639 dstart = cl.start()
663 640 def start_engines(r):
664 641 pbs_set = PBSEngineSet(args.pbsscript)
665 642 def shutdown(signum, frame):
666 643 log.msg('Stopping pbs cluster')
667 644 d = pbs_set.kill()
668 645 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
669 646 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
670 647 signal.signal(signal.SIGINT,shutdown)
671 648 d = pbs_set.start(args.n)
672 649 return d
673 650 config = kernel_config_manager.get_config_obj()
674 651 furl_file = config['controller']['engine_furl_file']
675 652 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
676 653 dstart.addErrback(_err_and_stop)
677 654
678 655 def main_sge(args):
679 656 cont_args = []
680 657 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
681 658
682 659 # Check security settings before proceeding
683 660 if not check_security(args, cont_args):
684 661 return
685 662
686 663 # See if we are reusing FURL files
687 664 if not check_reuse(args, cont_args):
688 665 return
689 666
690 667 if args.sgescript and not os.path.isfile(args.sgescript):
691 668 log.err('SGE script does not exist: %s' % args.sgescript)
692 669 return
693 670
694 671 cl = ControllerLauncher(extra_args=cont_args)
695 672 dstart = cl.start()
696 673 def start_engines(r):
697 674 sge_set = SGEEngineSet(args.sgescript)
698 675 def shutdown(signum, frame):
699 676 log.msg('Stopping sge cluster')
700 677 d = sge_set.kill()
701 678 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
702 679 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
703 680 signal.signal(signal.SIGINT,shutdown)
704 681 d = sge_set.start(args.n)
705 682 return d
706 683 config = kernel_config_manager.get_config_obj()
707 684 furl_file = config['controller']['engine_furl_file']
708 685 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
709 686 dstart.addErrback(_err_and_stop)
710 687
711 688
712 689 def main_ssh(args):
713 690 """Start a controller on localhost and engines using ssh.
714 691
715 692 Your clusterfile should look like::
716 693
717 694 send_furl = False # True, if you want
718 695 engines = {
719 696 'engine_host1' : engine_count,
720 697 'engine_host2' : engine_count2
721 698 }
722 699 """
723 700 clusterfile = {}
724 701 execfile(args.clusterfile, clusterfile)
725 702 if not clusterfile.has_key('send_furl'):
726 703 clusterfile['send_furl'] = False
727 704
728 705 cont_args = []
729 706 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
730 707
731 708 # Check security settings before proceeding
732 709 if not check_security(args, cont_args):
733 710 return
734 711
735 712 # See if we are reusing FURL files
736 713 if not check_reuse(args, cont_args):
737 714 return
738 715
739 716 cl = ControllerLauncher(extra_args=cont_args)
740 717 dstart = cl.start()
741 718 def start_engines(cont_pid):
742 719 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
743 720 def shutdown(signum, frame):
744 721 d = ssh_set.kill()
745 722 cl.interrupt_then_kill(1.0)
746 723 reactor.callLater(2.0, reactor.stop)
747 724 signal.signal(signal.SIGINT,shutdown)
748 725 d = ssh_set.start(clusterfile['send_furl'])
749 726 return d
750 727 config = kernel_config_manager.get_config_obj()
751 728 furl_file = config['controller']['engine_furl_file']
752 729 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
753 730 dstart.addErrback(_err_and_stop)
754 731
755 732
756 733 def get_args():
757 734 base_parser = argparse.ArgumentParser(add_help=False)
758 735 base_parser.add_argument(
759 736 '-r',
760 737 action='store_true',
761 738 dest='r',
762 739 help='try to reuse FURL files. Use with --client-port and --engine-port'
763 740 )
764 741 base_parser.add_argument(
765 742 '--client-port',
766 743 type=int,
767 744 dest='client_port',
768 745 help='the port the controller will listen on for client connections',
769 746 default=0
770 747 )
771 748 base_parser.add_argument(
772 749 '--engine-port',
773 750 type=int,
774 751 dest='engine_port',
775 752 help='the port the controller will listen on for engine connections',
776 753 default=0
777 754 )
778 755 base_parser.add_argument(
779 756 '-x',
780 757 action='store_true',
781 758 dest='x',
782 759 help='turn off client security'
783 760 )
784 761 base_parser.add_argument(
785 762 '-y',
786 763 action='store_true',
787 764 dest='y',
788 765 help='turn off engine security'
789 766 )
790 767 base_parser.add_argument(
791 768 "--logdir",
792 769 type=str,
793 770 dest="logdir",
794 771 help="directory to put log files (default=$IPYTHONDIR/log)",
795 772 default=pjoin(get_ipython_dir(),'log')
796 773 )
797 774 base_parser.add_argument(
798 775 "-n",
799 776 "--num",
800 777 type=int,
801 778 dest="n",
802 779 default=2,
803 780 help="the number of engines to start"
804 781 )
805 782
806 783 parser = argparse.ArgumentParser(
807 784 description='IPython cluster startup. This starts a controller and\
808 785 engines using various approaches. Use the IPYTHONDIR environment\
809 786 variable to change your IPython directory from the default of\
810 787 .ipython or _ipython. The log and security subdirectories of your\
811 788 IPython directory will be used by this script for log files and\
812 789 security files.'
813 790 )
814 791 subparsers = parser.add_subparsers(
815 792 help='available cluster types. For help, do "ipcluster TYPE --help"')
816 793
817 794 parser_local = subparsers.add_parser(
818 795 'local',
819 796 help='run a local cluster',
820 797 parents=[base_parser]
821 798 )
822 799 parser_local.set_defaults(func=main_local)
823 800
824 801 parser_mpirun = subparsers.add_parser(
825 802 'mpirun',
826 803 help='run a cluster using mpirun (mpiexec also works)',
827 804 parents=[base_parser]
828 805 )
829 806 parser_mpirun.add_argument(
830 807 "--mpi",
831 808 type=str,
832 809 dest="mpi", # Don't put a default here to allow no MPI support
833 810 help="how to call MPI_Init (default=mpi4py)"
834 811 )
835 812 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
836 813
837 814 parser_mpiexec = subparsers.add_parser(
838 815 'mpiexec',
839 816 help='run a cluster using mpiexec (mpirun also works)',
840 817 parents=[base_parser]
841 818 )
842 819 parser_mpiexec.add_argument(
843 820 "--mpi",
844 821 type=str,
845 822 dest="mpi", # Don't put a default here to allow no MPI support
846 823 help="how to call MPI_Init (default=mpi4py)"
847 824 )
848 825 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
849 826
850 827 parser_pbs = subparsers.add_parser(
851 828 'pbs',
852 829 help='run a pbs cluster',
853 830 parents=[base_parser]
854 831 )
855 832 parser_pbs.add_argument(
856 833 '--pbs-script',
857 834 type=str,
858 835 dest='pbsscript',
859 836 help='PBS script template',
860 default='pbs.template'
837 default=''
861 838 )
862 839 parser_pbs.set_defaults(func=main_pbs)
863 840
864 841 parser_sge = subparsers.add_parser(
865 842 'sge',
866 843 help='run an sge cluster',
867 844 parents=[base_parser]
868 845 )
869 846 parser_sge.add_argument(
870 847 '--sge-script',
871 848 type=str,
872 849 dest='sgescript',
873 850 help='SGE script template',
874 851 default='' # SGEEngineSet will create one if not specified
875 852 )
876 853 parser_sge.set_defaults(func=main_sge)
877 854
878 855 parser_ssh = subparsers.add_parser(
879 856 'ssh',
880 857 help='run a cluster using ssh, should have ssh-keys setup',
881 858 parents=[base_parser]
882 859 )
883 860 parser_ssh.add_argument(
884 861 '--clusterfile',
885 862 type=str,
886 863 dest='clusterfile',
887 864 help='python file describing the cluster',
888 865 default='clusterfile.py',
889 866 )
890 867 parser_ssh.add_argument(
891 868 '--sshx',
892 869 type=str,
893 870 dest='sshx',
894 871 help='sshx launcher helper'
895 872 )
896 873 parser_ssh.set_defaults(func=main_ssh)
897 874
898 875 args = parser.parse_args()
899 876 return args
900 877
901 878 def main():
902 879 args = get_args()
903 880 reactor.callWhenRunning(args.func, args)
904 881 log.startLogging(sys.stdout)
905 882 reactor.run()
906 883
907 884 if __name__ == '__main__':
908 885 main()
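Review note: taken together, the script exposes one subcommand per cluster type; typical invocations (illustrative) would be:

    ipcluster local -n 4
    ipcluster mpiexec -n 8 --mpi=mpi4py
    ipcluster pbs -n 8            # now uses the built-in PBS job-array template
    ipcluster sge -n 8 --sge-script=mysge.template
    ipcluster ssh --clusterfile=clusterfile.py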